Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket Generic Callback Architecture #130

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.idea

3 changes: 3 additions & 0 deletions examples/v2/ws-callbacks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Websocket callbacks

This is a simple example showing off how to connect to the websocket service, subscribe to a few events, and register callbacks for them
56 changes: 56 additions & 0 deletions examples/v2/ws-callbacks/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"context"
"log"
"time"

"github.com/bitfinexcom/bitfinex-api-go/v2"
"github.com/bitfinexcom/bitfinex-api-go/v2/websocket"
)

func bookUpdateCallback(msg interface{}) {
log.Printf("BookUpdate - callback triggered for: %#v", msg)
}

func tradeCallback(msg interface{}) {
log.Printf("Trade ------ callback triggered for: %#v", msg)
}

func main() {
c := websocket.New()

err := c.Connect()
if err != nil {
log.Fatal("Error connecting to web socket : ", err)
}

// register callbacks for book updates and trades
c.RegisterCallback(bitfinex.BookUpdate{}, bookUpdateCallback)
c.RegisterCallback(bitfinex.Trade{}, tradeCallback)

// subscribe to BTCUSD book
ctx, cxl1 := context.WithTimeout(context.Background(), time.Second*5)
defer cxl1()
_, err = c.SubscribeBook(ctx, bitfinex.TradingPrefix+bitfinex.BTCUSD, bitfinex.Precision0, bitfinex.FrequencyRealtime, 25)
if err != nil {
log.Fatal(err)
}

// subscribe to BTCUSD trades
ctx, cxl2 := context.WithTimeout(context.Background(), time.Second*5)
defer cxl2()
_, err = c.SubscribeTrades(ctx, bitfinex.TradingPrefix+bitfinex.BTCUSD)
if err != nil {
log.Fatal(err)
}

for obj := range c.Listen() {
switch obj.(type) {
case error:
log.Printf("channel closed: %s", obj)
break
default:
}
}
}
70 changes: 70 additions & 0 deletions tests/integration/v2/live_websocket_public_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,76 @@ func TestPublicBooks(t *testing.T) {
}
}

func TestPublicCallbacks(t *testing.T) {
c := websocket.New()
wg := sync.WaitGroup{}
wg.Add(3) // 1. Info with version, 2. Subscription event, 3. data message

err := c.Connect()
if err != nil {
t.Fatal("Error connecting to web socket : ", err)
}
defer c.Close()

unSubs := make(chan interface{}, 10)
bookSnaps := make(chan interface{}, 10)
bookUps1 := make(chan interface{}, 10)
bookUps2 := make(chan interface{}, 10)

//Register callbacks for above interfaces
c.RegisterCallback(websocket.UnsubscribeEvent{}, func(m interface{}) { unSubs <- m })
c.RegisterCallback(bitfinex.BookUpdate{}, func(m interface{}) { bookUps1 <- m })
c.RegisterCallback(bitfinex.BookUpdate{}, func(m interface{}) { bookUps2 <- m })
c.RegisterCallback(bitfinex.BookUpdateSnapshot{}, func(m interface{}) { bookSnaps <- m })

errch := make(chan error)
go func() {
for {
select {
case msg := <-c.Listen():
if msg == nil {
return
}
log.Printf("recv msg: %#v", msg)
switch msg.(type) {
case error:
errch <- msg.(error)
default:
t.Logf("test recv: %#v", msg)
}
}
}
}()

ctx, cxl := context.WithTimeout(context.Background(), time.Second*5)
defer cxl()
id, err := c.SubscribeBook(ctx, bitfinex.TradingPrefix+bitfinex.BTCUSD, bitfinex.Precision0, bitfinex.FrequencyRealtime, 1)
if err != nil {
t.Fatal(err)
}

if err := wait2(bookSnaps, 1, errch, 5*time.Second); err != nil {
t.Fatalf("failed to receive book snapshot message from websocket: %s", err)
}

if err := wait2(bookUps1, 1, errch, 5*time.Second); err != nil {
t.Fatalf("failed to receive book update message to 1/2 channels from websocket: %s", err)
}

if err := wait2(bookUps2, 1, errch, 5*time.Second); err != nil {
t.Fatalf("failed to receive book update message to 2/2 channels from websocket: %s", err)
}

err = c.Unsubscribe(ctx, id)
if err != nil {
t.Fatal(err)
}

if err := wait2(unSubs, 1, errch, 5*time.Second); err != nil {
t.Errorf("failed to receive unsubscribe message from websocket: %s", err)
}
}

func TestPublicCandles(t *testing.T) {
c := websocket.New()
wg := sync.WaitGroup{}
Expand Down
111 changes: 108 additions & 3 deletions tests/integration/v2/mock_ws_public_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package tests

import (
"context"
"testing"

bitfinex "github.com/bitfinexcom/bitfinex-api-go/v2"
"github.com/bitfinexcom/bitfinex-api-go/v2"
"github.com/bitfinexcom/bitfinex-api-go/v2/websocket"
"testing"
"time"
)

// method of testing with mocked endpoints
Expand Down Expand Up @@ -82,3 +82,108 @@ func TestTicker(t *testing.T) {
}
assert(t, &websocket.UnsubscribeEvent{ChanID: 5, Status: "OK"}, unsub)
}

func TestMockTickerCallback(t *testing.T) {
// create transport & nonce mocks
async := newTestAsync()
nonce := &IncrementingNonceGenerator{}

// create client
ws := websocket.NewWithAsyncFactoryNonce(newTestAsyncFactory(async), nonce)

// setup listener
listener := newListener()
listener.run(ws.Listen())

// set ws options
ws.Connect()
defer ws.Close()

// register callbacks for messages
callbackInfo := make(chan interface{}, 10)
callbackTick1 := make(chan interface{}, 10)
callbackTick2 := make(chan interface{}, 10)
ws.RegisterCallback(websocket.InfoEvent{}, func (info interface{}) { callbackInfo <- info })
ws.RegisterCallback(bitfinex.Ticker{}, func (tick interface{}) { callbackTick1 <- tick })
ws.RegisterCallback(bitfinex.Ticker{}, func (tick interface{}) { callbackTick2 <- tick })

// info welcome msg
expectedInfoEvent := websocket.InfoEvent{Version: 2}
async.Publish(`{"event":"info","version":2}`)
ev, err := listener.nextInfoEvent()
if err != nil {
t.Fatal(err)
}
assert(t, &expectedInfoEvent, ev)
select {
case val := <-callbackInfo:
assert(t, &expectedInfoEvent, val)
case <-time.After(time.Second):
t.Fatal("Did not receive an info callback")
}


// subscribe
id, err := ws.SubscribeTicker(context.Background(), "tBTCUSD")
if err != nil {
t.Fatal(err)
}

// subscribe ack
async.Publish(`{"event":"subscribed","channel":"ticker","chanId":5,"symbol":"tBTCUSD","subId":"nonce1","pair":"BTCUSD"}`)
sub, err := listener.nextSubscriptionEvent()
if err != nil {
t.Fatal(err)
}
assert(t, &websocket.SubscribeEvent{
SubID: "nonce1",
Channel: "ticker",
ChanID: 5,
Symbol: "tBTCUSD",
Pair: "BTCUSD",
}, sub)

// publish the tick data
expectedTick := bitfinex.Ticker{
Symbol: "tBTCUSD",
Bid: 14957,
Ask: 14958,
BidSize: 68.17328796,
AskSize: 55.29588132,
DailyChange: -659,
DailyChangePerc: -0.0422,
LastPrice: 14971,
Volume: 53723.08813995,
High: 16494,
Low: 14454,
}
async.Publish(`[5,[14957,68.17328796,14958,55.29588132,-659,-0.0422,14971,53723.08813995,16494,14454]]`)
tick, err := listener.nextTick()
if err != nil {
t.Fatal(err)
}
// ensure delivery of the tick matches between serial methods and the multiple callbacks
assert(t, &expectedTick, tick)
select {
case val := <-callbackTick1:
assert(t, &expectedTick, val)
case <-time.After(time.Second):
t.Fatal("Did not receive an tick callback 1/2")
}
select {
case val := <-callbackTick2:
assert(t, &expectedTick, val)
case <-time.After(time.Second):
t.Fatal("Did not receive an tick callback 2/2")
}

// unsubscribe
ws.Unsubscribe(context.Background(), id)
async.Publish(`{"event":"unsubscribed","chanId":5,"status":"OK"}`)
unsub, err := listener.nextUnsubscriptionEvent()
if err != nil {
t.Fatal(err)
}
assert(t, &websocket.UnsubscribeEvent{ChanID: 5, Status: "OK"}, unsub)
}

6 changes: 3 additions & 3 deletions v2/websocket/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (c *Client) Send(ctx context.Context, msg interface{}) error {
// Subscribe sends a subscription request to the Bitfinex API and tracks the subscription status by ID.
func (c *Client) Subscribe(ctx context.Context, req *SubscriptionRequest) (string, error) {
c.subscriptions.add(req)
err := c.asynchronous.Send(ctx, req)
err := c.Send(ctx, req)
if err != nil {
// propagate send error
return "", err
Expand Down Expand Up @@ -80,12 +80,12 @@ func (c *Client) SubscribeCandles(ctx context.Context, symbol string, resolution

// SubmitOrder sends an order request.
func (c *Client) SubmitOrder(ctx context.Context, order *bitfinex.OrderNewRequest) error {
return c.asynchronous.Send(ctx, order)
return c.Send(ctx, order)
}

// SubmitCancel sends a cancel request.
func (c *Client) SubmitCancel(ctx context.Context, cancel *bitfinex.OrderCancelRequest) error {
return c.asynchronous.Send(ctx, cancel)
return c.Send(ctx, cancel)
}

// LookupSubscription looks up a subscription request by ID
Expand Down
6 changes: 3 additions & 3 deletions v2/websocket/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ func (c *Client) handlePublicChannel(chanID int64, channel, objType string, data
return err
}
if msg != nil {
c.listener <- msg
c.deliverMsg(msg)
}
} else if len(flt) > 1 {
msg, err := factory.BuildSnapshot(chanID, flt)
if err != nil {
return err
}
if msg != nil {
c.listener <- msg
c.deliverMsg(msg)
}
}
} else {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (c *Client) handlePrivateChannel(raw []interface{}) error {
}
// private data is returned as strongly typed data, publish directly
if obj != nil {
c.listener <- obj
c.deliverMsg(obj)
}
}
}
Expand Down
Loading