Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscriptions: kick resub ddos protection #3212

Merged
merged 11 commits into from
Feb 22, 2024
118 changes: 87 additions & 31 deletions desk/app/channels.hoon
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
:: XX chat thread entries can no longer be edited. maybe fix before
:: release?
::
:: note: all subscriptions are handled by the subscriber library so
:: we can have resubscribe loop protection.
::
/- c=channels, g=groups, ha=hark
/- meta
/+ default-agent, verb, dbug, sparse, neg=negotiate
/+ utils=channel-utils, volume
/+ utils=channel-utils, volume, s=subscriber
:: performance, keep warm
/+ channel-json
::
Expand All @@ -24,7 +27,7 @@
|%
+$ card card:agent:gall
+$ current-state
$: %2
$: %3
=v-channels:c
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c) ::TODO vestigial, in groups-ui now, remove me
Expand All @@ -33,6 +36,8 @@
:: .pending-ref-edits: for migration, see also +poke %negotiate-notif
::
pending-ref-edits=(jug ship [=kind:c name=term])
:: delayed resubscribes
=^subs:s
==
--
=| current-state
Expand Down Expand Up @@ -83,8 +88,9 @@
++ on-arvo
|= [=wire sign=sign-arvo]
^- (quip card _this)
~& strange-diary-arvo+wire
`this
=^ cards state
abet:(arvo:cor wire sign)
[cards this]
--
|_ [=bowl:gall cards=(list card)]
++ abet [(flop cards) state]
Expand All @@ -99,30 +105,51 @@
::
++ safe-watch
|= [=wire =dock =path]
|= delay=?
^+ cor
?: (~(has by wex.bowl) wire dock) cor
(emit %pass wire %agent dock %watch path)
=^ caz=(list card) subs
(~(subscribe s [subs bowl]) wire dock path delay)
(emil caz)
::
++ load
|= =vase
|^ ^+ cor
=+ !<(old=versioned-state vase)
=? old ?=(%0 -.old) (state-0-to-1 old)
=? old ?=(%1 -.old) (state-1-to-2 old)
?> ?=(%2 -.old)
=? old ?=(%2 -.old) (state-2-to-3 old)
?> ?=(%3 -.old)
=. state old
inflate-io
::
+$ versioned-state $%(state-2 state-1 state-0)
+$ state-2 current-state
+$ versioned-state $%(state-3 state-2 state-1 state-0)
+$ state-3 current-state
::
+$ state-2
$: %2
=v-channels:c
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c) ::TODO vestigial, in groups-ui now, remove me
hidden-posts=(set id-post:c)
::
:: .pending-ref-edits: for migration, see also +poke %negotiate-notif
::
pending-ref-edits=(jug ship [=kind:c name=term])
==
+$ state-1
$: %1
v-channels=(map nest:c v-channel-1)
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c)
hidden-posts=(set id-post:c)
==
++ state-2-to-3
|= s=state-2
^- state-3
%= s - %3
pending-ref-edits [pending-ref-edits.s *^subs:^s]
==
++ v-channel-1
|^ ,[global local]
+$ global
Expand Down Expand Up @@ -242,6 +269,12 @@
:: happen. that way, local subs get established without issue.
inflate-io
::
++ unsubscribe
|= [=wire =dock]
^+ cor
=^ caz=(list card) subs
(~(unsubscribe s [subs bowl]) wire dock)
(emil caz)
++ inflate-io
:: initiate version negotiation with our own channels-server
::
Expand Down Expand Up @@ -285,17 +318,17 @@
=(wire path)
==
?: keep cor
(emit %pass pole %agent [sub-ship dude] %leave ~)
(unsubscribe pole [sub-ship dude])
::
:: watch all the subscriptions we expect to have
::
=. cor watch-groups
=. cor (watch-groups |)
::
=. cor
%+ roll
~(tap by v-channels)
|= [[=nest:c *] core=_cor]
ca-abet:ca-safe-sub:(ca-abed:ca-core:core nest)
ca-abet:(ca-safe-sub:(ca-abed:ca-core:core nest) |)
::
cor
::
Expand Down Expand Up @@ -411,7 +444,7 @@
|= [=nest:c =plan:c]
?. (~(has by v-channels) nest)
=/ wire (said-wire nest plan)
(safe-watch wire [ship.nest server] wire)
((safe-watch wire [ship.nest server] wire) |)
::TODO not guaranteed to resolve, we might have partial backlog
ca-abet:(ca-said:(ca-abed:ca-core nest) plan)
::
Expand Down Expand Up @@ -476,7 +509,7 @@
::
[%groups ~]
?+ -.sign !!
%kick watch-groups
%kick (watch-groups &)
%watch-ack
?~ p.sign
cor
Expand Down Expand Up @@ -545,6 +578,18 @@
``loob+!>((~(has by v-channels) kind.pole ship name.pole))
==
::
++ arvo
|= [=(pole knot) sign=sign-arvo]
^+ cor
?+ pole ~|(bad-arvo-take/pole !!)
[%~.~ %cancel-retry rest=*] cor
::
[%~.~ %retry rest=*]
=^ caz=(list card) subs
(~(handle-wakeup s [subs bowl]) pole)
(emil caz)
==
::
++ unreads
^- unreads:c
%- ~(gas by *unreads:c)
Expand All @@ -566,8 +611,14 @@
++ emit |=(=card ca-core(cor (^emit card)))
++ emil |=(caz=(list card) ca-core(cor (^emil caz)))
++ give |=(=gift:agent:gall ca-core(cor (^give gift)))
++ safe-watch |=([=wire =dock =path] ca-core(cor (^safe-watch +<)))
++ ca-perms ~(. perms:utils our.bowl now.bowl nest group.perm.perm.channel)
++ safe-watch
|= [=wire =dock =path]
|= delay=?
ca-core(cor ((^safe-watch wire dock path) delay))
++ unsubscribe
|= [=wire =dock]
ca-core(cor (^unsubscribe wire dock))
++ ca-abet
%_ cor
v-channels
Expand Down Expand Up @@ -608,7 +659,7 @@
=. last-read.remark.channel now.bowl
=. ca-core ca-give-unread
=. ca-core (ca-response %join group)
ca-safe-sub
(ca-safe-sub |)
::
:: handle an action from the client
::
Expand Down Expand Up @@ -687,19 +738,23 @@
(~(has by wex.bowl) [ca-sub-wire ship.nest dap.bowl])
::
++ ca-safe-sub
|= delay=?
?: ca-has-sub ca-core
?^ posts.channel ca-start-updates
?^ posts.channel (ca-start-updates delay)
=. load.net.channel |
%. delay
%^ safe-watch (weld ca-area /checkpoint) [ship.nest server]
?. =(our.bowl ship.nest)
=/ count ?:(=(%diary kind.nest) '20' '100')
/[kind.nest]/[name.nest]/checkpoint/before/[count]
/[kind.nest]/[name.nest]/checkpoint/time-range/(scot %da *@da)
::
++ ca-start-updates
|= delay=?
:: not most optimal time, should maintain last heard time instead
=/ tim=(unit time)
(bind (ram:on-v-posts:c posts.channel) head)
%. delay
%^ safe-watch ca-sub-wire [ship.nest server]
/[kind.nest]/[name.nest]/updates/(scot %da (fall tim *@da))
::
Expand All @@ -724,9 +779,9 @@
~
=/ =path /[kind.nest]/[name.nest]/create
=/ =wire (weld ca-area /create)
(emit %pass wire %agent [our.bowl server] %watch path)
((safe-watch wire [our.bowl server] path) |)
::
%kick ca-safe-sub
%kick (ca-safe-sub &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed creation" u.p.sign)
Expand All @@ -739,16 +794,15 @@
=+ !<(=update:c q.cage)
=. ca-core (ca-u-channels update)
=. ca-core ca-give-unread
=. ca-core
(emit %pass (weld ca-area /create) %agent [ship.nest server] %leave ~)
ca-safe-sub
=. ca-core (unsubscribe (weld ca-area /create) [ship.nest server])
(ca-safe-sub |)
==
::
++ ca-take-update
|= =sign:agent:gall
^+ ca-core
?+ -.sign ca-core
%kick ca-safe-sub
%kick (ca-safe-sub &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed subscription" u.p.sign)
Expand All @@ -767,7 +821,7 @@
^+ ca-core
?+ -.sign ca-core
:: only if kicked prematurely
%kick ?:(load.net.channel ca-core ca-safe-sub)
%kick ?:(load.net.channel ca-core (ca-safe-sub &))
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed partial checkpoint" u.p.sign)
Expand All @@ -786,7 +840,7 @@
^+ ca-core
?+ -.sign ca-core
:: only hit if kicked prematurely (we %leave after the first %fact)
%kick ca-sync-backlog
%kick (ca-sync-backlog &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed backlog" u.p.sign)
Expand All @@ -805,11 +859,11 @@
^+ ca-core
=. load.net.channel &
=. ca-core (ca-apply-checkpoint chk &)
=. ca-core ca-start-updates
=. ca-core (ca-start-updates |)
=. ca-core (ca-fetch-contacts chk)
=. ca-core ca-sync-backlog
=. ca-core (ca-sync-backlog |)
=/ wire (weld ca-area /checkpoint)
(emit %pass wire %agent [ship.nest server] %leave ~)
(unsubscribe wire [ship.nest server])
::
++ ca-fetch-contacts
|= chk=u-checkpoint:c
Expand Down Expand Up @@ -848,8 +902,10 @@
ca-core
::
++ ca-sync-backlog
|= delay=?
=/ checkpoint-start (pry:on-v-posts:c posts.channel)
?~ checkpoint-start ca-core
%. delay
%^ safe-watch (weld ca-area /backlog) [ship.nest server]
%+ welp
/[kind.nest]/[name.nest]/checkpoint/time-range
Expand All @@ -860,7 +916,7 @@
|= chk=u-checkpoint:c
=. ca-core (ca-apply-checkpoint chk |)
=/ wire (weld ca-area /backlog)
(emit %pass wire %agent [ship.nest server] %leave ~)
(unsubscribe wire [ship.nest server])
::
++ ca-apply-logs
|= =log:c
Expand Down Expand Up @@ -1083,7 +1139,7 @@
=/ =rope:ha (ca-rope -.kind-data.post id-post ~)
?: (was-mentioned:utils content.post our.bowl)
?. (want-hark %mention)
ca-core
ca-core
=/ cs=(list content:ha)
~[[%ship author.post] ' mentioned you: ' (flatten:utils content.post)]
(emit (pass-hark (ca-spin rope cs ~)))
Expand Down Expand Up @@ -1565,7 +1621,7 @@
++ ca-recheck
|= sects=(set sect:g)
:: if our read permissions restored, re-subscribe
?: (can-read:ca-perms our.bowl) ca-safe-sub
?: (can-read:ca-perms our.bowl) (ca-safe-sub |)
ca-core
::
:: assorted helpers
Expand All @@ -1575,7 +1631,7 @@
:: leave the subscription only
::
++ ca-simple-leave
(emit %pass ca-sub-wire %agent [ship.nest server] %leave ~)
(unsubscribe ca-sub-wire [ship.nest server])
::
:: Leave the subscription, tell people about it, and delete our local
:: state for the channel
Expand Down
Loading
Loading