diff --git a/src/graph.py b/src/graph.py index 7e49b596..f9d98271 100644 --- a/src/graph.py +++ b/src/graph.py @@ -1,10 +1,10 @@ -import ccxt from datetime import datetime from time import sleep, time_ns +import ccxt -class PricePath: +class PricePath: def __init__(self, exchanges: list = None, gdict: dict = None, cache: dict = None): if not gdict: gdict = {} @@ -22,29 +22,34 @@ def __init__(self, exchanges: list = None, gdict: dict = None, cache: dict = Non exchange = exchange_class() markets = [] markets = exchange.fetch_markets() - if exchange.has['fetchOHLCV']: + if exchange.has["fetchOHLCV"]: allpairs.extend( - [(i["base"], i["quote"], exchange_id, i["symbol"])for i in markets]) + [(i["base"], i["quote"], exchange_id, i["symbol"]) for i in markets] + ) else: print( - f"{exchange.name} Does not support fetch ohlcv. ignoring exchange and {len(markets)} pairs.") + f"{exchange.name} Does not support fetch ohlcv. ignoring exchange and {len(markets)} pairs." + ) allpairs = list(set(allpairs)) - #print("Total Pairs to check:", len(allpairs)) + # print("Total Pairs to check:", len(allpairs)) allpairs.sort(key=lambda x: x[3]) for i in allpairs: base = i[0] quote = i[1] self.addVertex(base) self.addVertex(quote) - self.addEdge(base, quote, { - "exchange": i[2], "symbol": i[3], "inverted": False}) - self.addEdge(quote, base, { - "exchange": i[2], "symbol": i[3], "inverted": True}) + self.addEdge( + base, quote, {"exchange": i[2], "symbol": i[3], "inverted": False} + ) + self.addEdge( + quote, base, {"exchange": i[2], "symbol": i[3], "inverted": True} + ) def edges(self): return self.findedges() -# Find the distinct list of edges + + # Find the distinct list of edges def findedges(self): edgename = [] @@ -57,7 +62,7 @@ def findedges(self): def getVertices(self): return list(self.gdict.keys()) -# Add the vertex as a key + # Add the vertex as a key def addVertex(self, vrtx): if vrtx not in self.gdict: self.gdict[vrtx] = [] @@ -73,16 +78,21 @@ def _getpath(self, start, stop, maxdepth, depth=0): if (edges := self.gdict.get(start)) and maxdepth > depth: for edge in edges: if depth == 0 and edge[0] == stop: - paths.append([edge, ]) + paths.append( + [ + edge, + ] + ) elif edge[0] == stop: paths.append(edge) else: - path = self._getpath( - edge[0], stop, maxdepth, depth=depth + 1) + path = self._getpath(edge[0], stop, maxdepth, depth=depth + 1) if len(path) and path is not None: for p in path: if p[0] == stop: - newpath = [edge, ] + newpath = [ + edge, + ] newpath.append(p) paths.append(newpath) return paths @@ -94,13 +104,19 @@ def change_prio(self, key, value): else: self.priority[ke] = value - def getpath(self, start, stop, starttime=0, stoptime=0, preferredexchange=None, maxdepth=3): + def getpath( + self, start, stop, starttime=0, stoptime=0, preferredexchange=None, maxdepth=3 + ): def comb_sort_key(path): if preferredexchange: # prioritze pairs with the preferred exchange volume = 1 volumenew = 0 - if not (priority := self.priority.get("-".join([a[1]["symbol"] for a in path]))): + if not ( + priority := self.priority.get( + "-".join([a[1]["symbol"] for a in path]) + ) + ): priority = 0 for c in [a if (a := check_cache(pair)) else None for pair in path]: if c and c[0]: @@ -114,7 +130,17 @@ def comb_sort_key(path): break else: volume = 1 / volumenew - return len(path) + sum([0 if pair[1]["exchange"] == preferredexchange else 1 for pair in path]) + volume + priority + return ( + len(path) + + sum( + [ + 0 if pair[1]["exchange"] == preferredexchange else 1 + for pair in path + ] + ) + + volume + + priority + ) else: return len(path) @@ -131,7 +157,7 @@ def check_cache(pair): def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1): rangeinms = 0 - timeframe = int(6.048e+8) # week in ms + timeframe = int(6.048e8) # week in ms if starttimestamp == 0: starttimestamp = 1325372400 * 1000 if stoptimestamp == -1: @@ -156,19 +182,26 @@ def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1): # maybe a more elaborate ratelimit wich counts execution time to waiting sleep(exchange.rateLimit / 1000) timeframeexchange = exchange.timeframes.get("1w") - if timeframeexchange: # this must be handled better maybe choose timeframe dynamically + if ( + timeframeexchange + ): # this must be handled better maybe choose timeframe dynamically # maybe cache this per pair ohlcv = exchange.fetch_ohlcv( - path[i][1]["symbol"], "1w", starttimestamp, rangeincandles) + path[i][1]["symbol"], "1w", starttimestamp, rangeincandles + ) else: ohlcv = [] # do not check fail later if len(ohlcv) > 1: # (candle ends after the date + timeframe) path[i][1]["stoptime"] = ohlcv[-1][0] + timeframe - path[i][1]["avg_vol"] = sum( - [vol[-1] for vol in ohlcv]) / len(ohlcv) # avg vol in curr + path[i][1]["avg_vol"] = sum([vol[-1] for vol in ohlcv]) / len( + ohlcv + ) # avg vol in curr path[i][1]["starttime"] = ohlcv[0][0] - if path[i][1]["stoptime"] < globalstoptime or globalstoptime == 0: + if ( + path[i][1]["stoptime"] < globalstoptime + or globalstoptime == 0 + ): globalstoptime = path[i][1]["stoptime"] if path[i][1]["starttime"] > globalstarttime: globalstarttime = path[i][1]["starttime"] @@ -177,10 +210,15 @@ def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1): path[i][1]["starttime"] = 0 path[i][1]["avg_vol"] = 0 self.cache[path[i][1]["exchange"] + path[i][1]["symbol"]] = ( - path[i][1]["starttime"], path[i][1]["stoptime"], path[i][1]["avg_vol"]) + path[i][1]["starttime"], + path[i][1]["stoptime"], + path[i][1]["avg_vol"], + ) else: - if (path[i][1]["stoptime"] < globalstoptime or globalstoptime == 0) and path[i][1]["stoptime"] != 0: + if ( + path[i][1]["stoptime"] < globalstoptime or globalstoptime == 0 + ) and path[i][1]["stoptime"] != 0: globalstoptime = path[i][1]["stoptime"] if path[i][1]["starttime"] > globalstarttime: globalstarttime = path[i][1]["starttime"] @@ -215,8 +253,7 @@ def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1): start = "IOTA" to = "EUR" preferredexchange = "binance" - path = g.getpath(start, to, maxdepth=2, - preferredexchange=preferredexchange) + path = g.getpath(start, to, maxdepth=2, preferredexchange=preferredexchange) # debug only in actual use we would iterate over the path object fetching new paths as needed path = list(path) print(len(path)) diff --git a/src/price_data.py b/src/price_data.py index 40a0071a..4900de3e 100644 --- a/src/price_data.py +++ b/src/price_data.py @@ -22,11 +22,11 @@ import sqlite3 import time from pathlib import Path -from typing import Any, Optional, Union from time import sleep +from typing import Any, Optional, Union -import requests import ccxt +import requests import config import misc @@ -427,18 +427,25 @@ def get_cost( def get_candles(self, start: int, stop: int, symbol: str, exchange: str) -> list: exchange_class = getattr(ccxt, exchange) exchange = exchange_class() - if exchange.has['fetchOHLCV']: + if exchange.has["fetchOHLCV"]: sleep(exchange.rateLimit / 1000) # time.sleep wants seconds # get 2min before and after range startval = start - 1000 * 60 * 2 rang = max(int((stop - start) / 1000 / 60) + 2, 1) - return exchange.fetch_ohlcv(symbol, '1m', startval, rang) + return exchange.fetch_ohlcv(symbol, "1m", startval, rang) else: log.error( - "fetchOHLCV not implemented on exchange, skipping priceloading using ohlcv") + "fetchOHLCV not implemented on exchange, skipping priceloading using ohlcv" + ) return None - def _get_bulk_pair_data_path(self, operations: list, coin: str, reference_coin: str, preferredexchange: str = "binance") -> list: + def _get_bulk_pair_data_path( + self, + operations: list, + coin: str, + reference_coin: str, + preferredexchange: str = "binance", + ) -> list: def merge_prices(a: list, b: list = None): prices = [] if not b: @@ -454,14 +461,20 @@ def merge_prices(a: list, b: list = None): timestamps = [] timestamppairs = [] - maxminutes = 300 # coinbasepro only allows a max of 300 minutes need a better solution + maxminutes = ( + 300 # coinbasepro only allows a max of 300 minutes need a better solution + ) timestamps = (op.utc_time for op in operations) if not preferredexchange: preferredexchange = "binance" current_first = None for timestamp in timestamps: - if current_first and current_first + datetime.timedelta(minutes=maxminutes - 4) > timestamp: + if ( + current_first + and current_first + datetime.timedelta(minutes=maxminutes - 4) + > timestamp + ): timestamppairs[-1].append(timestamp) else: current_first = timestamp @@ -471,12 +484,14 @@ def merge_prices(a: list, b: list = None): # ccxt works with timestamps in milliseconds first = misc.to_ms_timestamp(batch[0]) last = misc.to_ms_timestamp(batch[-1]) - firststr = batch[0].strftime('%d-%b-%Y (%H:%M)') - laststr = batch[-1].strftime('%d-%b-%Y (%H:%M)') + firststr = batch[0].strftime("%d-%b-%Y (%H:%M)") + laststr = batch[-1].strftime("%d-%b-%Y (%H:%M)") log.info( - f"getting data from {str(firststr)} to {str(laststr)} for {str(coin)}") - path = self.path.getpath(coin, reference_coin, first, - last, preferredexchange=preferredexchange) + f"getting data from {str(firststr)} to {str(laststr)} for {str(coin)}" + ) + path = self.path.getpath( + coin, reference_coin, first, last, preferredexchange=preferredexchange + ) for p in path: tempdatalis = [] printstr = [a[1]["symbol"] for a in p[1]] @@ -489,19 +504,32 @@ def merge_prices(a: list, b: list = None): candles = self.get_candles(first, last, symbol, exchange) if invert: tempdata = list( - map(lambda x: (x[0], 1 / ((x[1] + x[4]) / 2)), candles)) + map(lambda x: (x[0], 1 / ((x[1] + x[4]) / 2)), candles) + ) else: tempdata = list( - map(lambda x: (x[0], (x[1] + x[4]) / 2), candles)) + map(lambda x: (x[0], (x[1] + x[4]) / 2), candles) + ) if tempdata: for operation in batch: # TODO discuss which candle is picked current is closest to original date (often off by about 1-20s, but can be after the Trade) # times do not always line up perfectly so take one nearest ts = list( - map(lambda x: (abs(misc.to_ms_timestamp(operation) * 1000 - x[0]), x), tempdata)) + map( + lambda x: ( + abs( + misc.to_ms_timestamp(operation) * 1000 + - x[0] + ), + x, + ), + tempdata, + ) + ) tempdatalis[i].append( - (operation, min(ts, key=lambda x: x[0])[1][1])) + (operation, min(ts, key=lambda x: x[0])[1][1]) + ) else: tempdatalis = [] # do not try already failed again @@ -523,15 +551,22 @@ def merge_prices(a: list, b: list = None): return datacomb - def preload_price_data_path(self, operations: list, coin: str, exchange: str = None): + def preload_price_data_path( + self, operations: list, coin: str, exchange: str = None + ): reference_coin = config.FIAT # get pairs used for calculating the price operations_filtered = [] tablename = self.get_tablename(coin, reference_coin) - operations_filtered = [op for op in operations if not self.__get_price_db( - self.get_db_path(op.platform), tablename, op.utc_time)] + operations_filtered = [ + op + for op in operations + if not self.__get_price_db( + self.get_db_path(op.platform), tablename, op.utc_time + ) + ] operations_grouped = {} if operations_filtered: for i in operations_filtered: @@ -543,6 +578,10 @@ def preload_price_data_path(self, operations: list, coin: str, exchange: str = N operations_grouped[i.platform] = [i] for platf in operations_grouped.keys(): data = self._get_bulk_pair_data_path( - operations_grouped[platf], coin, reference_coin, preferredexchange=platf) + operations_grouped[platf], + coin, + reference_coin, + preferredexchange=platf, + ) for p in data: self.set_price_db(platf, coin, reference_coin, p[0], p[1]) diff --git a/src/taxman.py b/src/taxman.py index 68ea07f8..1df6a84e 100644 --- a/src/taxman.py +++ b/src/taxman.py @@ -178,7 +178,7 @@ def evaluate_taxation(self) -> None: log.debug("Starting evaluation...") for coin, operations in misc.group_by(self.book.operations, "coin").items(): operations = sorted(operations, key=lambda op: op.utc_time) - self.price_data.preload_price_data_path(operations,coin) + self.price_data.preload_price_data_path(operations, coin) self.__evaluate_taxation(coin, operations) def print_evaluation(self) -> None: