Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go-ipfs#8586 added EOL for subscriptions #92

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (

var log = logging.Logger("pubsub-valuestore")

// DefaultSubscriptionLifetime is the default lifetime for PubSub subscriptions.
const DefaultSubscriptionLifetime = 365 * 24 * time.Hour

// Pubsub is the minimal subset of the pubsub interface required by the pubsub
// value store. This way, users can wrap the underlying pubsub implementation
// without re-exporting/implementing the entire interface.
Expand All @@ -47,6 +50,7 @@ type PubsubValueStore struct {

rebroadcastInitialDelay time.Duration
rebroadcastInterval time.Duration
unusedSubscriptionTTL map[string]time.Duration

// Map of keys to topics
mx sync.Mutex
Expand All @@ -62,6 +66,7 @@ type topicInfo struct {
topic *pubsub.Topic
evts *pubsub.TopicEventHandler
sub *pubsub.Subscription
eol time.Time

cancel context.CancelFunc
finished chan struct{}
Expand Down Expand Up @@ -89,6 +94,7 @@ func NewPubsubValueStore(ctx context.Context, host host.Host, ps Pubsub, validat
host: host,
rebroadcastInitialDelay: 100 * time.Millisecond,
rebroadcastInterval: time.Minute * 10,
unusedSubscriptionTTL: make(map[string]time.Duration),

topics: make(map[string]*topicInfo),
watching: make(map[string]*watchGroup),
Expand Down Expand Up @@ -116,7 +122,7 @@ func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byt
return err
}

log.Debugf("PubsubPublish: publish value for key", key)
log.Debugf("PubsubPublish: publish value for key %s", formatKey(key))

p.mx.Lock()
ti, ok := p.topics[key]
Expand Down Expand Up @@ -175,8 +181,14 @@ func (p *PubsubValueStore) Subscribe(key string) error {
defer p.mx.Unlock()

// see if we already have a pubsub subscription; if not, subscribe
_, ok := p.topics[key]
ti, ok := p.topics[key]
if ok {
// bump the EOL deadline
ttl, err := p.getTTLForKey(key)
if err != nil {
return err
}
ti.eol = time.Now().Add(ttl)
return nil
}

Expand All @@ -203,7 +215,7 @@ func (p *PubsubValueStore) Subscribe(key string) error {
return pubsub.ValidationIgnore
})

ti, err := p.createTopicHandler(topic)
ti, err := p.createTopicHandler(topic, key)
if err != nil {
return err
}
Expand All @@ -214,13 +226,13 @@ func (p *PubsubValueStore) Subscribe(key string) error {

go p.handleSubscription(ctx, ti, key)

log.Debugf("PubsubResolve: subscribed to %s", key)
log.Debugf("PubsubResolve: subscribed to %s", formatKey(key))

return nil
}

// createTopicHandler creates an internal topic object. Must be called with p.mx held
func (p *PubsubValueStore) createTopicHandler(topic string) (*topicInfo, error) {
func (p *PubsubValueStore) createTopicHandler(topic string, key string) (*topicInfo, error) {
t, err := p.ps.Join(topic)
if err != nil {
return nil, err
Expand All @@ -238,10 +250,16 @@ func (p *PubsubValueStore) createTopicHandler(topic string) (*topicInfo, error)
_ = t.Close()
}

ttl, err := p.getTTLForKey(key)
if err != nil {
return nil, err
}

ti := &topicInfo{
topic: t,
evts: evts,
sub: sub,
eol: time.Now().Add(ttl),
finished: make(chan struct{}, 1),
}

Expand Down Expand Up @@ -444,6 +462,8 @@ func (p *PubsubValueStore) closeTopic(key string, ti *topicInfo) {
ti.evts.Cancel()
_ = ti.topic.Close()
delete(p.topics, key)

log.Debugf("PubsubResolve: closeTopic %s", formatKey(key))
}

func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo, key string) {
Expand Down Expand Up @@ -472,6 +492,28 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
}
}()

eol := make(chan bool)
go func() {
defer close(eol)
timer := time.NewTimer(time.Until(ti.eol))
defer timer.Stop()
for {
select {
case <-timer.C:
// before-or-now
if !ti.eol.After(time.Now()) {
eol <- true
return
}
// EOL deadline changed in the meantime
timer.Reset(time.Until(ti.eol))
case <-ctx.Done():
return
}

}
}()

newPeerData := make(chan []byte)
go func() {
defer close(newPeerData)
Expand Down Expand Up @@ -508,6 +550,9 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
if !ok {
return
}
case <-eol:
log.Debugf("PubsubResolve: EOL %s", formatKey(key))
return
case <-ctx.Done():
return
}
Expand All @@ -517,7 +562,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
ti.dbWriteMx.Unlock()
if recCmp > 0 {
if err != nil {
log.Warnf("PubsubResolve: error writing update for %s: %s", key, err)
log.Warnf("PubsubResolve: error writing update for %s: %s", formatKey(key), err)
}
p.notifyWatchers(key, data)
}
Expand All @@ -528,7 +573,7 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr
msg, err := sub.Next(ctx)
if err != nil {
if err != context.Canceled {
log.Warnf("PubsubResolve: subscription error in %s: %s", key, err.Error())
log.Warnf("PubsubResolve: subscription error in %s: %s", formatKey(key), err.Error())
}
return nil, err
}
Expand All @@ -540,7 +585,7 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pu
peerEvt, err := peerEvtHandler.NextPeerEvent(ctx)
if err != nil {
if err != context.Canceled {
log.Warnf("PubsubNewPeer: subscription error in %s: %s", key, err.Error())
log.Warnf("PubsubNewPeer: subscription error in %s: %s", formatKey(key), err.Error())
}
return nil, err
}
Expand All @@ -554,7 +599,7 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pu
if err == nil {
return value, nil
}
log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, pid, err)
log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", formatKey(key), pid, err)
}
return nil, ctx.Err()
}
Expand All @@ -576,6 +621,18 @@ func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
}
}

func (p *PubsubValueStore) getTTLForKey(key string) (time.Duration, error) {
ns, _, err := record.SplitKey(key)
if err != nil {
return DefaultSubscriptionLifetime, err
}
nsTTL, ok := p.unusedSubscriptionTTL[ns]
if ok {
return nsTTL, nil
}
return DefaultSubscriptionLifetime, nil
}

func WithRebroadcastInterval(duration time.Duration) Option {
return func(store *PubsubValueStore) error {
store.rebroadcastInterval = duration
Expand All @@ -597,3 +654,27 @@ func WithDatastore(datastore ds.Datastore) Option {
return nil
}
}

// WithDatastore returns an option that sets a TTL for a specific namespace.
func WithUnusedSubscriptionTTL(ttl time.Duration, namespace string) Option {
return func(store *PubsubValueStore) error {
store.unusedSubscriptionTTL[namespace] = ttl
return nil
}
}

func formatKey(key string) string {
ns, k, err := record.SplitKey(key)
if err != nil {
log.Error(err)
return key
} else if ns != "ipns" {
return key
}
pid, err := peer.IDFromString(k)
if err != nil {
log.Error(err)
return key
}
return "/ipns/" + peer.Encode(pid)
}
43 changes: 43 additions & 0 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,49 @@ func TestPutMany(t *testing.T) {
}
}

func TestGC(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pub, _ := setupTest(ctx, t)
defer pub.host.Close()

// set separate TTLs per namespace
pub.unusedSubscriptionTTL["namespace1"] = time.Millisecond * 50
pub.unusedSubscriptionTTL["namespace2"] = time.Millisecond * 200

// subscribe to both namespaces
key1 := "/namespace1/key"
key2 := "/namespace2/key"
val := []byte("foo")
err := pub.PutValue(ctx, key1, val)
if err != nil {
t.Fatal(err)
}
err = pub.PutValue(ctx, key2, val)
if err != nil {
t.Fatal(err)
}

if len(pub.GetSubscriptions()) != 2 {
t.Fatal("subscriptions not added")
}

// wait 100ms for the 1st GC
time.Sleep(time.Millisecond * 100)

if len(pub.GetSubscriptions()) != 1 {
t.Fatal("first subscription not garbage collected after TTL")
}

// wait 200ms for the 2nd GC
time.Sleep(time.Millisecond * 200)

if len(pub.GetSubscriptions()) != 0 {
t.Fatal("second subscription not garbage collected after TTL")
}
}

func checkNotFound(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, key string) {
t.Helper()
_, err := vs.GetValue(ctx, key)
Expand Down