diff --git a/.gitignore b/.gitignore index 223c866..7c608e1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.vscode + # Keys *.pem diff --git a/bitpanda/BitpandaClient.py b/bitpanda/BitpandaClient.py index 8ef5be8..23afd78 100644 --- a/bitpanda/BitpandaClient.py +++ b/bitpanda/BitpandaClient.py @@ -5,6 +5,7 @@ import datetime import pytz import json +import sys from typing import List, Callable, Any from bitpanda.Pair import Pair @@ -147,14 +148,27 @@ async def delete_account_orders(self, pair : Pair = None) -> dict: async def delete_account_order(self, order_id : str) -> dict: return await self._create_delete("account/orders/" + order_id, headers=self._get_header_api_key()) - async def get_candlesticks(self, pair : Pair, unit : enums.TimeUnit, period : str, from_timestamp : datetime.datetime, to_timestamp : datetime.datetime) -> dict: + async def get_candlesticks(self, pair : Pair, unit : enums.TimeUnit, period : str, from_timestamp, to_timestamp) -> dict: + # params = { + # "unit": unit.value, + # "period": period, + # "from": from_timestamp.astimezone(pytz.utc).isoformat(), + # "to": to_timestamp.astimezone(pytz.utc).isoformat(), + # } + + if type(from_timestamp) == datetime.datetime: + from_timestamp = from_timestamp.astimezone(pytz.utc).isoformat() + if type(to_timestamp) == datetime.datetime: + to_timestamp = to_timestamp.astimezone(pytz.utc).isoformat() + params = { "unit": unit.value, "period": period, - "from": from_timestamp.astimezone(pytz.utc).isoformat(), - "to": to_timestamp.astimezone(pytz.utc).isoformat(), + "from": from_timestamp, + "to": to_timestamp, } + return await self._create_get("candlesticks/" + str(pair), params = params) async def get_instruments(self) -> dict: @@ -175,10 +189,17 @@ def compose_subscriptions(self, subscriptions : List[Subscription]) -> None: async def start_subscriptions(self) -> None: if len(self.subscription_sets): - done, pending = await asyncio.wait( - [asyncio.create_task(SubscriptionMgr(subscriptions, self.api_key, self.ssl_context).run()) for subscriptions in self.subscription_sets], - return_when = asyncio.FIRST_EXCEPTION - ) + if sys.version_info.major == 3 and sys.version_info.minor >= 7: + done, pending = await asyncio.wait( + [asyncio.create_task(SubscriptionMgr(subscriptions, self.api_key, self.ssl_context).run()) for subscriptions in self.subscription_sets], + return_when = asyncio.FIRST_EXCEPTION + ) + else: + loop = asyncio.get_event_loop() + done, pending = await asyncio.wait( + [loop.create_task(SubscriptionMgr(subscriptions, self.api_key, self.ssl_context).run()) for subscriptions in self.subscription_sets], + return_when = asyncio.FIRST_EXCEPTION + ) for task in done: try: task.result() diff --git a/bitpanda/Timer.py b/bitpanda/Timer.py index f697fdf..3bcc539 100644 --- a/bitpanda/Timer.py +++ b/bitpanda/Timer.py @@ -8,9 +8,11 @@ def __init__(self, name, active = True): self.name = name self.active = active + self.current_milli_time = lambda: int(round(time.time() * 1000)) + def __enter__(self): - self.start_tmstmp = time.time_ns() + self.start_tmstmp = self.current_milli_time() def __exit__(self, type, value, traceback): if self.active: - LOG.debug(f'Timer {self.name} finished. Took {round((time.time_ns() - self.start_tmstmp) / 1000000, 3)} ms.') \ No newline at end of file + LOG.debug(f'Timer {self.name} finished. Took {(self.current_milli_time() - self.start_tmstmp)} ms.') \ No newline at end of file diff --git a/bitpanda/subscriptions.py b/bitpanda/subscriptions.py index dc1724c..55e3a25 100644 --- a/bitpanda/subscriptions.py +++ b/bitpanda/subscriptions.py @@ -8,6 +8,8 @@ from bitpanda.Pair import Pair from bitpanda import enums +import threading + LOG = logging.getLogger(__name__) class Subscription(ABC): @@ -23,11 +25,11 @@ def get_channel_subscription_message(self) -> dict: pass async def process_message(self, response : dict) -> None: - await self.process_callbacks(response) + return await self.process_callbacks(response) async def process_callbacks(self, response : dict) -> None: if self.callbacks is not None: - await asyncio.gather(*[asyncio.create_task(cb(response)) for cb in self.callbacks]) + return await asyncio.gather(*[asyncio.create_task(cb(response)) for cb in self.callbacks]) @staticmethod def _get_subscription_instrument_codes(pairs : List[Pair]) -> List[str]: @@ -38,6 +40,9 @@ class SubscriptionMgr(object): CONNECTION_REINITIALIZATION_AFTER_ERROR = True MAX_MESSAGE_SIZE = 3 * 1024 * 1024 # 3MB + subscriptionResultAvailable = threading.Event() + subscriptionResult = None + def __init__(self, subscriptions : List[Subscription], api_key : str, ssl_context = None): self.api_key = api_key self.ssl_context = ssl_context @@ -58,7 +63,7 @@ async def run(self) -> None: # start processing incoming messages while True: response = json.loads(await websocket.recv()) - LOG.debug(f"< {response}") + # LOG.debug(f"< {response}") # subscription negative response if "error" in response or response['type'] == "ERROR": @@ -79,7 +84,12 @@ async def run(self) -> None: # regular message else: + SubscriptionMgr.subscriptionResult = response + SubscriptionMgr.subscriptionResultAvailable.set() + await self.process_message(response) + + except websockets.ConnectionClosedError as e: if SubscriptionMgr.CONNECTION_REINITIALIZATION_AFTER_ERROR: LOG.exception(f"ERROR: Connection closed with reason: {e}. Connection will be reinitialized.") @@ -102,7 +112,7 @@ def _create_subscription_message(self) -> dict: async def process_message(self, response : dict) -> None: for subscription in self.subscriptions: if subscription.get_channel_name() == response["channel_name"]: - await subscription.process_message(response) + return await subscription.process_message(response) break class AccountSubscription(Subscription):