Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go-ipfs#8586 added EOL for subscriptions #92

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (

var log = logging.Logger("pubsub-valuestore")

// DefaultSubscriptionLifetime is the default lifetime for PubSub subscriptions
const DefaultSubscriptionLifetime = time.Hour * 36

// Pubsub is the minimal subset of the pubsub interface required by the pubsub
// value store. This way, users can wrap the underlying pubsub implementation
// without re-exporting/implementing the entire interface.
Expand Down Expand Up @@ -62,6 +65,7 @@ type topicInfo struct {
topic *pubsub.Topic
evts *pubsub.TopicEventHandler
sub *pubsub.Subscription
eol time.Time

cancel context.CancelFunc
finished chan struct{}
Expand Down Expand Up @@ -116,7 +120,7 @@ func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byt
return err
}

log.Debugf("PubsubPublish: publish value for key", key)
log.Debugf("PubsubPublish: publish value for key %s", formatKey(key))

p.mx.Lock()
ti, ok := p.topics[key]
Expand Down Expand Up @@ -175,8 +179,10 @@ func (p *PubsubValueStore) Subscribe(key string) error {
defer p.mx.Unlock()

// see if we already have a pubsub subscription; if not, subscribe
_, ok := p.topics[key]
ti, ok := p.topics[key]
if ok {
// bump the EOL deadline
ti.eol = time.Now().Add(DefaultSubscriptionLifetime)
return nil
}

Expand Down Expand Up @@ -214,7 +220,7 @@ func (p *PubsubValueStore) Subscribe(key string) error {

go p.handleSubscription(ctx, ti, key)

log.Debugf("PubsubResolve: subscribed to %s", key)
log.Debugf("PubsubResolve: subscribed to %s", formatKey(key))

return nil
}
Expand Down Expand Up @@ -242,6 +248,7 @@ func (p *PubsubValueStore) createTopicHandler(topic string) (*topicInfo, error)
topic: t,
evts: evts,
sub: sub,
eol: time.Now().Add(DefaultSubscriptionLifetime),
finished: make(chan struct{}, 1),
}

Expand Down Expand Up @@ -444,6 +451,8 @@ func (p *PubsubValueStore) closeTopic(key string, ti *topicInfo) {
ti.evts.Cancel()
_ = ti.topic.Close()
delete(p.topics, key)

log.Debugf("PubsubResolve: closeTopic %s", formatKey(key))
}

func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo, key string) {
Expand Down Expand Up @@ -472,6 +481,28 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
}
}()

eol := make(chan bool)
go func() {
defer close(eol)
timer := time.NewTimer(time.Until(ti.eol))
defer timer.Stop()
for {
select {
case <-timer.C:
// before-or-now
if !ti.eol.After(time.Now()) {
eol <- true
return
}
// EOL deadline changed in the meantime
timer.Reset(time.Until(ti.eol))
case <-ctx.Done():
return
}

}
}()

newPeerData := make(chan []byte)
go func() {
defer close(newPeerData)
Expand Down Expand Up @@ -508,6 +539,9 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
if !ok {
return
}
case <-eol:
log.Debugf("PubsubResolve: EOL %s", formatKey(key))
return
case <-ctx.Done():
return
}
Expand All @@ -517,7 +551,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
ti.dbWriteMx.Unlock()
if recCmp > 0 {
if err != nil {
log.Warnf("PubsubResolve: error writing update for %s: %s", key, err)
log.Warnf("PubsubResolve: error writing update for %s: %s", formatKey(key), err)
}
p.notifyWatchers(key, data)
}
Expand All @@ -528,7 +562,7 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr
msg, err := sub.Next(ctx)
if err != nil {
if err != context.Canceled {
log.Warnf("PubsubResolve: subscription error in %s: %s", key, err.Error())
log.Warnf("PubsubResolve: subscription error in %s: %s", formatKey(key), err.Error())
}
return nil, err
}
Expand All @@ -540,7 +574,7 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pu
peerEvt, err := peerEvtHandler.NextPeerEvent(ctx)
if err != nil {
if err != context.Canceled {
log.Warnf("PubsubNewPeer: subscription error in %s: %s", key, err.Error())
log.Warnf("PubsubNewPeer: subscription error in %s: %s", formatKey(key), err.Error())
}
return nil, err
}
Expand All @@ -554,7 +588,7 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pu
if err == nil {
return value, nil
}
log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, pid, err)
log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", formatKey(key), pid, err)
}
return nil, ctx.Err()
}
Expand Down Expand Up @@ -597,3 +631,15 @@ func WithDatastore(datastore ds.Datastore) Option {
return nil
}
}

func formatKey(key string) string {
ns, k, err := record.SplitKey(key)
if err != nil || ns != "ipns" {
log.Error(err)
TobiaszCudnik marked this conversation as resolved.
Show resolved Hide resolved
}
pid, err := peer.IDFromString(k)
if err != nil {
log.Error(err)
}
return "/ipns/" + peer.Encode(pid)
}