Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.vscode

# Keys
*.pem

Expand Down
35 changes: 28 additions & 7 deletions bitpanda/BitpandaClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime
import pytz
import json
import sys
from typing import List, Callable, Any

from bitpanda.Pair import Pair
Expand Down Expand Up @@ -147,14 +148,27 @@ async def delete_account_orders(self, pair : Pair = None) -> dict:
async def delete_account_order(self, order_id : str) -> dict:
return await self._create_delete("account/orders/" + order_id, headers=self._get_header_api_key())

async def get_candlesticks(self, pair : Pair, unit : enums.TimeUnit, period : str, from_timestamp : datetime.datetime, to_timestamp : datetime.datetime) -> dict:
async def get_candlesticks(self, pair : Pair, unit : enums.TimeUnit, period : str, from_timestamp, to_timestamp) -> dict:
# params = {
# "unit": unit.value,
# "period": period,
# "from": from_timestamp.astimezone(pytz.utc).isoformat(),
# "to": to_timestamp.astimezone(pytz.utc).isoformat(),
# }

if type(from_timestamp) == datetime.datetime:
from_timestamp = from_timestamp.astimezone(pytz.utc).isoformat()
if type(to_timestamp) == datetime.datetime:
to_timestamp = to_timestamp.astimezone(pytz.utc).isoformat()

params = {
"unit": unit.value,
"period": period,
"from": from_timestamp.astimezone(pytz.utc).isoformat(),
"to": to_timestamp.astimezone(pytz.utc).isoformat(),
"from": from_timestamp,
"to": to_timestamp,
}


return await self._create_get("candlesticks/" + str(pair), params = params)

async def get_instruments(self) -> dict:
Expand All @@ -175,10 +189,17 @@ def compose_subscriptions(self, subscriptions : List[Subscription]) -> None:

async def start_subscriptions(self) -> None:
if len(self.subscription_sets):
done, pending = await asyncio.wait(
[asyncio.create_task(SubscriptionMgr(subscriptions, self.api_key, self.ssl_context).run()) for subscriptions in self.subscription_sets],
return_when = asyncio.FIRST_EXCEPTION
)
if sys.version_info.major == 3 and sys.version_info.minor >= 7:
done, pending = await asyncio.wait(
[asyncio.create_task(SubscriptionMgr(subscriptions, self.api_key, self.ssl_context).run()) for subscriptions in self.subscription_sets],
return_when = asyncio.FIRST_EXCEPTION
)
else:
loop = asyncio.get_event_loop()
done, pending = await asyncio.wait(
[loop.create_task(SubscriptionMgr(subscriptions, self.api_key, self.ssl_context).run()) for subscriptions in self.subscription_sets],
return_when = asyncio.FIRST_EXCEPTION
)
for task in done:
try:
task.result()
Expand Down
6 changes: 4 additions & 2 deletions bitpanda/Timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ def __init__(self, name, active = True):
self.name = name
self.active = active

self.current_milli_time = lambda: int(round(time.time() * 1000))

def __enter__(self):
self.start_tmstmp = time.time_ns()
self.start_tmstmp = self.current_milli_time()

def __exit__(self, type, value, traceback):
if self.active:
LOG.debug(f'Timer {self.name} finished. Took {round((time.time_ns() - self.start_tmstmp) / 1000000, 3)} ms.')
LOG.debug(f'Timer {self.name} finished. Took {(self.current_milli_time() - self.start_tmstmp)} ms.')
18 changes: 14 additions & 4 deletions bitpanda/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from bitpanda.Pair import Pair
from bitpanda import enums

import threading

LOG = logging.getLogger(__name__)

class Subscription(ABC):
Expand All @@ -23,11 +25,11 @@ def get_channel_subscription_message(self) -> dict:
pass

async def process_message(self, response : dict) -> None:
await self.process_callbacks(response)
return await self.process_callbacks(response)

async def process_callbacks(self, response : dict) -> None:
if self.callbacks is not None:
await asyncio.gather(*[asyncio.create_task(cb(response)) for cb in self.callbacks])
return await asyncio.gather(*[asyncio.create_task(cb(response)) for cb in self.callbacks])

@staticmethod
def _get_subscription_instrument_codes(pairs : List[Pair]) -> List[str]:
Expand All @@ -38,6 +40,9 @@ class SubscriptionMgr(object):
CONNECTION_REINITIALIZATION_AFTER_ERROR = True
MAX_MESSAGE_SIZE = 3 * 1024 * 1024 # 3MB

subscriptionResultAvailable = threading.Event()
subscriptionResult = None

def __init__(self, subscriptions : List[Subscription], api_key : str, ssl_context = None):
self.api_key = api_key
self.ssl_context = ssl_context
Expand All @@ -58,7 +63,7 @@ async def run(self) -> None:
# start processing incoming messages
while True:
response = json.loads(await websocket.recv())
LOG.debug(f"< {response}")
# LOG.debug(f"< {response}")

# subscription negative response
if "error" in response or response['type'] == "ERROR":
Expand All @@ -79,7 +84,12 @@ async def run(self) -> None:

# regular message
else:
SubscriptionMgr.subscriptionResult = response
SubscriptionMgr.subscriptionResultAvailable.set()

await self.process_message(response)


except websockets.ConnectionClosedError as e:
if SubscriptionMgr.CONNECTION_REINITIALIZATION_AFTER_ERROR:
LOG.exception(f"ERROR: Connection closed with reason: {e}. Connection will be reinitialized.")
Expand All @@ -102,7 +112,7 @@ def _create_subscription_message(self) -> dict:
async def process_message(self, response : dict) -> None:
for subscription in self.subscriptions:
if subscription.get_channel_name() == response["channel_name"]:
await subscription.process_message(response)
return await subscription.process_message(response)
break

class AccountSubscription(Subscription):
Expand Down