Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
scientes committed Apr 8, 2021
1 parent 598d76e commit fd232e9
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 51 deletions.
95 changes: 66 additions & 29 deletions src/graph.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import ccxt
from datetime import datetime
from time import sleep, time_ns

import ccxt

class PricePath:

class PricePath:
def __init__(self, exchanges: list = None, gdict: dict = None, cache: dict = None):
if not gdict:
gdict = {}
Expand All @@ -22,29 +22,34 @@ def __init__(self, exchanges: list = None, gdict: dict = None, cache: dict = Non
exchange = exchange_class()
markets = []
markets = exchange.fetch_markets()
if exchange.has['fetchOHLCV']:
if exchange.has["fetchOHLCV"]:

allpairs.extend(
[(i["base"], i["quote"], exchange_id, i["symbol"])for i in markets])
[(i["base"], i["quote"], exchange_id, i["symbol"]) for i in markets]
)
else:
print(
f"{exchange.name} Does not support fetch ohlcv. ignoring exchange and {len(markets)} pairs.")
f"{exchange.name} Does not support fetch ohlcv. ignoring exchange and {len(markets)} pairs."
)
allpairs = list(set(allpairs))
#print("Total Pairs to check:", len(allpairs))
# print("Total Pairs to check:", len(allpairs))
allpairs.sort(key=lambda x: x[3])
for i in allpairs:
base = i[0]
quote = i[1]
self.addVertex(base)
self.addVertex(quote)
self.addEdge(base, quote, {
"exchange": i[2], "symbol": i[3], "inverted": False})
self.addEdge(quote, base, {
"exchange": i[2], "symbol": i[3], "inverted": True})
self.addEdge(
base, quote, {"exchange": i[2], "symbol": i[3], "inverted": False}
)
self.addEdge(
quote, base, {"exchange": i[2], "symbol": i[3], "inverted": True}
)

def edges(self):
return self.findedges()
# Find the distinct list of edges

# Find the distinct list of edges

def findedges(self):
edgename = []
Expand All @@ -57,7 +62,7 @@ def findedges(self):
def getVertices(self):
return list(self.gdict.keys())

# Add the vertex as a key
# Add the vertex as a key
def addVertex(self, vrtx):
if vrtx not in self.gdict:
self.gdict[vrtx] = []
Expand All @@ -73,16 +78,21 @@ def _getpath(self, start, stop, maxdepth, depth=0):
if (edges := self.gdict.get(start)) and maxdepth > depth:
for edge in edges:
if depth == 0 and edge[0] == stop:
paths.append([edge, ])
paths.append(
[
edge,
]
)
elif edge[0] == stop:
paths.append(edge)
else:
path = self._getpath(
edge[0], stop, maxdepth, depth=depth + 1)
path = self._getpath(edge[0], stop, maxdepth, depth=depth + 1)
if len(path) and path is not None:
for p in path:
if p[0] == stop:
newpath = [edge, ]
newpath = [
edge,
]
newpath.append(p)
paths.append(newpath)
return paths
Expand All @@ -94,13 +104,19 @@ def change_prio(self, key, value):
else:
self.priority[ke] = value

def getpath(self, start, stop, starttime=0, stoptime=0, preferredexchange=None, maxdepth=3):
def getpath(
self, start, stop, starttime=0, stoptime=0, preferredexchange=None, maxdepth=3
):
def comb_sort_key(path):
if preferredexchange:
# prioritze pairs with the preferred exchange
volume = 1
volumenew = 0
if not (priority := self.priority.get("-".join([a[1]["symbol"] for a in path]))):
if not (
priority := self.priority.get(
"-".join([a[1]["symbol"] for a in path])
)
):
priority = 0
for c in [a if (a := check_cache(pair)) else None for pair in path]:
if c and c[0]:
Expand All @@ -114,7 +130,17 @@ def comb_sort_key(path):
break
else:
volume = 1 / volumenew
return len(path) + sum([0 if pair[1]["exchange"] == preferredexchange else 1 for pair in path]) + volume + priority
return (
len(path)
+ sum(
[
0 if pair[1]["exchange"] == preferredexchange else 1
for pair in path
]
)
+ volume
+ priority
)
else:
return len(path)

Expand All @@ -131,7 +157,7 @@ def check_cache(pair):

def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1):
rangeinms = 0
timeframe = int(6.048e+8) # week in ms
timeframe = int(6.048e8) # week in ms
if starttimestamp == 0:
starttimestamp = 1325372400 * 1000
if stoptimestamp == -1:
Expand All @@ -156,19 +182,26 @@ def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1):
# maybe a more elaborate ratelimit wich counts execution time to waiting
sleep(exchange.rateLimit / 1000)
timeframeexchange = exchange.timeframes.get("1w")
if timeframeexchange: # this must be handled better maybe choose timeframe dynamically
if (
timeframeexchange
): # this must be handled better maybe choose timeframe dynamically
# maybe cache this per pair
ohlcv = exchange.fetch_ohlcv(
path[i][1]["symbol"], "1w", starttimestamp, rangeincandles)
path[i][1]["symbol"], "1w", starttimestamp, rangeincandles
)
else:
ohlcv = [] # do not check fail later
if len(ohlcv) > 1:
# (candle ends after the date + timeframe)
path[i][1]["stoptime"] = ohlcv[-1][0] + timeframe
path[i][1]["avg_vol"] = sum(
[vol[-1] for vol in ohlcv]) / len(ohlcv) # avg vol in curr
path[i][1]["avg_vol"] = sum([vol[-1] for vol in ohlcv]) / len(
ohlcv
) # avg vol in curr
path[i][1]["starttime"] = ohlcv[0][0]
if path[i][1]["stoptime"] < globalstoptime or globalstoptime == 0:
if (
path[i][1]["stoptime"] < globalstoptime
or globalstoptime == 0
):
globalstoptime = path[i][1]["stoptime"]
if path[i][1]["starttime"] > globalstarttime:
globalstarttime = path[i][1]["starttime"]
Expand All @@ -177,10 +210,15 @@ def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1):
path[i][1]["starttime"] = 0
path[i][1]["avg_vol"] = 0
self.cache[path[i][1]["exchange"] + path[i][1]["symbol"]] = (
path[i][1]["starttime"], path[i][1]["stoptime"], path[i][1]["avg_vol"])
path[i][1]["starttime"],
path[i][1]["stoptime"],
path[i][1]["avg_vol"],
)
else:

if (path[i][1]["stoptime"] < globalstoptime or globalstoptime == 0) and path[i][1]["stoptime"] != 0:
if (
path[i][1]["stoptime"] < globalstoptime or globalstoptime == 0
) and path[i][1]["stoptime"] != 0:
globalstoptime = path[i][1]["stoptime"]
if path[i][1]["starttime"] > globalstarttime:
globalstarttime = path[i][1]["starttime"]
Expand Down Expand Up @@ -215,8 +253,7 @@ def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1):
start = "IOTA"
to = "EUR"
preferredexchange = "binance"
path = g.getpath(start, to, maxdepth=2,
preferredexchange=preferredexchange)
path = g.getpath(start, to, maxdepth=2, preferredexchange=preferredexchange)
# debug only in actual use we would iterate over the path object fetching new paths as needed
path = list(path)
print(len(path))
81 changes: 60 additions & 21 deletions src/price_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import sqlite3
import time
from pathlib import Path
from typing import Any, Optional, Union
from time import sleep
from typing import Any, Optional, Union

import requests
import ccxt
import requests

import config
import misc
Expand Down Expand Up @@ -427,18 +427,25 @@ def get_cost(
def get_candles(self, start: int, stop: int, symbol: str, exchange: str) -> list:
exchange_class = getattr(ccxt, exchange)
exchange = exchange_class()
if exchange.has['fetchOHLCV']:
if exchange.has["fetchOHLCV"]:
sleep(exchange.rateLimit / 1000) # time.sleep wants seconds
# get 2min before and after range
startval = start - 1000 * 60 * 2
rang = max(int((stop - start) / 1000 / 60) + 2, 1)
return exchange.fetch_ohlcv(symbol, '1m', startval, rang)
return exchange.fetch_ohlcv(symbol, "1m", startval, rang)
else:
log.error(
"fetchOHLCV not implemented on exchange, skipping priceloading using ohlcv")
"fetchOHLCV not implemented on exchange, skipping priceloading using ohlcv"
)
return None

def _get_bulk_pair_data_path(self, operations: list, coin: str, reference_coin: str, preferredexchange: str = "binance") -> list:
def _get_bulk_pair_data_path(
self,
operations: list,
coin: str,
reference_coin: str,
preferredexchange: str = "binance",
) -> list:
def merge_prices(a: list, b: list = None):
prices = []
if not b:
Expand All @@ -454,14 +461,20 @@ def merge_prices(a: list, b: list = None):

timestamps = []
timestamppairs = []
maxminutes = 300 # coinbasepro only allows a max of 300 minutes need a better solution
maxminutes = (
300 # coinbasepro only allows a max of 300 minutes need a better solution
)
timestamps = (op.utc_time for op in operations)
if not preferredexchange:
preferredexchange = "binance"

current_first = None
for timestamp in timestamps:
if current_first and current_first + datetime.timedelta(minutes=maxminutes - 4) > timestamp:
if (
current_first
and current_first + datetime.timedelta(minutes=maxminutes - 4)
> timestamp
):
timestamppairs[-1].append(timestamp)
else:
current_first = timestamp
Expand All @@ -471,12 +484,14 @@ def merge_prices(a: list, b: list = None):
# ccxt works with timestamps in milliseconds
first = misc.to_ms_timestamp(batch[0])
last = misc.to_ms_timestamp(batch[-1])
firststr = batch[0].strftime('%d-%b-%Y (%H:%M)')
laststr = batch[-1].strftime('%d-%b-%Y (%H:%M)')
firststr = batch[0].strftime("%d-%b-%Y (%H:%M)")
laststr = batch[-1].strftime("%d-%b-%Y (%H:%M)")
log.info(
f"getting data from {str(firststr)} to {str(laststr)} for {str(coin)}")
path = self.path.getpath(coin, reference_coin, first,
last, preferredexchange=preferredexchange)
f"getting data from {str(firststr)} to {str(laststr)} for {str(coin)}"
)
path = self.path.getpath(
coin, reference_coin, first, last, preferredexchange=preferredexchange
)
for p in path:
tempdatalis = []
printstr = [a[1]["symbol"] for a in p[1]]
Expand All @@ -489,19 +504,32 @@ def merge_prices(a: list, b: list = None):
candles = self.get_candles(first, last, symbol, exchange)
if invert:
tempdata = list(
map(lambda x: (x[0], 1 / ((x[1] + x[4]) / 2)), candles))
map(lambda x: (x[0], 1 / ((x[1] + x[4]) / 2)), candles)
)
else:
tempdata = list(
map(lambda x: (x[0], (x[1] + x[4]) / 2), candles))
map(lambda x: (x[0], (x[1] + x[4]) / 2), candles)
)

if tempdata:
for operation in batch:
# TODO discuss which candle is picked current is closest to original date (often off by about 1-20s, but can be after the Trade)
# times do not always line up perfectly so take one nearest
ts = list(
map(lambda x: (abs(misc.to_ms_timestamp(operation) * 1000 - x[0]), x), tempdata))
map(
lambda x: (
abs(
misc.to_ms_timestamp(operation) * 1000
- x[0]
),
x,
),
tempdata,
)
)
tempdatalis[i].append(
(operation, min(ts, key=lambda x: x[0])[1][1]))
(operation, min(ts, key=lambda x: x[0])[1][1])
)
else:
tempdatalis = []
# do not try already failed again
Expand All @@ -523,15 +551,22 @@ def merge_prices(a: list, b: list = None):

return datacomb

def preload_price_data_path(self, operations: list, coin: str, exchange: str = None):
def preload_price_data_path(
self, operations: list, coin: str, exchange: str = None
):

reference_coin = config.FIAT
# get pairs used for calculating the price
operations_filtered = []

tablename = self.get_tablename(coin, reference_coin)
operations_filtered = [op for op in operations if not self.__get_price_db(
self.get_db_path(op.platform), tablename, op.utc_time)]
operations_filtered = [
op
for op in operations
if not self.__get_price_db(
self.get_db_path(op.platform), tablename, op.utc_time
)
]
operations_grouped = {}
if operations_filtered:
for i in operations_filtered:
Expand All @@ -543,6 +578,10 @@ def preload_price_data_path(self, operations: list, coin: str, exchange: str = N
operations_grouped[i.platform] = [i]
for platf in operations_grouped.keys():
data = self._get_bulk_pair_data_path(
operations_grouped[platf], coin, reference_coin, preferredexchange=platf)
operations_grouped[platf],
coin,
reference_coin,
preferredexchange=platf,
)
for p in data:
self.set_price_db(platf, coin, reference_coin, p[0], p[1])
2 changes: 1 addition & 1 deletion src/taxman.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def evaluate_taxation(self) -> None:
log.debug("Starting evaluation...")
for coin, operations in misc.group_by(self.book.operations, "coin").items():
operations = sorted(operations, key=lambda op: op.utc_time)
self.price_data.preload_price_data_path(operations,coin)
self.price_data.preload_price_data_path(operations, coin)
self.__evaluate_taxation(coin, operations)

def print_evaluation(self) -> None:
Expand Down

0 comments on commit fd232e9

Please sign in to comment.