Skip to content

Commit

Permalink
subscriber: added to channels
Browse files Browse the repository at this point in the history
  • Loading branch information
arthyn committed Feb 12, 2024
1 parent 867fffd commit 3ff34a1
Showing 1 changed file with 54 additions and 21 deletions.
75 changes: 54 additions & 21 deletions desk/app/channels.hoon
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/- c=channels, g=groups, ha=hark
/- meta
/+ default-agent, verb, dbug, sparse, neg=negotiate
/+ utils=channel-utils, volume
/+ utils=channel-utils, volume, s=subscriber
:: performance, keep warm
/+ channel-json
::
Expand All @@ -24,7 +24,7 @@
|%
+$ card card:agent:gall
+$ current-state
$: %2
$: %3
=v-channels:c
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c) ::TODO vestigial, in groups-ui now, remove me
Expand All @@ -33,6 +33,8 @@
:: .pending-ref-edits: for migration, see also +poke %negotiate-notif
::
pending-ref-edits=(jug ship [=kind:c name=term])
:: delayed resubscribes
=^subs:s
==
--
=| current-state
Expand Down Expand Up @@ -99,30 +101,52 @@
::
++ safe-watch
|= [=wire =dock =path]
|= delay=?
^+ cor
?: (~(has by wex.bowl) wire dock) cor
(emit %pass wire %agent dock %watch path)
=^ card=(unit card) subs
(~(subscribe s [subs bowl]) wire dock path delay)
?~ card cor
(emit u.card)
::
++ load
|= =vase
|^ ^+ cor
=+ !<(old=versioned-state vase)
=? old ?=(%0 -.old) (state-0-to-1 old)
=? old ?=(%1 -.old) (state-1-to-2 old)
?> ?=(%2 -.old)
=? old ?=(%2 -.old) (state-2-to-3 old)
?> ?=(%3 -.old)
=. state old
inflate-io
::
+$ versioned-state $%(state-2 state-1 state-0)
+$ state-2 current-state
+$ versioned-state $%(state-3 state-2 state-1 state-0)
+$ state-3 current-state
::
+$ state-2
$: %2
=v-channels:c
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c) ::TODO vestigial, in groups-ui now, remove me
hidden-posts=(set id-post:c)
::
:: .pending-ref-edits: for migration, see also +poke %negotiate-notif
::
pending-ref-edits=(jug ship [=kind:c name=term])
==
+$ state-1
$: %1
v-channels=(map nest:c v-channel-1)
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c)
hidden-posts=(set id-post:c)
==
++ state-2-to-3
|= s=state-2
^- state-3
%= s - %3
pending-ref-edits [pending-ref-edits.s ~]
==
++ v-channel-1
|^ ,[global local]
+$ global
Expand Down Expand Up @@ -289,13 +313,13 @@
::
:: watch all the subscriptions we expect to have
::
=. cor watch-groups
=. cor (watch-groups |)
::
=. cor
%+ roll
~(tap by v-channels)
|= [[=nest:c *] core=_cor]
ca-abet:ca-safe-sub:(ca-abed:ca-core:core nest)
ca-abet:(ca-safe-sub:(ca-abed:ca-core:core nest) |)
::
cor
::
Expand Down Expand Up @@ -411,7 +435,7 @@
|= [=nest:c =plan:c]
?. (~(has by v-channels) nest)
=/ wire (said-wire nest plan)
(safe-watch wire [ship.nest server] wire)
((safe-watch wire [ship.nest server] wire) |)
::TODO not guaranteed to resolve, we might have partial backlog
ca-abet:(ca-said:(ca-abed:ca-core nest) plan)
::
Expand Down Expand Up @@ -476,7 +500,7 @@
::
[%groups ~]
?+ -.sign !!
%kick watch-groups
%kick (watch-groups &)
%watch-ack
?~ p.sign
cor
Expand Down Expand Up @@ -566,8 +590,11 @@
++ emit |=(=card ca-core(cor (^emit card)))
++ emil |=(caz=(list card) ca-core(cor (^emil caz)))
++ give |=(=gift:agent:gall ca-core(cor (^give gift)))
++ safe-watch |=([=wire =dock =path] ca-core(cor (^safe-watch +<)))
++ ca-perms ~(. perms:utils our.bowl now.bowl nest group.perm.perm.channel)
++ safe-watch
|= [=wire =dock =path]
|= delay=?
ca-core(cor ((^safe-watch wire dock path) delay))
++ ca-abet
%_ cor
v-channels
Expand Down Expand Up @@ -608,7 +635,7 @@
=. last-read.remark.channel now.bowl
=. ca-core ca-give-unread
=. ca-core (ca-response %join group)
ca-safe-sub
(ca-safe-sub |)
::
:: handle an action from the client
::
Expand Down Expand Up @@ -687,19 +714,23 @@
(~(has by wex.bowl) [ca-sub-wire ship.nest dap.bowl])
::
++ ca-safe-sub
|= delay=?
?: ca-has-sub ca-core
?^ posts.channel ca-start-updates
?^ posts.channel (ca-start-updates delay)
=. load.net.channel |
%. delay
%^ safe-watch (weld ca-area /checkpoint) [ship.nest server]
?. =(our.bowl ship.nest)
=/ count ?:(=(%diary kind.nest) '20' '100')
/[kind.nest]/[name.nest]/checkpoint/before/[count]
/[kind.nest]/[name.nest]/checkpoint/time-range/(scot %da *@da)
::
++ ca-start-updates
|= delay=?
:: not most optimal time, should maintain last heard time instead
=/ tim=(unit time)
(bind (ram:on-v-posts:c posts.channel) head)
%. delay
%^ safe-watch ca-sub-wire [ship.nest server]
/[kind.nest]/[name.nest]/updates/(scot %da (fall tim *@da))
::
Expand All @@ -726,7 +757,7 @@
=/ =wire (weld ca-area /create)
(emit %pass wire %agent [our.bowl server] %watch path)
::
%kick ca-safe-sub
%kick (ca-safe-sub &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed creation" u.p.sign)
Expand All @@ -741,14 +772,14 @@
=. ca-core ca-give-unread
=. ca-core
(emit %pass (weld ca-area /create) %agent [ship.nest server] %leave ~)
ca-safe-sub
(ca-safe-sub |)
==
::
++ ca-take-update
|= =sign:agent:gall
^+ ca-core
?+ -.sign ca-core
%kick ca-safe-sub
%kick (ca-safe-sub &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed subscription" u.p.sign)
Expand All @@ -767,7 +798,7 @@
^+ ca-core
?+ -.sign ca-core
:: only if kicked prematurely
%kick ?:(load.net.channel ca-core ca-safe-sub)
%kick ?:(load.net.channel ca-core (ca-safe-sub &))
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed partial checkpoint" u.p.sign)
Expand All @@ -786,7 +817,7 @@
^+ ca-core
?+ -.sign ca-core
:: only hit if kicked prematurely (we %leave after the first %fact)
%kick ca-sync-backlog
%kick (ca-sync-backlog &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed backlog" u.p.sign)
Expand All @@ -805,9 +836,9 @@
^+ ca-core
=. load.net.channel &
=. ca-core (ca-apply-checkpoint chk &)
=. ca-core ca-start-updates
=. ca-core (ca-start-updates |)
=. ca-core (ca-fetch-contacts chk)
=. ca-core ca-sync-backlog
=. ca-core (ca-sync-backlog |)
=/ wire (weld ca-area /checkpoint)
(emit %pass wire %agent [ship.nest server] %leave ~)
::
Expand Down Expand Up @@ -848,8 +879,10 @@
ca-core
::
++ ca-sync-backlog
|= delay=?
=/ checkpoint-start (pry:on-v-posts:c posts.channel)
?~ checkpoint-start ca-core
%. delay
%^ safe-watch (weld ca-area /backlog) [ship.nest server]
%+ welp
/[kind.nest]/[name.nest]/checkpoint/time-range
Expand Down Expand Up @@ -1565,7 +1598,7 @@
++ ca-recheck
|= sects=(set sect:g)
:: if our read permissions restored, re-subscribe
?: (can-read:ca-perms our.bowl) ca-safe-sub
?: (can-read:ca-perms our.bowl) (ca-safe-sub |)
ca-core
::
:: assorted helpers
Expand Down

0 comments on commit 3ff34a1

Please sign in to comment.