Skip to content

Commit

Permalink
fix some bug
Browse files Browse the repository at this point in the history
  • Loading branch information
mm2175 committed Jan 13, 2022
1 parent f336e5f commit 97ee8c0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 39 deletions.
23 changes: 0 additions & 23 deletions v2/websocket/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,6 @@ func (c *Client) Send(ctx context.Context, msg interface{}) error {
return socket.Asynchronous.Send(ctx, msg)
}

// Submit a request to enable the given flag
func (c *Client) EnableFlag(ctx context.Context, flag int) (string, error) {
req := &FlagRequest{
Event: "conf",
Flags: flag,
}
// TODO enable flag on reconnect?
// create sublist to stop concurrent map read
socks := make([]*Socket, len(c.sockets))
c.mtx.RLock()
for i, socket := range c.sockets {
socks[i] = socket
}
c.mtx.RUnlock()
for _, socket := range socks {
err := socket.Asynchronous.Send(ctx, req)
if err != nil {
return "", err
}
}
return "", nil
}

// Gen the count of currently active websocket connections
func (c *Client) ConnectionCount() int {
c.mtx.RLock()
Expand Down
15 changes: 10 additions & 5 deletions v2/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,24 +493,29 @@ func (c *Client) checkResubscription(socketId SocketId) {
if c.parameters.ManageOrderbook {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err_flag := c.EnableFlag(ctx, common.Checksum)
if err_flag != nil {
c.log.Errorf("could not enable checksum flag %s ", err_flag)
req := &FlagRequest{
Event: "conf",
Flags: common.Checksum,
}
if err := socket.Asynchronous.Send(ctx, req); err != nil {
c.log.Errorf("socket(%d) could not enable checksum flag %s ", socket.Id, err)
}
}

if c.parameters.ResubscribeOnReconnect && socket.ResetSubscriptions != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
for _, sub := range socket.ResetSubscriptions {
if sub.Request.Event == "auth" {
continue
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
sub.Request.SubID = c.nonce.GetNonce() // new nonce
c.log.Infof("socket (id=%d) resubscribing to %s with nonce %s", socket.Id, sub.Request.String(), sub.Request.SubID)
_, err := c.subscribeBySocket(ctx, socket, sub.Request)
if err != nil {
c.log.Errorf("could not resubscribe: %s", err.Error())
}
time.Sleep(50 * time.Millisecond)
}
socket.ResetSubscriptions = nil
}
Expand Down
13 changes: 9 additions & 4 deletions v2/websocket/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,15 @@ func (s *subscriptions) heartbeat(chanID int64) {

func (s *subscriptions) sweep(exp time.Time) {
s.lock.RLock()
if !s.hbActive {
s.lock.RUnlock()
return
}

// always heartbeat
/*
if !s.hbActive {
s.lock.RUnlock()
return
}
*/

disconnects := make([]HeartbeatDisconnect, 0)
// use subsBySubID instead of subsByChanID to avoid ineffective heartbeat when re sub err on reconnect
// since subsByChanID is empty when subscription err
Expand Down
8 changes: 1 addition & 7 deletions v2/websocket/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -182,16 +181,11 @@ func (w *ws) listenWs() {
_, msg, err := w.ws.ReadMessage()
if err != nil {
w.log.Errorf("%s ws read err: %s", w.connStr, err.Error())
// a read during normal shutdown results in an OpError: op on closed connection
if _, ok := err.(*net.OpError); ok {
// general read error on a closed network connection, OK
return
}

w.stop(err)
return
}
w.log.Debugf("%s srv->ws: %s", w.connStr, string(msg))

w.lock.RLock()
if w.downstream == nil {
w.lock.RUnlock()
Expand Down

0 comments on commit 97ee8c0

Please sign in to comment.