[Low prio] Add Watch() method to watch pubsub updates #3

Open

wants to merge 1 commit into base: master
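
A caller-side sketch of how the proposed Watch method could be consumed (not part of this PR; it mirrors the TestWatch flow in pubsub_test.go, and watchAndPrint is an illustrative name):

package namesys

import "fmt"

// watchAndPrint is an illustrative helper, not part of this change: it drains
// the channel returned by Watch until Cancel(key) closes it.
func watchAndPrint(p *PubsubValueStore, key string) {
    go func() {
        for data := range p.Watch(key) {
            // Each value is the raw payload of a pubsub update for key.
            fmt.Printf("update for %s: %d bytes\n", key, len(data))
        }
    }()
}
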
64 changes: 64 additions & 0 deletions pubsub.go
@@ -36,9 +36,17 @@ type PubsubValueStore struct {
mx sync.Mutex
subs map[string]*floodsub.Subscription

watchMux sync.RWMutex
watch map[string]*watchChannels

Validator record.Validator
}

type watchChannels struct {
mux sync.RWMutex
channels []chan []byte
}

// NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub.
// The constructor interface is complicated by the need to bootstrap the pubsub topic.
// This could be greatly simplified if the pubsub implementation handled bootstrap itself
@@ -51,6 +59,7 @@ func NewPubsubValueStore(ctx context.Context, host p2phost.Host, cr routing.Cont
ps: ps,
Validator: validator,
subs: make(map[string]*floodsub.Subscription),
watch: make(map[string]*watchChannels),
}
}

@@ -138,6 +147,23 @@ func (p *PubsubValueStore) Subscribe(key string) error {
return nil
}

func (p *PubsubValueStore) Watch(key string) <-chan []byte {

godoc?

p.watchMux.Lock()
defer p.watchMux.Unlock()
wChs, ok := p.watch[key]
if !ok {
wChs = &watchChannels{
channels: make([]chan []byte, 0),
}
p.watch[key] = wChs
}
newCh := make(chan []byte)
wChs.mux.Lock()
wChs.channels = append(wChs.channels, newCh)
wChs.mux.Unlock()
return newCh
}
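
A possible godoc comment for Watch, responding to the review note above; the wording is only a suggestion and would sit directly above the Watch signature:

// Watch returns a channel that receives the raw data of every pubsub update
// observed for key. Sends to the channel are non-blocking, so an update may
// be dropped if the receiver is not ready. The channel is closed when the
// subscription for key is cancelled with Cancel.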

func (p *PubsubValueStore) getLocal(key string) ([]byte, error) {
dsval, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(key)))
if err != nil {
@@ -189,6 +215,8 @@ func (p *PubsubValueStore) Cancel(name string) bool {
delete(p.subs, name)
}

p.cancelWatchers(name)

return ok
}

@@ -209,10 +237,46 @@ func (p *PubsubValueStore) handleSubscription(sub *floodsub.Subscription, key st
if err != nil {
log.Warningf("PubsubResolve: error writing update for %s: %s", key, err)
}
p.notifyWatchers(key, msg.GetData())
}
}
}

func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
p.watchMux.RLock()
watchChannels, ok := p.watch[key]
if !ok {
p.watchMux.RUnlock()
return
}
watchChannels.mux.RLock()
p.watchMux.RUnlock()

defer watchChannels.mux.RUnlock()

I could be wrong, but is there any difference between deferring the RUnlock here as opposed to right under the call to RLock? It would also be clearer if the defer came right after the lock. For example:

	watchChannels.mux.RLock()
	defer watchChannels.mux.RUnlock()

	p.watchMux.RUnlock()

for _, ch := range watchChannels.channels {
select {
case ch <- data:

I'd consider:

  1. Adding a buffer size of 1.
  2. Replacing the buffered value in this case.

That is,

select {
case ch <- data:
case <-ch:
    ch <- data
}

That way, the user always gets the latest value.

default:
}
}
}
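
A self-contained sketch of the pattern the reviewer suggests above (a buffer of one per watcher, with the stale value replaced on send); it is shown outside PubsubValueStore purely for illustration and is not part of this PR:

package main

import "fmt"

func main() {
    ch := make(chan []byte, 1) // point 1: give each watcher a buffer of one

    // send mirrors the reviewer's select: if the buffer is full, discard the
    // stale value and store the latest one. With a single sender per channel
    // (as in notifyWatchers) the inner send cannot block.
    send := func(data []byte) {
        select {
        case ch <- data:
        case <-ch:
            ch <- data
        }
    }

    send([]byte("first"))
    send([]byte("second")) // replaces "first" without blocking

    fmt.Println(string(<-ch)) // prints: second
}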

func (p *PubsubValueStore) cancelWatchers(key string) {
p.watchMux.Lock()
defer p.watchMux.Unlock()
watchChannels, ok := p.watch[key]
if !ok {
return
}

watchChannels.mux.Lock()
for _, ch := range watchChannels.channels {
close(ch)
}
watchChannels.mux.Unlock()
delete(p.watch, key)
}

// rendezvous with peers in the name topic through provider records
// Note: rendezvous/bootstrap should really be handled by the pubsub implementation itself!
func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) {
69 changes: 69 additions & 0 deletions pubsub_test.go
@@ -3,6 +3,7 @@ package namesys
import (
"bytes"
"context"
"sync"
"testing"
"time"

@@ -196,6 +197,74 @@ func TestPubsubPublishSubscribe(t *testing.T) {
}
}

func TestWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

key := "/namespace/key"

hosts := newNetHosts(ctx, t, 5)
vss := make([]*PubsubValueStore, len(hosts))
for i := 0; i < len(vss); i++ {

fs, err := floodsub.NewFloodSub(ctx, hosts[i])
if err != nil {
t.Fatal(err)
}

vss[i] = NewPubsubValueStore(ctx, hosts[i], rhelper.Null{}, fs, testValidator{})
}
pub := vss[0]
vss = vss[1:]

pubinfo := hosts[0].Peerstore().PeerInfo(hosts[0].ID())
for _, h := range hosts[1:] {
if err := h.Connect(ctx, pubinfo); err != nil {
t.Fatal(err)
}
}

time.Sleep(time.Millisecond * 100)
for i, vs := range vss {
checkNotFound(ctx, t, i, vs, key)
// delay to avoid connection storms
time.Sleep(time.Millisecond * 100)
}

// let the bootstrap finish
time.Sleep(time.Second * 1)

ch := pub.Watch(key)
var watched []byte
var wg sync.WaitGroup
// Add to the WaitGroup before starting the goroutine so Wait cannot return early.
wg.Add(1)
go func() {
defer wg.Done()
for v := range ch {
watched = v
}
}()

err := pub.Subscribe(key)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

val := []byte("valid for key 1")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Second * 1)
pub.Cancel(key)
wg.Wait()
if !bytes.Equal(val, watched) {
t.Fatal("should have watched the update")
}
}

func checkNotFound(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, key string) {
t.Helper()
_, err := vs.GetValue(ctx, key)