-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathindex.js
117 lines (86 loc) · 2.52 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
const WebSocket = require('ws');
const crypto = require('crypto');
const EventEmitter = require('events');
/**
 * Thin event-emitting wrapper around a WebSocket connection to Kraken's
 * public feed.
 *
 * Events emitted:
 *  - `channel:<id>`: for every array payload; `<id>` is the first array
 *    element and the listener receives the whole array.
 *  - `message`: for every non-array payload that is not a successful
 *    `subscriptionStatus` acknowledgement.
 */
class Connection extends EventEmitter {
  /**
   * @param {string} [url='wss://ws.kraken.com'] WebSocket endpoint to connect to.
   */
  constructor(url = 'wss://ws.kraken.com') {
    super();
    this.url = url;
    this.connected = false;
    // pair -> { subscriptions: string[], onReady: Function, id?: channelID }
    this.pairs = {};
    // `${pair}|${subscription}` -> resolver of the promise returned by
    // subscribe(). Kept per subscription so that several subscriptions on the
    // same pair each get their own acknowledgement.
    this.pendingSubscriptions = new Map();
    // Epoch milliseconds of the most recently handled message (0 = none yet).
    this.lastMessageAt = 0;
  }

  /**
   * Close the underlying socket, if there is one. Safe to call before
   * connect() or more than once.
   */
  disconnect() {
    this.connected = false;
    if (this.ws) {
      // The `ws` client (like the browser WebSocket) exposes close(); there is
      // no disconnect() method on it.
      this.ws.close();
    }
  }

  /**
   * Open the socket unless it is already open.
   *
   * @returns {Promise<void>} Resolves once the socket has opened. When the
   *   connection is already open the existing (already resolved) promise is
   *   returned.
   */
  connect() {
    if (this.connected) {
      return this.onOpen;
    }

    let readyHook;
    this.onOpen = new Promise((resolve) => {
      readyHook = resolve;
    });

    const socket = new WebSocket(this.url);
    this.ws = socket;

    socket.onopen = () => {
      this.connected = true;
      readyHook();
    };

    socket.onerror = (e) => {
      console.log(new Date(), '[KRAKEN] error', e);
    };

    socket.onclose = (e) => {
      // Only the current socket may clear the flag; a stale socket closing
      // late must not mark a newer connection as down.
      if (this.ws === socket) {
        this.connected = false;
      }
      console.log(new Date(), '[KRAKEN] close', e);
    };

    // Initial book data can arrive on the same tick as the subscription
    // acknowledgement. Handling is deferred so the subscription promise
    // resolves before the initial order book data is emitted.
    socket.onmessage = (e) => setImmediate(() => this.handleMessage(e));

    return this.onOpen;
  }

  /**
   * Parse one raw socket message and dispatch it.
   *
   * Declared as an arrow-function field so it stays bound to the instance
   * when passed around as a callback.
   *
   * @param {{data: string}} e Message event whose `data` is a JSON string.
   */
  handleMessage = (e) => {
    this.lastMessageAt = Date.now();

    let payload;
    try {
      payload = JSON.parse(e.data);
    } catch (err) {
      // This runs inside a setImmediate callback; an uncaught throw here would
      // take the whole process down, so log and drop the frame instead.
      console.log(new Date(), '[KRAKEN] failed to parse message', err);
      return;
    }

    if (Array.isArray(payload)) {
      this.emit('channel:' + payload[0], payload);
      return;
    }

    const isSubscribed =
      payload.event === 'subscriptionStatus' && payload.status === 'subscribed';
    if (!isSubscribed) {
      this.emit('message', payload);
      return;
    }

    const entry = this.pairs[payload.pair];
    if (!entry) {
      console.log(new Date(), '[KRAKEN] received subscription event for unknown subscription', payload);
      return;
    }

    // NOTE: `id` holds the channel id of the most recently acknowledged
    // subscription for this pair.
    entry.id = payload.channelID;

    // Kraken's acknowledgement is expected to echo the subscription name under
    // `subscription.name`; when it is missing we fall back to the pair-level
    // hook, which is the resolver of the first subscription on the pair.
    const name = payload.subscription && payload.subscription.name;
    const key = `${payload.pair}|${name}`;
    const resolve = this.pendingSubscriptions.get(key);
    if (resolve) {
      this.pendingSubscriptions.delete(key);
      resolve(payload.channelID);
    } else {
      entry.onReady(payload.channelID);
    }
  };

  /**
   * Subscribe to a feed for a pair.
   *
   * @param {string} pair Pair name, sent as the single element of `pair`.
   * @param {string} subscription Subscription name (sent as `subscription.name`).
   * @param {object} [options] Extra fields merged into the `subscription` object.
   * @returns {Promise<*>|undefined} Promise resolving with the channel id once
   *   the subscription is acknowledged, or `undefined` when this
   *   pair/subscription combination was already requested.
   */
  subscribe(pair, subscription, options) {
    const existing = this.pairs[pair];
    if (existing && existing.subscriptions.includes(subscription)) {
      console.log(new Date(), '[KRAKEN] refusing to subscribe to subscription twice', {pair, subscription});
      return;
    }

    let hook;
    const onReady = new Promise((resolve) => {
      hook = resolve;
    });

    // Track the resolver per subscription: previously only the first
    // subscription on a pair was ever resolved and later ones hung forever.
    this.pendingSubscriptions.set(`${pair}|${subscription}`, hook);

    if (!existing) {
      this.pairs[pair] = {
        subscriptions: [],
        onReady: hook,
      };
    }

    this.pairs[pair].subscriptions.push(subscription);
    this._subscribe(pair, subscription, options);

    return onReady;
  }

  /**
   * Send the raw `subscribe` request over the socket. Requires connect() to
   * have created `this.ws`.
   *
   * @param {string} pair Pair name.
   * @param {string} subscription Subscription name.
   * @param {object} [options={}] Extra fields merged into `subscription`.
   */
  _subscribe(pair, subscription, options = {}) {
    const request = {
      event: 'subscribe',
      pair: [pair],
      subscription: {
        name: subscription,
        ...options,
      },
    };
    this.ws.send(JSON.stringify(request));
  }
}
module.exports = Connection;