Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscriptions: kick resub ddos protection #3212

Merged
merged 11 commits into from
Feb 22, 2024
94 changes: 65 additions & 29 deletions desk/app/groups.hoon
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/- meta
/- e=epic
/+ default-agent, verb, dbug
/+ v=volume
/+ v=volume, s=subscriber
/+ of
/+ epos-lib=saga
:: performance, keep warm
Expand All @@ -14,18 +14,19 @@
+$ card card:agent:gall
++ import-epoch ~2022.10.11
+$ current-state
$: %2
$: %3
groups=net-groups:g
::
$= volume
$: base=level:v
area=(map flag:g level:v) :: override per group
chan=(map nest:g level:v) :: override per channel
chan=(map nest:g level:v) :: override per channel
arthyn marked this conversation as resolved.
Show resolved Hide resolved
==
::
xeno=gangs:g
:: graph -> agent
shoal=(map flag:g dude:gall)
=^subs:s
==
::
--
Expand Down Expand Up @@ -288,12 +289,13 @@
?- -.old
%0 $(old (state-0-to-1 old))
%1 $(old (state-1-to-2 old))
::
%2
%2 $(old (state-2-to-3 old))
::
%3
=. state old
=. cor restore-missing-subs
=. cor (emit %pass /groups/role %agent [our.bowl dap.bowl] %poke noun+!>(%verify-cabals))
=. cor watch-contact
=. cor (watch-contact |)
?: =(okay:g cool) cor
=. cor (emil (drop load:epos))
=/ groups ~(tap in ~(key by groups))
Expand All @@ -304,7 +306,7 @@
go-abet:go-upgrade:(go-abed:group-core i.groups)
$(groups t.groups)
==
+$ versioned-state $%(current-state state-1 state-0)
+$ versioned-state $%(current-state state-2 state-1 state-0)
+$ state-0
$: %0
groups=net-groups:zero
Expand All @@ -327,16 +329,35 @@
shoal=(map flag:zero dude:gall)
==
::
+$ state-2
$: %2
groups=net-groups:g
::
$= volume
$: base=level:v
area=(map flag:g level:v) :: override per group
chan=(map nest:g level:v) :: override per channel
==
::
xeno=gangs:g
:: graph -> agent
shoal=(map flag:g dude:gall)
==
++ state-0-to-1
|= state-0
^- state-1
[%1 groups [*level:v ~ ~] xeno shoal]
::
++ state-1-to-2
|= state-1
^- current-state
^- state-2
[%2 (groups-1-to-2 groups) volume xeno shoal]
::
++ state-2-to-3
|= state-2
^- current-state
[%3 groups volume xeno shoal ~]
arthyn marked this conversation as resolved.
Show resolved Hide resolved
::
++ groups-1-to-2
|= groups=net-groups:zero
^- net-groups:g
Expand Down Expand Up @@ -584,9 +605,25 @@
==
::
++ arvo
|= [=wire sign=sign-arvo]
|= [=(pole knot) sign=sign-arvo]
^+ cor
!!
?+ pole ~|(bad-arvo-take/pole !!)
[%~.~ %cancel-retry rest=*] cor
::
[%~.~ %retry rest=*]
=^ card=(unit card) subs
(~(handle-wakeup s [subs bowl]) pole)
?~ card cor
(emit u.card)
==
::
++ subscribe
|= [=wire =dock =path]
|= delay=?
=^ card=(unit card) subs
(~(subscribe s [subs bowl]) wire dock path delay)
?~ card cor
(emit u.card)
::
++ cast
|= [grp=flag:g gra=flag:g]
Expand Down Expand Up @@ -632,13 +669,13 @@
==
::
++ watch-contact
(emit %pass /contact %agent [our.bowl %contacts] %watch /contact)
(subscribe /contact [our.bowl %contacts] /contact)
::
++ take-contact
|= =sign:agent:gall
?+ -.sign cor
%kick
watch-contact
(watch-contact &)
::
%watch-ack
cor
Expand All @@ -653,20 +690,20 @@
==
::
++ watch-epic
|= her=ship
|= [her=ship delay=?]
^+ cor
=/ =wire /epic
=/ =dock [her dap.bowl]
?: (~(has by wex.bowl) [wire dock])
cor
(emit %pass wire %agent [her dap.bowl] %watch /epic)
((subscribe wire dock wire) delay)
::
++ take-epic
|= =sign:agent:gall
^+ cor
?+ -.sign cor
%kick
(watch-epic src.bowl)
(watch-epic src.bowl &)
::
%fact
?. =(%epic p.cage.sign)
Expand Down Expand Up @@ -928,18 +965,16 @@
^+ go-core
?: |(go-has-sub =(our.bowl p.flag))
go-core
(go-sub init)
(go-sub init |)
::
++ go-sub
|= init=_|
|= [init=_| delay=?]
^+ go-core
=/ =time
?.(?=(%sub -.net) *time p.net)
=/ base=wire (snoc go-area %updates)
=/ =path (snoc base ?:(init %init (scot %da time)))
=/ =card
[%pass base %agent [p.flag dap.bowl] %watch path]
=. cor (emit card)
=. cor ((subscribe base [p.flag dap.bowl] path) delay)
go-core
::
++ go-watch
Expand Down Expand Up @@ -1082,11 +1117,11 @@
++ go-take-update
|= =sign:agent:gall
^+ go-core
?+ -.sign (go-sub |)
?+ -.sign (go-sub | &)
%kick
?> ?=(%sub -.net)
?. ?=(%chi -.saga.net) go-core
(go-sub !load.net)
(go-sub !load.net &)
::
%watch-ack
=? cor (~(has by xeno) flag)
Expand Down Expand Up @@ -1119,7 +1154,7 @@
go-core
~& "took lev epic: {<flag>}"
=. saga.net lev/~
=. cor (watch-epic p.flag)
=. cor (watch-epic p.flag |)
go-core
::
++ go-make-chi
Expand Down Expand Up @@ -1791,8 +1826,9 @@
=/ =action:g [flag now.bowl %cordon %shut %del-ships %ask ships]
(poke-host /rescind act:mar:g !>(action))
++ get-preview
=/ =task:agent:gall [%watch /groups/(scot %p p.flag)/[q.flag]/preview]
(pass-host /preview task)
%^ subscribe (welp ga-area /preview)
[p.flag dap.bowl]
/groups/(scot %p p.flag)/[q.flag]/preview
--
++ ga-start-join
^+ ga-core
Expand All @@ -1816,7 +1852,7 @@
++ ga-watch
|= =(pole knot)
^+ ga-core
=. cor (emit get-preview:ga-pass)
=. cor (get-preview:ga-pass |)
ga-core
::
++ ga-give-update
Expand Down Expand Up @@ -1869,7 +1905,7 @@
%kick
?. (~(has by xeno) flag) ga-core
?^ pev.gang ga-core
ga-core(cor (emit get-preview:ga-pass))
ga-core(cor (get-preview:ga-pass &))
==
::
[%join %add ~]
Expand All @@ -1885,7 +1921,7 @@
=. groups (~(put by groups) flag net group)
::
=. cor
go-abet:(go-sub:(go-abed:group-core flag) &)
go-abet:(go-sub:(go-abed:group-core flag) & |)
ga-core
[%knock ~]
?> ?=(%poke-ack -.sign)
Expand Down Expand Up @@ -1920,7 +1956,7 @@
++ ga-invite
|= =invite:g
=. vit.gang `invite
=. cor (emit get-preview:ga-pass)
=. cor (get-preview:ga-pass |)
=. cor ga-give-update
ga-core
::
Expand Down
43 changes: 43 additions & 0 deletions desk/lib/subscriber.hoon
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
=< subscriber
|%
+$ sub
$: =dock
=path
fires-at=@da
==
::
+$ subs (map wire sub)
::
++ subscriber
|_ [=subs bowl:gall]
++ interval ~s30
++ handle-wakeup
|= =wire
^- [(unit card:agent:gall) _subs]
arthyn marked this conversation as resolved.
Show resolved Hide resolved
?> ?=([%~.~ %retry *] wire)
:: ~& ['waking up' wire]
=/ sub (~(get by subs) t.t.wire)
?~ sub [~ subs]
:- `[%pass t.t.wire %agent dock.u.sub %watch path.u.sub]
(~(del by subs) t.t.wire)
++ subscribe
|= [=wire =dock =path delay=?]
^- [(unit card:agent:gall) _subs]
?: (~(has by subs) wire)
((slog 'Duplicate subscription' >[wire dock]< ~) [~ subs])
?. delay [`[%pass wire %agent dock %watch path] subs]
:: ~& ['subscribing with delay' wire]
=/ fires-at (add now interval)
:_ (~(put by subs) wire [dock path fires-at])
`[%pass (weld /~/retry wire) %arvo %b %wait fires-at]
++ cancel
|= =wire
^- [(unit card:agent:gall) _subs]
=/ sub (~(get by subs) wire)
?~ sub
((slog 'No such subscription' >[wire]< ~) [~ subs])
arthyn marked this conversation as resolved.
Show resolved Hide resolved
:: ~& ['cancelling' wire]
:_ (~(del by subs) wire)
`[%pass (weld /~/cancel-retry wire) %arvo %b %rest fires-at.u.sub]
arthyn marked this conversation as resolved.
Show resolved Hide resolved
--
--
32 changes: 32 additions & 0 deletions desk/tests/app/groups.hoon
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/- g=groups
/+ *test-agent
/= groups-agent /app/groups
|%
++ dap %groups-test
++ the-wire /groups/(scot %p ~zod)/test/updates
++ the-path (weld the-wire /init)
++ the-dock [~zod dap]
++ the-group [%test 'test' '' '' '' [%open ~ ~] ~ |]
++ retry (weld /~/retry the-wire)
++ test-subscription-loop
%- eval-mare
=/ m (mare ,~)
:: init and add group
;< * bind:m (do-init %groups-test groups-agent)
;< * bind:m (jab-bowl |=(b=bowl b(our ~dev, src ~dev)))
;< * bind:m (do-poke %group-join !>([[~zod %test] &]))
;< * bind:m (jab-bowl |=(b=bowl b(our ~dev, src ~zod)))
arthyn marked this conversation as resolved.
Show resolved Hide resolved
;< * bind:m (do-agent /gangs/(scot %p ~zod)/test/join/add the-dock %poke-ack ~)
;< * bind:m (do-agent the-wire the-dock %watch-ack ~)
;< bw=bowl bind:m get-bowl
=/ now=time now.bw
:: kick & resubscribe with delay
;< caz=(list card) bind:m (do-agent the-wire the-dock %kick ~)
=/ next=time (add now ~s30)
;< * bind:m
(ex-cards caz (ex-arvo retry %b %wait next) ~)
;< * bind:m (jab-bowl |=(b=bowl b(now next)))
:: wakeup & resubscribe no delay
;< caz=(list card) bind:m (do-arvo retry %behn %wake ~)
(ex-cards caz (ex-task the-wire the-dock %watch the-path) ~)
--
Loading