Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase historical ohlcv backfilling speed AND Fix incorrect ohlcv during live fetching #46

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 26 additions & 82 deletions ccxtbt/ccxtfeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

import time
from collections import deque
from datetime import datetime

import backtrader as bt
from backtrader.feed import DataBase
from backtrader.utils.py3 import with_metaclass

from .ccxtstore import CCXTStore


class MetaCCXTFeed(DataBase.__class__):
def __init__(cls, name, bases, dct):
'''Class has already been created ... register'''
Expand All @@ -42,7 +38,6 @@ def __init__(cls, name, bases, dct):
# Register with the store
CCXTStore.DataCls = cls


class CCXTFeed(with_metaclass(MetaCCXTFeed, DataBase)):
"""
CryptoCurrency eXchange Trading Library Data Feed.
Expand All @@ -55,46 +50,35 @@ class CCXTFeed(with_metaclass(MetaCCXTFeed, DataBase)):
- ``backfill_start`` (default: ``True``)
Perform backfilling at the start. The maximum possible historical data
will be fetched in a single request.

Changes From Ed's pacakge

- Added option to send some additional fetch_ohlcv_params. Some exchanges (e.g Bitmex)
support sending some additional fetch parameters.
- Added drop_newest option to avoid loading incomplete candles where exchanges
do not support sending ohlcv params to prevent returning partial data

"""

params = (
('historical', False), # only historical download
('backfill_start', False), # do backfilling at the start
('fetch_ohlcv_params', {}),
('ohlcv_limit', 20),
('drop_newest', False),
('debug', False)
('ohlcv_limit', 1000),
)

_store = CCXTStore

# States for the Finite State Machine in _load
_ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(3)

# def __init__(self, exchange, symbol, ohlcv_limit=None, config={}, retries=5):
def __init__(self, **kwargs):
# self.store = CCXTStore(exchange, config, retries)
self.symbol = self.p.dataname

self.store = self._store(**kwargs)
self._data = deque() # data queue for price data
self._last_id = '' # last processed trade id for ohlcv
self._last_ts = 0 # last processed timestamp for ohlcv

self._data = deque() # data queue for price data
self._last_id = '' # last processed trade id for ohlcv
self._last_ts = 0 # last processed timestamp for ohlcv
self._last_ohlc = []

def start(self, ):
DataBase.start(self)

if self.p.fromdate:
self._state = self._ST_HISTORBACK
self.put_notification(self.DELAYED)
self._fetch_ohlcv(self.p.fromdate)

self._fetch_ohlcv(self.p.fromdate)
else:
self._state = self._ST_LIVE
self.put_notification(self.LIVE)
Expand All @@ -109,12 +93,7 @@ def _load(self):
return self._load_ticks()
else:
self._fetch_ohlcv()
ret = self._load_ohlcv()
if self.p.debug:
print('---- LOAD ----')
print('{} Load OHLCV Returning: {}'.format(datetime.utcnow(), ret))
return ret

return self._load_ohlcv()
elif self._state == self._ST_HISTORBACK:
ret = self._load_ohlcv()
if ret:
Expand Down Expand Up @@ -146,64 +125,29 @@ def _fetch_ohlcv(self, fromdate=None):

while True:
dlen = len(self._data)

if self.p.debug:
# TESTING
since_dt = datetime.utcfromtimestamp(since // 1000) if since is not None else 'NA'
print('---- NEW REQUEST ----')
print('{} - Requesting: Since TS {} Since date {} granularity {}, limit {}, params'.format(
datetime.utcnow(), since, since_dt, granularity, limit, self.p.fetch_ohlcv_params))
data = sorted(self.store.fetch_ohlcv(self.p.dataname, timeframe=granularity,
since=since, limit=limit, params=self.p.fetch_ohlcv_params))
try:
for i, ohlcv in enumerate(data):
tstamp, open_, high, low, close, volume = ohlcv
print('{} - Data {}: {} - TS {} Time {}'.format(datetime.utcnow(), i,
datetime.utcfromtimestamp(tstamp // 1000),
tstamp, (time.time() * 1000)))
# ------------------------------------------------------------------
except IndexError:
print('Index Error: Data = {}'.format(data))
print('---- REQUEST END ----')
else:

data = sorted(self.store.fetch_ohlcv(self.p.dataname, timeframe=granularity,
since=since, limit=limit, params=self.p.fetch_ohlcv_params))

# Check to see if dropping the latest candle will help with
# exchanges which return partial data
if self.p.drop_newest:
del data[-1]

for ohlcv in data:

# for ohlcv in sorted(self.store.fetch_ohlcv(self.p.dataname, timeframe=granularity,
# since=since, limit=limit, params=self.p.fetch_ohlcv_params)):

for ohlcv in sorted(self.store.fetch_ohlcv(self.symbol, timeframe=granularity,
since=since, limit=limit)):
if None in ohlcv:
continue

tstamp = ohlcv[0]

# Prevent from loading incomplete data
# if tstamp > (time.time() * 1000):
# continue

if len(self._last_ohlc) == 0:
self._last_ohlc = ohlcv
tstamp = ohlcv[0]
if tstamp > self._last_ts:
if self.p.debug:
print('Adding: {}'.format(ohlcv))
self._data.append(ohlcv)
self._data.append(self._last_ohlc)
self._last_ts = tstamp
since = tstamp + 1

self._last_ohlc = ohlcv

if dlen == len(self._data):
break

def _load_ticks(self):
if self._last_id is None:
# first time get the latest trade only
trades = [self.store.fetch_trades(self.p.dataname)[-1]]
if self._last_id:
trades = self.store.fetch_trades(self.symbol)
else:
trades = self.store.fetch_trades(self.p.dataname)
# first time get the latest trade only
trades = [self.store.fetch_trades(self.symbol)[-1]]

for trade in trades:
trade_id = trade['id']
Expand All @@ -216,7 +160,7 @@ def _load_ticks(self):
try:
trade = self._data.popleft()
except IndexError:
return None # no data in the queue
return None # no data in the queue

trade_time, price, size = trade

Expand All @@ -233,7 +177,7 @@ def _load_ohlcv(self):
try:
ohlcv = self._data.popleft()
except IndexError:
return None # no data in the queue
return None # no data in the queue

tstamp, open_, high, low, close, volume = ohlcv

Expand All @@ -252,4 +196,4 @@ def haslivedata(self):
return self._state == self._ST_LIVE and self._data

def islive(self):
return not self.p.historical
return not self.p.historical