From bf36a384db4a1df24a22859f68cbeb4adff527c8 Mon Sep 17 00:00:00 2001 From: ttterence927 <65581462+ttterence927@users.noreply.github.com> Date: Fri, 28 May 2021 23:24:17 +0200 Subject: [PATCH 1/2] Update ccxtfeed.py 1. Increase historical ohlcv backfilling speed 2. Fix incorrect ohlcv during live fetching --- ccxtbt/ccxtfeed.py | 110 +++++++++++---------------------------------- 1 file changed, 27 insertions(+), 83 deletions(-) diff --git a/ccxtbt/ccxtfeed.py b/ccxtbt/ccxtfeed.py index eaaa28c..16f3f5a 100644 --- a/ccxtbt/ccxtfeed.py +++ b/ccxtbt/ccxtfeed.py @@ -22,17 +22,13 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -import time from collections import deque from datetime import datetime import backtrader as bt from backtrader.feed import DataBase from backtrader.utils.py3 import with_metaclass - from .ccxtstore import CCXTStore - - class MetaCCXTFeed(DataBase.__class__): def __init__(cls, name, bases, dct): '''Class has already been created ... register''' @@ -42,7 +38,6 @@ def __init__(cls, name, bases, dct): # Register with the store CCXTStore.DataCls = cls - class CCXTFeed(with_metaclass(MetaCCXTFeed, DataBase)): """ CryptoCurrency eXchange Trading Library Data Feed. @@ -55,37 +50,26 @@ class CCXTFeed(with_metaclass(MetaCCXTFeed, DataBase)): - ``backfill_start`` (default: ``True``) Perform backfilling at the start. The maximum possible historical data will be fetched in a single request. - - Changes From Ed's pacakge - - - Added option to send some additional fetch_ohlcv_params. Some exchanges (e.g Bitmex) - support sending some additional fetch parameters. - - Added drop_newest option to avoid loading incomplete candles where exchanges - do not support sending ohlcv params to prevent returning partial data - """ params = ( ('historical', False), # only historical download ('backfill_start', False), # do backfilling at the start - ('fetch_ohlcv_params', {}), - ('ohlcv_limit', 20), - ('drop_newest', False), - ('debug', False) ) - _store = CCXTStore - # States for the Finite State Machine in _load _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(3) - # def __init__(self, exchange, symbol, ohlcv_limit=None, config={}, retries=5): def __init__(self, **kwargs): - # self.store = CCXTStore(exchange, config, retries) + self.symbol = self.p.dataname + self.ohlcv_limit = 1000 + self.store = self._store(**kwargs) - self._data = deque() # data queue for price data - self._last_id = '' # last processed trade id for ohlcv - self._last_ts = 0 # last processed timestamp for ohlcv + + self._data = deque() # data queue for price data + self._last_id = '' # last processed trade id for ohlcv + self._last_ts = 0 # last processed timestamp for ohlcv + self._last_ohlc = [] def start(self, ): DataBase.start(self) @@ -93,8 +77,8 @@ def start(self, ): if self.p.fromdate: self._state = self._ST_HISTORBACK self.put_notification(self.DELAYED) - self._fetch_ohlcv(self.p.fromdate) + self._fetch_ohlcv(self.p.fromdate) else: self._state = self._ST_LIVE self.put_notification(self.LIVE) @@ -109,12 +93,7 @@ def _load(self): return self._load_ticks() else: self._fetch_ohlcv() - ret = self._load_ohlcv() - if self.p.debug: - print('---- LOAD ----') - print('{} Load OHLCV Returning: {}'.format(datetime.utcnow(), ret)) - return ret - + return self._load_ohlcv() elif self._state == self._ST_HISTORBACK: ret = self._load_ohlcv() if ret: @@ -142,68 +121,33 @@ def _fetch_ohlcv(self, fromdate=None): else: since = None - limit = self.p.ohlcv_limit + limit = self.ohlcv_limit while True: dlen = len(self._data) - - if self.p.debug: - # TESTING - since_dt = datetime.utcfromtimestamp(since // 1000) if since is not None else 'NA' - print('---- NEW REQUEST ----') - print('{} - Requesting: Since TS {} Since date {} granularity {}, limit {}, params'.format( - datetime.utcnow(), since, since_dt, granularity, limit, self.p.fetch_ohlcv_params)) - data = sorted(self.store.fetch_ohlcv(self.p.dataname, timeframe=granularity, - since=since, limit=limit, params=self.p.fetch_ohlcv_params)) - try: - for i, ohlcv in enumerate(data): - tstamp, open_, high, low, close, volume = ohlcv - print('{} - Data {}: {} - TS {} Time {}'.format(datetime.utcnow(), i, - datetime.utcfromtimestamp(tstamp // 1000), - tstamp, (time.time() * 1000))) - # ------------------------------------------------------------------ - except IndexError: - print('Index Error: Data = {}'.format(data)) - print('---- REQUEST END ----') - else: - - data = sorted(self.store.fetch_ohlcv(self.p.dataname, timeframe=granularity, - since=since, limit=limit, params=self.p.fetch_ohlcv_params)) - - # Check to see if dropping the latest candle will help with - # exchanges which return partial data - if self.p.drop_newest: - del data[-1] - - for ohlcv in data: - - # for ohlcv in sorted(self.store.fetch_ohlcv(self.p.dataname, timeframe=granularity, - # since=since, limit=limit, params=self.p.fetch_ohlcv_params)): - + for ohlcv in sorted(self.store.fetch_ohlcv(self.symbol, timeframe=granularity, + since=since, limit=limit)): if None in ohlcv: continue - - tstamp = ohlcv[0] - - # Prevent from loading incomplete data - # if tstamp > (time.time() * 1000): - # continue - + if len(self._last_ohlc) == 0: + self._last_ohlc = ohlcv + tstamp = ohlcv[0] if tstamp > self._last_ts: - if self.p.debug: - print('Adding: {}'.format(ohlcv)) - self._data.append(ohlcv) + self._data.append(self._last_ohlc) self._last_ts = tstamp + since = tstamp + 1 + + self._last_ohlc = ohlcv if dlen == len(self._data): break def _load_ticks(self): - if self._last_id is None: - # first time get the latest trade only - trades = [self.store.fetch_trades(self.p.dataname)[-1]] + if self._last_id: + trades = self.store.fetch_trades(self.symbol) else: - trades = self.store.fetch_trades(self.p.dataname) + # first time get the latest trade only + trades = [self.store.fetch_trades(self.symbol)[-1]] for trade in trades: trade_id = trade['id'] @@ -216,7 +160,7 @@ def _load_ticks(self): try: trade = self._data.popleft() except IndexError: - return None # no data in the queue + return None # no data in the queue trade_time, price, size = trade @@ -233,7 +177,7 @@ def _load_ohlcv(self): try: ohlcv = self._data.popleft() except IndexError: - return None # no data in the queue + return None # no data in the queue tstamp, open_, high, low, close, volume = ohlcv @@ -252,4 +196,4 @@ def haslivedata(self): return self._state == self._ST_LIVE and self._data def islive(self): - return not self.p.historical + return not self.p.historical \ No newline at end of file From ab74417626b4661fce468c429fc5051dc7fa7dae Mon Sep 17 00:00:00 2001 From: ttterence927 <65581462+ttterence927@users.noreply.github.com> Date: Fri, 28 May 2021 23:29:09 +0200 Subject: [PATCH 2/2] Update ccxtfeed.py 1. Increase historical ohlcv backfilling speed 2. Fix incorrect ohlcv during live fetching --- ccxtbt/ccxtfeed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ccxtbt/ccxtfeed.py b/ccxtbt/ccxtfeed.py index 16f3f5a..5db133f 100644 --- a/ccxtbt/ccxtfeed.py +++ b/ccxtbt/ccxtfeed.py @@ -55,6 +55,7 @@ class CCXTFeed(with_metaclass(MetaCCXTFeed, DataBase)): params = ( ('historical', False), # only historical download ('backfill_start', False), # do backfilling at the start + ('ohlcv_limit', 1000), ) _store = CCXTStore # States for the Finite State Machine in _load @@ -62,7 +63,6 @@ class CCXTFeed(with_metaclass(MetaCCXTFeed, DataBase)): def __init__(self, **kwargs): self.symbol = self.p.dataname - self.ohlcv_limit = 1000 self.store = self._store(**kwargs) @@ -121,7 +121,7 @@ def _fetch_ohlcv(self, fromdate=None): else: since = None - limit = self.ohlcv_limit + limit = self.p.ohlcv_limit while True: dlen = len(self._data)