From 06ba7a6068d8f56290bb3914f6262a5422d539b3 Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Tue, 26 Jun 2018 20:51:15 +0200
Subject: [PATCH] Add Watch() method to watch pubsub updates

This method returns a channel on which all pubsub updates for a key
are delivered. Each call to Watch() registers a new channel for the
key, and every update received for that key is sent to all registered
channels. All channels registered for a key are closed when Cancel()
is called on that key.
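
Illustrative usage (a minimal sketch, imports elided; it assumes a
*PubsubValueStore named vs on which Subscribe() has already been
called for the key):

    ch := vs.Watch("/namespace/key")
    go func() {
        // ch yields the raw []byte of every update received over
        // pubsub for the key; it is closed by Cancel().
        for update := range ch {
            fmt.Printf("update: %d bytes\n", len(update))
        }
    }()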
---
 pubsub.go      | 64 ++++++++++++++++++++++++++++++++++++++++++++++
 pubsub_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 133 insertions(+)

diff --git a/pubsub.go b/pubsub.go
index afd5ba1..391b865 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -36,9 +36,17 @@ type PubsubValueStore struct {
 	mx   sync.Mutex
 	subs map[string]*floodsub.Subscription
 
+	watchMux sync.RWMutex
+	watch    map[string]*watchChannels
+
 	Validator record.Validator
 }
 
+type watchChannels struct {
+	mux      sync.RWMutex
+	channels []chan []byte
+}
+
 // NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub.
 // The constructor interface is complicated by the need to bootstrap the pubsub topic.
 // This could be greatly simplified if the pubsub implementation handled bootstrap itself
@@ -51,6 +59,7 @@ func NewPubsubValueStore(ctx context.Context, host p2phost.Host, cr routing.Cont
 		ps:        ps,
 		Validator: validator,
 		subs:      make(map[string]*floodsub.Subscription),
+		watch:     make(map[string]*watchChannels),
 	}
 }
 
@@ -138,6 +147,23 @@ func (p *PubsubValueStore) Subscribe(key string) error {
 	return nil
 }
 
+func (p *PubsubValueStore) Watch(key string) <-chan []byte {
+	p.watchMux.Lock()
+	defer p.watchMux.Unlock()
+	wChs, ok := p.watch[key]
+	if !ok {
+		wChs = &watchChannels{
+			channels: make([]chan []byte, 0),
+		}
+		p.watch[key] = wChs
+	}
+	newCh := make(chan []byte)
+	wChs.mux.Lock()
+	wChs.channels = append(wChs.channels, newCh)
+	wChs.mux.Unlock()
+	return newCh
+}
+
 func (p *PubsubValueStore) getLocal(key string) ([]byte, error) {
 	dsval, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(key)))
 	if err != nil {
@@ -189,6 +215,8 @@ func (p *PubsubValueStore) Cancel(name string) bool {
 		delete(p.subs, name)
 	}
 
+	p.cancelWatchers(name)
+
 	return ok
 }
 
@@ -209,10 +237,46 @@ func (p *PubsubValueStore) handleSubscription(sub *floodsub.Subscription, key st
 		if err != nil {
 			log.Warningf("PubsubResolve: error writing update for %s: %s", key, err)
 		}
+		p.notifyWatchers(key, msg.GetData())
+	}
+}
+
+func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
+	p.watchMux.RLock()
+	watchChannels, ok := p.watch[key]
+	if !ok {
+		p.watchMux.RUnlock()
+		return
+	}
+	watchChannels.mux.RLock()
+	p.watchMux.RUnlock()
+
+	defer watchChannels.mux.RUnlock()
+	for _, ch := range watchChannels.channels {
+		select {
+		case ch <- data:
+		default:
+		}
 	}
 }
 
+func (p *PubsubValueStore) cancelWatchers(key string) {
+	p.watchMux.Lock()
+	defer p.watchMux.Unlock()
+	watchChannels, ok := p.watch[key]
+	if !ok {
+		return
+	}
+
+	watchChannels.mux.Lock()
+	for _, ch := range watchChannels.channels {
+		close(ch)
+	}
+	watchChannels.mux.Unlock()
+	delete(p.watch, key)
+}
+
 // rendezvous with peers in the name topic through provider records
 // Note: rendezvous/boostrap should really be handled by the pubsub implementation itself!
 func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) {
diff --git a/pubsub_test.go b/pubsub_test.go
index 2bd2247..4ecb56b 100644
--- a/pubsub_test.go
+++ b/pubsub_test.go
@@ -3,6 +3,7 @@ package namesys
 import (
 	"bytes"
 	"context"
+	"sync"
 	"testing"
 	"time"
 
@@ -196,6 +197,74 @@ func TestPubsubPublishSubscribe(t *testing.T) {
 	}
 }
 
+func TestWatch(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	key := "/namespace/key"
+
+	hosts := newNetHosts(ctx, t, 5)
+	vss := make([]*PubsubValueStore, len(hosts))
+	for i := 0; i < len(vss); i++ {
+
+		fs, err := floodsub.NewFloodSub(ctx, hosts[i])
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		vss[i] = NewPubsubValueStore(ctx, hosts[i], rhelper.Null{}, fs, testValidator{})
+	}
+	pub := vss[0]
+	vss = vss[1:]
+
+	pubinfo := hosts[0].Peerstore().PeerInfo(hosts[0].ID())
+	for _, h := range hosts[1:] {
+		if err := h.Connect(ctx, pubinfo); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	time.Sleep(time.Millisecond * 100)
+	for i, vs := range vss {
+		checkNotFound(ctx, t, i, vs, key)
+		// delay to avoid connection storms
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	// let the bootstrap finish
+	time.Sleep(time.Second * 1)
+
+	ch := pub.Watch(key)
+	var watched []byte
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for v := range ch {
+			watched = v
+		}
+	}()
+
+	err := pub.Subscribe(key)
+	if err != nil {
+		t.Fatal(err)
+	}
+	time.Sleep(time.Second)
+
+	val := []byte("valid for key 1")
+	err = pub.PutValue(ctx, key, val)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(time.Second * 1)
+	pub.Cancel(key)
+	wg.Wait()
+	if !bytes.Equal(val, watched) {
+		t.Fatal("should have watched the update")
+	}
+}
+
 func checkNotFound(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, key string) {
 	t.Helper()
 	_, err := vs.GetValue(ctx, key)
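
-- 

Note (outside the patch, dropped on apply): notifyWatchers() delivers
each update with a non-blocking send (a select with an empty default),
so a watcher that is not ready to receive at that instant silently
misses that update. A consumer that must never block delivery can
drain into a one-slot buffer and keep only the latest value; a sketch
under the same assumptions as above (vs and latest are illustrative
names):

    ch := vs.Watch("/namespace/key")
    latest := make(chan []byte, 1)
    go func() {
        for v := range ch {
            // discard any stale value so the buffered channel
            // always holds the most recent update
            select {
            case <-latest:
            default:
            }
            latest <- v
        }
        close(latest)
    }()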