From 7e7f83a025b22c2db454f2e1a1fdff457fa39a30 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 13:33:44 +0200 Subject: [PATCH 01/12] Feature: Internal account management + fix on _load_account to handle SolAccount --- pyproject.toml | 1 + src/aleph/sdk/account.py | 139 +++++++++++++++++++++++++- src/aleph/sdk/chains/common.py | 15 +++ src/aleph/sdk/conf.py | 17 ++-- src/aleph/sdk/types.py | 16 ++- src/aleph/sdk/utils.py | 171 +++++++++++++++++++++++++++++++- tests/unit/test_chain_solana.py | 98 ++++++++++++++++++ tests/unit/test_utils.py | 109 ++++++++++++++++++++ 8 files changed, 551 insertions(+), 15 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2cffe116..f375a0ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "aleph-superfluid>=0.2.1", "eth_typing==4.3.1", "web3==6.3.0", + "aiofiles>=24.1.0", ] [project.optional-dependencies] diff --git a/src/aleph/sdk/account.py b/src/aleph/sdk/account.py index 59eef815..73b928c1 100644 --- a/src/aleph/sdk/account.py +++ b/src/aleph/sdk/account.py @@ -1,18 +1,99 @@ import asyncio +import json import logging from pathlib import Path -from typing import Optional, Type, TypeVar +from typing import Dict, List, Optional, Type, TypeVar, Union, overload + +import base58 +from aleph_message.models import Chain from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.chains.remote import RemoteAccount +from aleph.sdk.chains.solana import SOLAccount from aleph.sdk.conf import settings from aleph.sdk.types import AccountFromPrivateKey +from aleph.sdk.utils import parse_solana_private_key, solana_private_key_from_bytes logger = logging.getLogger(__name__) T = TypeVar("T", bound=AccountFromPrivateKey) +CHAIN_TO_ACCOUNT_MAP: Dict[Chain, Type[AccountFromPrivateKey]] = { + Chain.ETH: ETHAccount, + Chain.AVAX: ETHAccount, + Chain.SOL: SOLAccount, + Chain.BASE: ETHAccount, +} + + +def detect_chain_from_private_key(private_key: Union[str, List[int], bytes]) -> Chain: + """ + Detect the blockchain chain based on the private key format. + - Chain.ETH for Ethereum (EVM) private keys + - Chain.SOL for Solana private keys (base58 or uint8 format). + + Raises: + ValueError: If the private key format is invalid or not recognized. + """ + if isinstance(private_key, (str, bytes)) and is_valid_private_key( + private_key, ETHAccount + ): + return Chain.ETH + + elif is_valid_private_key(private_key, SOLAccount): + return Chain.SOL + + else: + raise ValueError("Unsupported private key format. Unable to detect chain.") + + +@overload +def is_valid_private_key( + private_key: Union[str, bytes], account_type: Type[ETHAccount] +) -> bool: ... + + +@overload +def is_valid_private_key( + private_key: Union[str, List[int], bytes], account_type: Type[SOLAccount] +) -> bool: ... + + +def is_valid_private_key( + private_key: Union[str, List[int], bytes], account_type: Type[T] +) -> bool: + """ + Check if the private key is valid for either Ethereum or Solana based on the account type. + """ + try: + if account_type == ETHAccount: + # Handle Ethereum private key validation + if isinstance(private_key, str): + if private_key.startswith("0x"): + private_key = private_key[2:] + private_key = bytes.fromhex(private_key) + elif isinstance(private_key, list): + raise ValueError("Ethereum keys cannot be a list of integers") + + account_type(private_key) + + elif account_type == SOLAccount: + # Handle Solana private key validation + if isinstance(private_key, bytes): + return len(private_key) == 64 + elif isinstance(private_key, str): + decoded_key = base58.b58decode(private_key) + return len(decoded_key) == 64 + elif isinstance(private_key, list): + return len(private_key) == 64 and all( + isinstance(i, int) and 0 <= i <= 255 for i in private_key + ) + + return True + except Exception: + return False + def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T: if private_key_str.startswith("0x"): @@ -22,6 +103,11 @@ def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T: def account_from_file(private_key_path: Path, account_type: Type[T]) -> T: private_key = private_key_path.read_bytes() + if account_type == SOLAccount: + private_key = parse_solana_private_key( + solana_private_key_from_bytes(private_key) + ) + return account_type(private_key) @@ -33,10 +119,59 @@ def _load_account( """Load private key from a string or a file. takes the string argument in priority""" if private_key_str: + # Check Account type based on private-key string format (base58 / uint for solana) + private_key_chain = detect_chain_from_private_key(private_key=private_key_str) + if private_key_chain == Chain.SOL: + account_type = SOLAccount + logger.debug("Solana private key is detected") + parsed_key = parse_solana_private_key(private_key_str) + return account_type(parsed_key) logger.debug("Using account from string") return account_from_hex_string(private_key_str, account_type) elif private_key_path and private_key_path.is_file(): - logger.debug("Using account from file") + if private_key_path: + try: + # Look for the account by private_key_path in CHAINS_CONFIG_FILE + with open(settings.CHAINS_CONFIG_FILE, "r") as file: + accounts = json.load(file) + + matching_account = next( + ( + account + for account in accounts + if account["path"] == str(private_key_path) + ), + None, + ) + + if matching_account: + chain = Chain(matching_account["chain"]) + account_type = CHAIN_TO_ACCOUNT_MAP.get(chain, ETHAccount) + if account_type is None: + account_type = ETHAccount + logger.debug( + f"Detected {chain} account for path {private_key_path}" + ) + else: + logger.warning( + f"No matching account found for path {private_key_path}, defaulting to {account_type.__name__}" + ) + + except FileNotFoundError: + logger.warning( + f"CHAINS_CONFIG_FILE not found, using default account type {account_type.__name__}" + ) + except json.JSONDecodeError: + logger.error( + f"Invalid format in CHAINS_CONFIG_FILE, unable to load account info." + ) + raise ValueError(f"Invalid format in {settings.CHAINS_CONFIG_FILE}.") + except Exception as e: + logger.error(f"Error loading accounts from config: {e}") + raise ValueError( + f"Could not find matching account for path {private_key_path}." + ) + return account_from_file(private_key_path, account_type) elif settings.REMOTE_CRYPTO_HOST: logger.debug("Using remote account") diff --git a/src/aleph/sdk/chains/common.py b/src/aleph/sdk/chains/common.py index 0a90183c..6087ece6 100644 --- a/src/aleph/sdk/chains/common.py +++ b/src/aleph/sdk/chains/common.py @@ -3,6 +3,7 @@ from pathlib import Path from typing import Dict, Optional +import nacl.signing from coincurve.keys import PrivateKey from typing_extensions import deprecated @@ -149,6 +150,20 @@ def generate_key() -> bytes: return privkey.secret +def generate_key_solana() -> bytes: + """ + Generate a new Solana private key (32 bytes) using Ed25519. + + Returns: + A bytes object representing the 32-byte Solana private key. + """ + # Generate a new signing key (this is the private key part) + private_key = nacl.signing.SigningKey.generate() + + # Return only the private key (32 bytes) + return private_key.encode() + + def get_fallback_private_key(path: Optional[Path] = None) -> bytes: path = path or settings.PRIVATE_KEY_FILE private_key: bytes diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 4236370a..a68f9993 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -25,8 +25,13 @@ class Settings(BaseSettings): description="Path to the mnemonic used to create Substrate keypairs", ) + CHAINS_CONFIG_FILE: Path = Field( + default=Path("chains_config.json"), + description="Path to the JSON file containing chain account configurations", + ) + PRIVATE_KEY_STRING: Optional[str] = None - API_HOST: str = "https://api2.aleph.im" + API_HOST: str = "https://api2.aleph.im/" MAX_INLINE_SIZE: int = 50000 API_UNIX_SOCKET: Optional[str] = None REMOTE_CRYPTO_HOST: Optional[str] = None @@ -162,13 +167,3 @@ class Config: settings.PRIVATE_MNEMONIC_FILE = Path( settings.CONFIG_HOME, "private-keys", "substrate.mnemonic" ) - -# Update CHAINS settings and remove placeholders -CHAINS_ENV = [(key[7:], value) for key, value in settings if key.startswith("CHAINS_")] -for fields, value in CHAINS_ENV: - if value: - chain, field = fields.split("_", 1) - chain = chain if chain not in Chain.__members__ else Chain[chain] - field = field.lower() - settings.CHAINS[chain].__dict__[field] = value - settings.__delattr__(f"CHAINS_{fields}") diff --git a/src/aleph/sdk/types.py b/src/aleph/sdk/types.py index 081a3465..84c44725 100644 --- a/src/aleph/sdk/types.py +++ b/src/aleph/sdk/types.py @@ -1,12 +1,13 @@ from abc import abstractmethod from enum import Enum +from pathlib import Path from typing import Dict, Optional, Protocol, TypeVar from pydantic import BaseModel __all__ = ("StorageEnum", "Account", "AccountFromPrivateKey", "GenericMessage") -from aleph_message.models import AlephMessage +from aleph_message.models import AlephMessage, Chain class StorageEnum(str, Enum): @@ -76,3 +77,16 @@ class ChainInfo(BaseModel): token: str super_token: Optional[str] = None active: bool = True + + +class ChainAccount(BaseModel): + """ + Intern Chain Management with Account. + """ + + name: str + path: Path + chain: Chain + + class Config: + use_enum_values = True diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index 116c7b42..f4a037f2 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -28,6 +28,8 @@ from uuid import UUID from zipfile import BadZipFile, ZipFile +import aiofiles +import base58 from aleph_message.models import ItemHash, MessageType from aleph_message.models.execution.program import Encoding from aleph_message.models.execution.volume import MachineVolume @@ -37,7 +39,7 @@ from pydantic.json import pydantic_encoder from aleph.sdk.conf import settings -from aleph.sdk.types import GenericMessage, SEVInfo, SEVMeasurement +from aleph.sdk.types import ChainAccount, GenericMessage, SEVInfo, SEVMeasurement logger = logging.getLogger(__name__) @@ -393,3 +395,170 @@ def make_packet_header( header[20:52] = h.digest() return header + + +def parse_solana_private_key(private_key: Union[str, List[int], bytes]) -> bytes: + """ + Parse the private key which could be either: + - a base58-encoded string (which may contain both private and public key) + - a list of uint8 integers (which may contain both private and public key) + - a byte array (exactly 32 bytes) + + Returns: + bytes: The private key in byte format (32 bytes). + + Raises: + ValueError: If the private key format is invalid or the length is incorrect. + """ + # If the private key is already in byte format + if isinstance(private_key, bytes): + if len(private_key) != 32: + raise ValueError("The private key in bytes must be exactly 32 bytes long.") + return private_key + + # If the private key is a base58-encoded string + elif isinstance(private_key, str): + try: + decoded_key = base58.b58decode(private_key) + logger.debug(f"Decoded key length: {len(decoded_key)}") # Debugging print + if len(decoded_key) not in [32, 64]: + raise ValueError( + "The base58 decoded private key must be either 32 or 64 bytes long." + ) + return decoded_key[:32] + except Exception as e: + raise ValueError(f"Invalid base58 encoded private key: {e}") + + # If the private key is a list of uint8 integers + elif isinstance(private_key, list): + if all(isinstance(i, int) and 0 <= i <= 255 for i in private_key): + byte_key = bytes(private_key) + if len(byte_key) < 32: + raise ValueError("The uint8 array must contain at least 32 elements.") + return byte_key[:32] # Take the first 32 bytes (private key) + else: + raise ValueError( + "Invalid uint8 array, must contain integers between 0 and 255." + ) + + else: + raise ValueError( + "Unsupported private key format. Must be a base58 string, bytes, or a list of uint8 integers." + ) + + +async def load_json(file_path: Path): + """Asynchronously load JSON from a file. If the file does not exist or is empty, return an empty list.""" + if not file_path.exists() or file_path.stat().st_size == 0: + logger.debug( + f"File {file_path} does not exist or is empty. Returning an empty list." + ) + return [] + async with aiofiles.open(file_path, "r") as file: + content = await file.read() + return json.loads(content) + + +async def save_json(file_path: Path, data: list): + """Asynchronously save a list of JSON objects to a file.""" + async with aiofiles.open(file_path, "w") as file: + data_serializable = [{**item, "path": str(item["path"])} for item in data] + await file.write(json.dumps(data_serializable, indent=4)) + + +async def add_chain_account(new_account: ChainAccount): + """Add a new chain account to the JSON file asynchronously.""" + accounts = await load_json(settings.CHAINS_CONFIG_FILE) + + for account in accounts: + if account["name"] == new_account.name: + logger.error(f"Account with name {new_account.name} already exists.") + raise ValueError(f"Account with name {new_account.name} already exists.") + + accounts.append(new_account.dict()) + await save_json(settings.CHAINS_CONFIG_FILE, accounts) + + logger.debug( + f"Added account for {new_account.name} with chain {new_account.chain} and path {new_account.path}." + ) + + +async def get_chain_account(name: str) -> ChainAccount: + """Retrieve a chain account by name from the JSON file.""" + accounts = await load_json(settings.CHAINS_CONFIG_FILE) + + for account in accounts: + if account["name"] == name: + logger.debug(f"Found account with name {name}.") + return ChainAccount(**account) + + logger.error(f"Account with name {name} not found.") + raise ValueError(f"Account with name {name} not found.") + + +async def update_chain_account(updated_account: ChainAccount): + """Update an existing chain account in the JSON file.""" + accounts = await load_json(settings.CHAINS_CONFIG_FILE) + + for index, account in enumerate(accounts): + if account["name"] == updated_account.name: + accounts[index] = updated_account.dict() + await save_json(settings.CHAINS_CONFIG_FILE, accounts) + logger.debug(f"Updated account with name {updated_account.name}.") + return + + logger.error(f"Account with name {updated_account.name} not found for update.") + raise ValueError(f"Account with name {updated_account.name} not found.") + + +async def delete_chain_account(name: str): + """Delete a chain account from the JSON file.""" + accounts = await load_json(settings.CHAINS_CONFIG_FILE) + + updated_accounts = [account for account in accounts if account["name"] != name] + + if len(updated_accounts) == len(accounts): + logger.error(f"No account found with name {name}.") + raise ValueError(f"No account found with name {name}.") + + await save_json(settings.CHAINS_CONFIG_FILE, updated_accounts) + logger.debug(f"Deleted account with name {name}.") + + +def solana_private_key_from_bytes( + private_key_bytes: bytes, output_format: str = "base58" +) -> Union[str, List[int], bytes]: + """ + Convert a Solana private key in bytes back to different formats (base58 string, uint8 list, or raw bytes). + + - For base58 string: Encode the bytes into a base58 string. + - For uint8 list: Convert the bytes into a list of integers. + - For raw bytes: Return as-is. + + Args: + private_key_bytes (bytes): The private key in byte format. + output_format (str): The format to return ('base58', 'list', 'bytes'). + + Returns: + The private key in the requested format. + + Raises: + ValueError: If the output_format is not recognized or the private key length is invalid. + """ + if not isinstance(private_key_bytes, bytes): + raise ValueError("Expected the private key in bytes.") + + if len(private_key_bytes) != 32: + raise ValueError("Solana private key must be exactly 32 bytes long.") + + if output_format == "base58": + return base58.b58encode(private_key_bytes).decode("utf-8") + + elif output_format == "list": + return list(private_key_bytes) + + elif output_format == "bytes": + return private_key_bytes + + else: + raise ValueError("Invalid output format. Choose 'base58', 'list', or 'bytes'.") diff --git a/tests/unit/test_chain_solana.py b/tests/unit/test_chain_solana.py index ed2fff78..794056e2 100644 --- a/tests/unit/test_chain_solana.py +++ b/tests/unit/test_chain_solana.py @@ -5,11 +5,14 @@ import base58 import pytest +from aleph_message.models import Chain from nacl.signing import VerifyKey +from aleph.sdk.account import detect_chain_from_private_key from aleph.sdk.chains.common import get_verification_buffer from aleph.sdk.chains.solana import SOLAccount, get_fallback_account, verify_signature from aleph.sdk.exceptions import BadSignatureError +from aleph.sdk.utils import parse_solana_private_key @dataclass @@ -136,3 +139,98 @@ async def test_sign_raw(solana_account): assert isinstance(signature, bytes) verify_signature(signature, solana_account.get_address(), buffer) + + +def test_parse_private_key_base58(): + base58_key = "9beEbjn1Md7prQbH9kk9HjTM3npbj1S49BJQpSpYJKvnfATP8Eki9ofaq19tAzpijjV4TyTtibXGBkRjFnmTkiD" + private_key_bytes = parse_solana_private_key(base58_key) + + assert isinstance(private_key_bytes, bytes) + assert len(private_key_bytes) == 32 + + account = SOLAccount(private_key_bytes) + + assert account.get_address() == "8XnzZVqAD1GUEYjQvsyURG36F7ZEhyDGpvYD68TSkSLy" + assert detect_chain_from_private_key(base58_key) == Chain.SOL + assert isinstance(account.get_address(), str) + assert len(account.get_address()) > 0 + + +def test_parse_private_key_uint8_array(): + uint8_array_key = [ + 73, + 6, + 73, + 131, + 134, + 65, + 155, + 206, + 87, + 203, + 226, + 184, + 174, + 66, + 214, + 252, + 201, + 188, + 56, + 102, + 241, + 81, + 21, + 30, + 150, + 55, + 134, + 252, + 138, + 137, + 174, + 163, + 89, + 90, + 53, + 40, + 237, + 153, + 99, + 127, + 220, + 233, + 29, + 48, + 180, + 199, + 18, + 225, + 249, + 163, + 140, + 157, + 201, + 74, + 221, + 176, + 229, + 6, + 182, + 226, + 74, + 243, + 193, + 143, + ] + private_key_bytes = parse_solana_private_key(uint8_array_key) + + assert isinstance(private_key_bytes, bytes) + assert len(private_key_bytes) == 32 + + account = SOLAccount(private_key_bytes) + + assert account.get_address() == "71o4nN2BgB8MdD771U5VAPBj8jwufxkYJZwNnCr81VwL" + assert detect_chain_from_private_key(uint8_array_key) == Chain.SOL + assert isinstance(account.get_address(), str) + assert len(account.get_address()) > 0 diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index bfca23a5..60c99a21 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,6 +1,7 @@ import base64 import datetime +import base58 import pytest as pytest from aleph_message.models import ( AggregateMessage, @@ -19,12 +20,16 @@ PersistentVolume, ) +from aleph.sdk.account import detect_chain_from_private_key, is_valid_private_key +from aleph.sdk.chains.ethereum import ETHAccount +from aleph.sdk.chains.solana import SOLAccount from aleph.sdk.types import SEVInfo from aleph.sdk.utils import ( calculate_firmware_hash, compute_confidential_measure, enum_as_str, get_message_type_value, + parse_solana_private_key, parse_volume, ) @@ -235,3 +240,107 @@ def test_compute_confidential_measure(): ) == b"ls2jv10V3HVShVI/RHCo/a43WO0soLZf0huU9ZZstIw=" ) + + +def test_parse_solana_private_key_bytes(): + # Valid 32-byte private key + private_key_bytes = bytes(range(32)) + parsed_key = parse_solana_private_key(private_key_bytes) + assert isinstance(parsed_key, bytes) + assert len(parsed_key) == 32 + assert parsed_key == private_key_bytes + + # Invalid private key (too short) + with pytest.raises( + ValueError, match="The private key in bytes must be exactly 32 bytes long." + ): + parse_solana_private_key(bytes(range(31))) + + +def test_parse_solana_private_key_base58(): + # Valid base58 private key (32 bytes) + base58_key = base58.b58encode(bytes(range(32))).decode("utf-8") + parsed_key = parse_solana_private_key(base58_key) + assert isinstance(parsed_key, bytes) + assert len(parsed_key) == 32 + + # Invalid base58 key (not decodable) + with pytest.raises(ValueError, match="Invalid base58 encoded private key"): + parse_solana_private_key("invalid_base58_key") + + # Invalid base58 key (wrong length) + with pytest.raises( + ValueError, + match="The base58 decoded private key must be either 32 or 64 bytes long.", + ): + parse_solana_private_key(base58.b58encode(bytes(range(31))).decode("utf-8")) + + +def test_parse_solana_private_key_list(): + # Valid list of uint8 integers (64 elements, but we only take the first 32 for private key) + uint8_list = list(range(64)) + parsed_key = parse_solana_private_key(uint8_list) + assert isinstance(parsed_key, bytes) + assert len(parsed_key) == 32 + assert parsed_key == bytes(range(32)) + + # Invalid list (contains non-integers) + with pytest.raises(ValueError, match="Invalid uint8 array"): + parse_solana_private_key([1, 2, "not an int", 4]) # type: ignore # Ignore type check for string + + # Invalid list (less than 32 elements) + with pytest.raises( + ValueError, match="The uint8 array must contain at least 32 elements." + ): + parse_solana_private_key(list(range(31))) + + +def test_is_solana_private_key(): + sol_key = base58.b58encode(bytes(range(64))).decode("utf-8") + assert is_valid_private_key(sol_key, SOLAccount) is True + + short_sol_key = base58.b58encode(bytes(range(32))).decode("utf-8") + assert is_valid_private_key(short_sol_key, SOLAccount) is False + + sol_key_list = list(range(64)) + assert is_valid_private_key(sol_key_list, SOLAccount) is True + + short_sol_key_list = list(range(32)) + assert is_valid_private_key(short_sol_key_list, SOLAccount) is False + + sol_key_bytes = bytes(range(64)) + assert is_valid_private_key(sol_key_bytes, SOLAccount) is True + + short_sol_key_bytes = bytes(range(32)) + assert is_valid_private_key(short_sol_key_bytes, SOLAccount) is False + + +def test_detect_chain_from_private_key(): + eth_key = "0x" + "a" * 64 + assert detect_chain_from_private_key(eth_key) == Chain.ETH + + sol_key = base58.b58encode(bytes(range(64))).decode("utf-8") + assert detect_chain_from_private_key(sol_key) == Chain.SOL + + sol_key_list = list(range(64)) + assert detect_chain_from_private_key(sol_key_list) == Chain.SOL + + with pytest.raises(ValueError, match="Unsupported private key format"): + detect_chain_from_private_key("invalid_key") + + +def test_is_eth_private_key(): + eth_key = "0x" + "a" * 64 + assert is_valid_private_key(eth_key, ETHAccount) is True + + eth_key_no_prefix = "a" * 64 + assert is_valid_private_key(eth_key_no_prefix, ETHAccount) is True + + assert is_valid_private_key("a" * 63, ETHAccount) is False + + assert is_valid_private_key("zz" * 32, ETHAccount) is False + + eth_key_bytes = bytes(range(32)) + assert is_valid_private_key(eth_key_bytes, ETHAccount) is True + + assert is_valid_private_key(bytes(range(31)), ETHAccount) is False From 0b545d353043a07e5f0fdf3d1a6edd92956b11ba Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 13:37:28 +0200 Subject: [PATCH 02/12] fixup! Feature: Internal account management + fix on _load_account to handle SolAccount --- src/aleph/sdk/conf.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index a68f9993..354ad849 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -31,7 +31,7 @@ class Settings(BaseSettings): ) PRIVATE_KEY_STRING: Optional[str] = None - API_HOST: str = "https://api2.aleph.im/" + API_HOST: str = "https://api2.aleph.im" MAX_INLINE_SIZE: int = 50000 API_UNIX_SOCKET: Optional[str] = None REMOTE_CRYPTO_HOST: Optional[str] = None @@ -167,3 +167,13 @@ class Config: settings.PRIVATE_MNEMONIC_FILE = Path( settings.CONFIG_HOME, "private-keys", "substrate.mnemonic" ) + +# Update CHAINS settings and remove placeholders +CHAINS_ENV = [(key[7:], value) for key, value in settings if key.startswith("CHAINS_")] +for fields, value in CHAINS_ENV: + if value: + chain, field = fields.split("_", 1) + chain = chain if chain not in Chain.__members__ else Chain[chain] + field = field.lower() + settings.CHAINS[chain].__dict__[field] = value + settings.__delattr__(f"CHAINS_{fields}") From 2811afed15f742b8f1e295428945badd38fee2e5 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 15:06:14 +0200 Subject: [PATCH 03/12] Fix: chains_config wasn't using settings.CONFIG_HOME for locations --- src/aleph/sdk/conf.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 354ad849..484c50c7 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -167,6 +167,11 @@ class Config: settings.PRIVATE_MNEMONIC_FILE = Path( settings.CONFIG_HOME, "private-keys", "substrate.mnemonic" ) +if str(settings.CHAINS_CONFIG_FILE) == "chains_config.json": + settings.CHAINS_CONFIG_FILE = Path( + settings.CONFIG_HOME, "configs", "chains_config.json" + ) + # Update CHAINS settings and remove placeholders CHAINS_ENV = [(key[7:], value) for key, value in settings if key.startswith("CHAINS_")] From 59c845e776a070f4db471e6be3d4acc0145257d2 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 15:06:27 +0200 Subject: [PATCH 04/12] Fix: blakc issue --- src/aleph/sdk/utils.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index f4a037f2..e4d3bc0a 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -496,6 +496,19 @@ async def get_chain_account(name: str) -> ChainAccount: raise ValueError(f"Account with name {name} not found.") +async def get_chain_account_from_path(path: str) -> ChainAccount: + """Retrieve a chain account by name from the JSON file.""" + accounts = await load_json(settings.CHAINS_CONFIG_FILE) + + for account in accounts: + if account["path"] == path: + logger.debug(f"Found account with the path : {path}.") + return ChainAccount(**account) + + logger.error(f"Account with name {path} not found.") + raise ValueError(f"Account with name {path} not found.") + + async def update_chain_account(updated_account: ChainAccount): """Update an existing chain account in the JSON file.""" accounts = await load_json(settings.CHAINS_CONFIG_FILE) From 841a1a362df3d25ada23466850490eb68462a100 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 15:19:43 +0200 Subject: [PATCH 05/12] Fix: rename CHAINS_CONFIG_FILE to CONFIG_FILE to avoid getting issue by conf of chain --- src/aleph/sdk/account.py | 10 +++++----- src/aleph/sdk/conf.py | 6 +++--- src/aleph/sdk/utils.py | 16 ++++++++-------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/aleph/sdk/account.py b/src/aleph/sdk/account.py index 73b928c1..1b47202b 100644 --- a/src/aleph/sdk/account.py +++ b/src/aleph/sdk/account.py @@ -131,8 +131,8 @@ def _load_account( elif private_key_path and private_key_path.is_file(): if private_key_path: try: - # Look for the account by private_key_path in CHAINS_CONFIG_FILE - with open(settings.CHAINS_CONFIG_FILE, "r") as file: + # Look for the account by private_key_path in CONFIG_FILE + with open(settings.CONFIG_FILE, "r") as file: accounts = json.load(file) matching_account = next( @@ -159,13 +159,13 @@ def _load_account( except FileNotFoundError: logger.warning( - f"CHAINS_CONFIG_FILE not found, using default account type {account_type.__name__}" + f"CONFIG_FILE not found, using default account type {account_type.__name__}" ) except json.JSONDecodeError: logger.error( - f"Invalid format in CHAINS_CONFIG_FILE, unable to load account info." + f"Invalid format in CONFIG_FILE, unable to load account info." ) - raise ValueError(f"Invalid format in {settings.CHAINS_CONFIG_FILE}.") + raise ValueError(f"Invalid format in {settings.CONFIG_FILE}.") except Exception as e: logger.error(f"Error loading accounts from config: {e}") raise ValueError( diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 484c50c7..71628407 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -25,7 +25,7 @@ class Settings(BaseSettings): description="Path to the mnemonic used to create Substrate keypairs", ) - CHAINS_CONFIG_FILE: Path = Field( + CONFIG_FILE: Path = Field( default=Path("chains_config.json"), description="Path to the JSON file containing chain account configurations", ) @@ -167,8 +167,8 @@ class Config: settings.PRIVATE_MNEMONIC_FILE = Path( settings.CONFIG_HOME, "private-keys", "substrate.mnemonic" ) -if str(settings.CHAINS_CONFIG_FILE) == "chains_config.json": - settings.CHAINS_CONFIG_FILE = Path( +if str(settings.CONFIG_FILE) == "chains_config.json": + settings.CONFIG_FILE = Path( settings.CONFIG_HOME, "configs", "chains_config.json" ) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index e4d3bc0a..4c818443 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -468,7 +468,7 @@ async def save_json(file_path: Path, data: list): async def add_chain_account(new_account: ChainAccount): """Add a new chain account to the JSON file asynchronously.""" - accounts = await load_json(settings.CHAINS_CONFIG_FILE) + accounts = await load_json(settings.CONFIG_FILE) for account in accounts: if account["name"] == new_account.name: @@ -476,7 +476,7 @@ async def add_chain_account(new_account: ChainAccount): raise ValueError(f"Account with name {new_account.name} already exists.") accounts.append(new_account.dict()) - await save_json(settings.CHAINS_CONFIG_FILE, accounts) + await save_json(settings.CONFIG_FILE, accounts) logger.debug( f"Added account for {new_account.name} with chain {new_account.chain} and path {new_account.path}." @@ -485,7 +485,7 @@ async def add_chain_account(new_account: ChainAccount): async def get_chain_account(name: str) -> ChainAccount: """Retrieve a chain account by name from the JSON file.""" - accounts = await load_json(settings.CHAINS_CONFIG_FILE) + accounts = await load_json(settings.CONFIG_FILE) for account in accounts: if account["name"] == name: @@ -498,7 +498,7 @@ async def get_chain_account(name: str) -> ChainAccount: async def get_chain_account_from_path(path: str) -> ChainAccount: """Retrieve a chain account by name from the JSON file.""" - accounts = await load_json(settings.CHAINS_CONFIG_FILE) + accounts = await load_json(settings.CONFIG_FILE) for account in accounts: if account["path"] == path: @@ -511,12 +511,12 @@ async def get_chain_account_from_path(path: str) -> ChainAccount: async def update_chain_account(updated_account: ChainAccount): """Update an existing chain account in the JSON file.""" - accounts = await load_json(settings.CHAINS_CONFIG_FILE) + accounts = await load_json(settings.CONFIG_FILE) for index, account in enumerate(accounts): if account["name"] == updated_account.name: accounts[index] = updated_account.dict() - await save_json(settings.CHAINS_CONFIG_FILE, accounts) + await save_json(settings.CONFIG_FILE, accounts) logger.debug(f"Updated account with name {updated_account.name}.") return @@ -526,7 +526,7 @@ async def update_chain_account(updated_account: ChainAccount): async def delete_chain_account(name: str): """Delete a chain account from the JSON file.""" - accounts = await load_json(settings.CHAINS_CONFIG_FILE) + accounts = await load_json(settings.CONFIG_FILE) updated_accounts = [account for account in accounts if account["name"] != name] @@ -534,7 +534,7 @@ async def delete_chain_account(name: str): logger.error(f"No account found with name {name}.") raise ValueError(f"No account found with name {name}.") - await save_json(settings.CHAINS_CONFIG_FILE, updated_accounts) + await save_json(settings.CONFIG_FILE, updated_accounts) logger.debug(f"Deleted account with name {name}.") From ab627ffdb09b411ea13ec55a5e78070b1e04198c Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 15:23:57 +0200 Subject: [PATCH 06/12] Fix: base58 and pynacl is now needed for build --- pyproject.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index f375a0ff..f6f3b5ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,8 @@ dependencies = [ "eth_typing==4.3.1", "web3==6.3.0", "aiofiles>=24.1.0", + "base58==2.1.1", # Needed now as default with _load_account changement + "pynacl==1.5.0" # Needed now as default with _load_account changement ] [project.optional-dependencies] From 896c1cb0dbf1249ab257d9740eccd28e8c9128d0 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 15:27:12 +0200 Subject: [PATCH 07/12] Fix: f string without nay placeholders --- src/aleph/sdk/account.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/sdk/account.py b/src/aleph/sdk/account.py index 1b47202b..c3d1b964 100644 --- a/src/aleph/sdk/account.py +++ b/src/aleph/sdk/account.py @@ -163,7 +163,7 @@ def _load_account( ) except json.JSONDecodeError: logger.error( - f"Invalid format in CONFIG_FILE, unable to load account info." + "Invalid format in CONFIG_FILE, unable to load account info." ) raise ValueError(f"Invalid format in {settings.CONFIG_FILE}.") except Exception as e: From 76599ef3db50aa6f67e3718851ce248f98d2ef9b Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 30 Sep 2024 15:30:27 +0200 Subject: [PATCH 08/12] Fix: black error --- src/aleph/sdk/conf.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 71628407..b6f6f986 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -168,9 +168,7 @@ class Config: settings.CONFIG_HOME, "private-keys", "substrate.mnemonic" ) if str(settings.CONFIG_FILE) == "chains_config.json": - settings.CONFIG_FILE = Path( - settings.CONFIG_HOME, "configs", "chains_config.json" - ) + settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "configs", "chains_config.json") # Update CHAINS settings and remove placeholders From 4761948fd8a6707b0f33a0da603f63964ede74eb Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 1 Oct 2024 14:16:09 +0200 Subject: [PATCH 09/12] Refactor: we now store single account at the time --- src/aleph/sdk/account.py | 50 ++++---- src/aleph/sdk/chains/solana.py | 92 ++++++++++++++- src/aleph/sdk/conf.py | 24 +++- src/aleph/sdk/types.py | 1 - src/aleph/sdk/utils.py | 195 +++++--------------------------- tests/unit/test_chain_solana.py | 8 +- tests/unit/test_utils.py | 3 +- 7 files changed, 171 insertions(+), 202 deletions(-) diff --git a/src/aleph/sdk/account.py b/src/aleph/sdk/account.py index c3d1b964..e4776d76 100644 --- a/src/aleph/sdk/account.py +++ b/src/aleph/sdk/account.py @@ -10,10 +10,14 @@ from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.chains.remote import RemoteAccount -from aleph.sdk.chains.solana import SOLAccount +from aleph.sdk.chains.solana import ( + SOLAccount, + parse_solana_private_key, + solana_private_key_from_bytes, +) from aleph.sdk.conf import settings from aleph.sdk.types import AccountFromPrivateKey -from aleph.sdk.utils import parse_solana_private_key, solana_private_key_from_bytes +from aleph.sdk.utils import load_account_key_context logger = logging.getLogger(__name__) @@ -130,46 +134,42 @@ def _load_account( return account_from_hex_string(private_key_str, account_type) elif private_key_path and private_key_path.is_file(): if private_key_path: + account_type = ETHAccount # Default account type + try: - # Look for the account by private_key_path in CONFIG_FILE - with open(settings.CONFIG_FILE, "r") as file: - accounts = json.load(file) - - matching_account = next( - ( - account - for account in accounts - if account["path"] == str(private_key_path) - ), - None, - ) + account_data = load_account_key_context(settings.CONFIG_FILE) - if matching_account: - chain = Chain(matching_account["chain"]) - account_type = CHAIN_TO_ACCOUNT_MAP.get(chain, ETHAccount) - if account_type is None: - account_type = ETHAccount + if account_data: + chain = Chain(account_data.chain) + account_type = ( + CHAIN_TO_ACCOUNT_MAP.get(chain, ETHAccount) or ETHAccount + ) logger.debug( f"Detected {chain} account for path {private_key_path}" ) else: logger.warning( - f"No matching account found for path {private_key_path}, defaulting to {account_type.__name__}" + f"No account data found in {private_key_path}, defaulting to {account_type.__name__}" ) except FileNotFoundError: logger.warning( - f"CONFIG_FILE not found, using default account type {account_type.__name__}" + f"{private_key_path} not found, using default account type {account_type.__name__}" ) except json.JSONDecodeError: logger.error( - "Invalid format in CONFIG_FILE, unable to load account info." + f"Invalid format in {private_key_path}, unable to load account info." + ) + raise ValueError(f"Invalid format in {private_key_path}.") + except KeyError as e: + logger.error(f"Missing key in account config: {e}") + raise ValueError( + f"Invalid account data in {private_key_path}. Key {e} is missing." ) - raise ValueError(f"Invalid format in {settings.CONFIG_FILE}.") except Exception as e: - logger.error(f"Error loading accounts from config: {e}") + logger.error(f"Error loading account from {private_key_path}: {e}") raise ValueError( - f"Could not find matching account for path {private_key_path}." + f"Could not load account data from {private_key_path}." ) return account_from_file(private_key_path, account_type) diff --git a/src/aleph/sdk/chains/solana.py b/src/aleph/sdk/chains/solana.py index ff870a4d..394cb987 100644 --- a/src/aleph/sdk/chains/solana.py +++ b/src/aleph/sdk/chains/solana.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import Dict, Optional, Union +from typing import Dict, List, Optional, Union import base58 from nacl.exceptions import BadSignatureError as NaclBadSignatureError @@ -79,7 +79,7 @@ def verify_signature( public_key: The public key to use for verification. Can be a base58 encoded string or bytes. message: The message to verify. Can be an utf-8 string or bytes. Raises: - BadSignatureError: If the signature is invalid. + BadSignatureError: If the signature is invalid.! """ if isinstance(signature, str): signature = base58.b58decode(signature) @@ -91,3 +91,91 @@ def verify_signature( VerifyKey(public_key).verify(message, signature) except NaclBadSignatureError as e: raise BadSignatureError from e + + +def solana_private_key_from_bytes( + private_key_bytes: bytes, output_format: str = "base58" +) -> Union[str, List[int], bytes]: + """ + Convert a Solana private key in bytes back to different formats (base58 string, uint8 list, or raw bytes). + + - For base58 string: Encode the bytes into a base58 string. + - For uint8 list: Convert the bytes into a list of integers. + - For raw bytes: Return as-is. + + Args: + private_key_bytes (bytes): The private key in byte format. + output_format (str): The format to return ('base58', 'list', 'bytes'). + + Returns: + The private key in the requested format. + + Raises: + ValueError: If the output_format is not recognized or the private key length is invalid. + """ + if not isinstance(private_key_bytes, bytes): + raise ValueError("Expected the private key in bytes.") + + if len(private_key_bytes) != 32: + raise ValueError("Solana private key must be exactly 32 bytes long.") + + if output_format == "base58": + return base58.b58encode(private_key_bytes).decode("utf-8") + + elif output_format == "list": + return list(private_key_bytes) + + elif output_format == "bytes": + return private_key_bytes + + else: + raise ValueError("Invalid output format. Choose 'base58', 'list', or 'bytes'.") + + +def parse_solana_private_key(private_key: Union[str, List[int], bytes]) -> bytes: + """ + Parse the private key which could be either: + - a base58-encoded string (which may contain both private and public key) + - a list of uint8 integers (which may contain both private and public key) + - a byte array (exactly 32 bytes) + + Returns: + bytes: The private key in byte format (32 bytes). + + Raises: + ValueError: If the private key format is invalid or the length is incorrect. + """ + # If the private key is already in byte format + if isinstance(private_key, bytes): + if len(private_key) != 32: + raise ValueError("The private key in bytes must be exactly 32 bytes long.") + return private_key + + # If the private key is a base58-encoded string + elif isinstance(private_key, str): + try: + decoded_key = base58.b58decode(private_key) + if len(decoded_key) not in [32, 64]: + raise ValueError( + "The base58 decoded private key must be either 32 or 64 bytes long." + ) + return decoded_key[:32] + except Exception as e: + raise ValueError(f"Invalid base58 encoded private key: {e}") + + # If the private key is a list of uint8 integers + elif isinstance(private_key, list): + if all(isinstance(i, int) and 0 <= i <= 255 for i in private_key): + byte_key = bytes(private_key) + if len(byte_key) < 32: + raise ValueError("The uint8 array must contain at least 32 elements.") + return byte_key[:32] # Take the first 32 bytes (private key) + else: + raise ValueError( + "Invalid uint8 array, must contain integers between 0 and 255." + ) + + else: + raise ValueError( + "Unsupported private key format. Must be a base58 string, bytes, or a list of uint8 integers." + ) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index b6f6f986..8cb729af 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -1,3 +1,4 @@ +import json import os from pathlib import Path from shutil import which @@ -13,6 +14,11 @@ class Settings(BaseSettings): CONFIG_HOME: Optional[str] = None + CONFIG_FILE: Path = Field( + default=Path("chains_config.json"), + description="Path to the JSON file containing chain account configurations", + ) + # In case the user does not want to bother with handling private keys himself, # do an ugly and insecure write and read from disk to this file. PRIVATE_KEY_FILE: Path = Field( @@ -25,11 +31,6 @@ class Settings(BaseSettings): description="Path to the mnemonic used to create Substrate keypairs", ) - CONFIG_FILE: Path = Field( - default=Path("chains_config.json"), - description="Path to the JSON file containing chain account configurations", - ) - PRIVATE_KEY_STRING: Optional[str] = None API_HOST: str = "https://api2.aleph.im" MAX_INLINE_SIZE: int = 50000 @@ -168,7 +169,17 @@ class Config: settings.CONFIG_HOME, "private-keys", "substrate.mnemonic" ) if str(settings.CONFIG_FILE) == "chains_config.json": - settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "configs", "chains_config.json") + settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "chains_config.json") + # If Config file exist and well filled we update the PRIVATE_KEY_FILE default + if settings.CONFIG_FILE.exists(): + try: + with open(settings.CONFIG_FILE, "r", encoding="utf-8") as f: + config_data = json.load(f) + + if "path" in config_data: + settings.PRIVATE_KEY_FILE = Path(config_data["path"]) + except json.JSONDecodeError: + pass # Update CHAINS settings and remove placeholders @@ -180,3 +191,4 @@ class Config: field = field.lower() settings.CHAINS[chain].__dict__[field] = value settings.__delattr__(f"CHAINS_{fields}") +print(f"Private key file is set to: {settings.PRIVATE_KEY_FILE}") diff --git a/src/aleph/sdk/types.py b/src/aleph/sdk/types.py index 84c44725..d86e1fdd 100644 --- a/src/aleph/sdk/types.py +++ b/src/aleph/sdk/types.py @@ -84,7 +84,6 @@ class ChainAccount(BaseModel): Intern Chain Management with Account. """ - name: str path: Path chain: Chain diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index 4c818443..6a5cd403 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -397,181 +397,48 @@ def make_packet_header( return header -def parse_solana_private_key(private_key: Union[str, List[int], bytes]) -> bytes: +def load_account_key_context(file_path: Path) -> Optional[ChainAccount]: """ - Parse the private key which could be either: - - a base58-encoded string (which may contain both private and public key) - - a list of uint8 integers (which may contain both private and public key) - - a byte array (exactly 32 bytes) - - Returns: - bytes: The private key in byte format (32 bytes). - - Raises: - ValueError: If the private key format is invalid or the length is incorrect. + Synchronously load the private key and chain type from a file. + If the file does not exist or is empty, return None. """ - # If the private key is already in byte format - if isinstance(private_key, bytes): - if len(private_key) != 32: - raise ValueError("The private key in bytes must be exactly 32 bytes long.") - return private_key - - # If the private key is a base58-encoded string - elif isinstance(private_key, str): - try: - decoded_key = base58.b58decode(private_key) - logger.debug(f"Decoded key length: {len(decoded_key)}") # Debugging print - if len(decoded_key) not in [32, 64]: - raise ValueError( - "The base58 decoded private key must be either 32 or 64 bytes long." - ) - return decoded_key[:32] - except Exception as e: - raise ValueError(f"Invalid base58 encoded private key: {e}") - - # If the private key is a list of uint8 integers - elif isinstance(private_key, list): - if all(isinstance(i, int) and 0 <= i <= 255 for i in private_key): - byte_key = bytes(private_key) - if len(byte_key) < 32: - raise ValueError("The uint8 array must contain at least 32 elements.") - return byte_key[:32] # Take the first 32 bytes (private key) - else: - raise ValueError( - "Invalid uint8 array, must contain integers between 0 and 255." - ) - - else: - raise ValueError( - "Unsupported private key format. Must be a base58 string, bytes, or a list of uint8 integers." - ) - - -async def load_json(file_path: Path): - """Asynchronously load JSON from a file. If the file does not exist or is empty, return an empty list.""" if not file_path.exists() or file_path.stat().st_size == 0: - logger.debug( - f"File {file_path} does not exist or is empty. Returning an empty list." - ) - return [] - async with aiofiles.open(file_path, "r") as file: - content = await file.read() - return json.loads(content) - - -async def save_json(file_path: Path, data: list): - """Asynchronously save a list of JSON objects to a file.""" - async with aiofiles.open(file_path, "w") as file: - data_serializable = [{**item, "path": str(item["path"])} for item in data] - await file.write(json.dumps(data_serializable, indent=4)) - - -async def add_chain_account(new_account: ChainAccount): - """Add a new chain account to the JSON file asynchronously.""" - accounts = await load_json(settings.CONFIG_FILE) - - for account in accounts: - if account["name"] == new_account.name: - logger.error(f"Account with name {new_account.name} already exists.") - raise ValueError(f"Account with name {new_account.name} already exists.") - - accounts.append(new_account.dict()) - await save_json(settings.CONFIG_FILE, accounts) - - logger.debug( - f"Added account for {new_account.name} with chain {new_account.chain} and path {new_account.path}." - ) - - -async def get_chain_account(name: str) -> ChainAccount: - """Retrieve a chain account by name from the JSON file.""" - accounts = await load_json(settings.CONFIG_FILE) - - for account in accounts: - if account["name"] == name: - logger.debug(f"Found account with name {name}.") - return ChainAccount(**account) - - logger.error(f"Account with name {name} not found.") - raise ValueError(f"Account with name {name} not found.") - - -async def get_chain_account_from_path(path: str) -> ChainAccount: - """Retrieve a chain account by name from the JSON file.""" - accounts = await load_json(settings.CONFIG_FILE) - - for account in accounts: - if account["path"] == path: - logger.debug(f"Found account with the path : {path}.") - return ChainAccount(**account) - - logger.error(f"Account with name {path} not found.") - raise ValueError(f"Account with name {path} not found.") - - -async def update_chain_account(updated_account: ChainAccount): - """Update an existing chain account in the JSON file.""" - accounts = await load_json(settings.CONFIG_FILE) - - for index, account in enumerate(accounts): - if account["name"] == updated_account.name: - accounts[index] = updated_account.dict() - await save_json(settings.CONFIG_FILE, accounts) - logger.debug(f"Updated account with name {updated_account.name}.") - return - - logger.error(f"Account with name {updated_account.name} not found for update.") - raise ValueError(f"Account with name {updated_account.name} not found.") - - -async def delete_chain_account(name: str): - """Delete a chain account from the JSON file.""" - accounts = await load_json(settings.CONFIG_FILE) - - updated_accounts = [account for account in accounts if account["name"] != name] - - if len(updated_accounts) == len(accounts): - logger.error(f"No account found with name {name}.") - raise ValueError(f"No account found with name {name}.") + logger.debug(f"File {file_path} does not exist or is empty. Returning None.") + return None - await save_json(settings.CONFIG_FILE, updated_accounts) - logger.debug(f"Deleted account with name {name}.") + try: + with file_path.open("rb") as file: + content = file.read() + data = json.loads(content.decode("utf-8")) + return ChainAccount(**data) + except UnicodeDecodeError as e: + logger.error(f"Unable to decode {file_path} as UTF-8: {e}") + raise ValueError(f"File {file_path} is not properly encoded.") + except json.JSONDecodeError: + logger.error(f"Invalid JSON format in {file_path}.") + raise ValueError(f"Invalid format in {file_path}.") -def solana_private_key_from_bytes( - private_key_bytes: bytes, output_format: str = "base58" -) -> Union[str, List[int], bytes]: +def save_account_key_context(file_path: Path, data: ChainAccount): """ - Convert a Solana private key in bytes back to different formats (base58 string, uint8 list, or raw bytes). - - - For base58 string: Encode the bytes into a base58 string. - - For uint8 list: Convert the bytes into a list of integers. - - For raw bytes: Return as-is. - - Args: - private_key_bytes (bytes): The private key in byte format. - output_format (str): The format to return ('base58', 'list', 'bytes'). + Synchronously save a single ChainAccount object as JSON to a file. + """ + with file_path.open("w") as file: + data_serializable = data.dict() + data_serializable["path"] = str(data_serializable["path"]) + json.dump(data_serializable, file, indent=4) - Returns: - The private key in the requested format. - Raises: - ValueError: If the output_format is not recognized or the private key length is invalid. +async def upsert_chain_account(new_account: ChainAccount, config_file: Path): """ - if not isinstance(private_key_bytes, bytes): - raise ValueError("Expected the private key in bytes.") - - if len(private_key_bytes) != 32: - raise ValueError("Solana private key must be exactly 32 bytes long.") + Update the chain account in the config file, replacing the existing one if necessary. - if output_format == "base58": - return base58.b58encode(private_key_bytes).decode("utf-8") + If the file doesn't exist, create it and add the new account. + """ - elif output_format == "list": - return list(private_key_bytes) + account_data = {"path": str(new_account.path), "chain": new_account.chain} - elif output_format == "bytes": - return private_key_bytes + with config_file.open("w", encoding="utf-8") as f: + json.dump(account_data, f, indent=4) - else: - raise ValueError("Invalid output format. Choose 'base58', 'list', or 'bytes'.") + logger.debug(f"Replaced account in {config_file} with {new_account.path}.") diff --git a/tests/unit/test_chain_solana.py b/tests/unit/test_chain_solana.py index 794056e2..aeb4b55a 100644 --- a/tests/unit/test_chain_solana.py +++ b/tests/unit/test_chain_solana.py @@ -10,9 +10,13 @@ from aleph.sdk.account import detect_chain_from_private_key from aleph.sdk.chains.common import get_verification_buffer -from aleph.sdk.chains.solana import SOLAccount, get_fallback_account, verify_signature +from aleph.sdk.chains.solana import ( + SOLAccount, + get_fallback_account, + parse_solana_private_key, + verify_signature, +) from aleph.sdk.exceptions import BadSignatureError -from aleph.sdk.utils import parse_solana_private_key @dataclass diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 60c99a21..bf4c43ff 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -22,14 +22,13 @@ from aleph.sdk.account import detect_chain_from_private_key, is_valid_private_key from aleph.sdk.chains.ethereum import ETHAccount -from aleph.sdk.chains.solana import SOLAccount +from aleph.sdk.chains.solana import SOLAccount, parse_solana_private_key from aleph.sdk.types import SEVInfo from aleph.sdk.utils import ( calculate_firmware_hash, compute_confidential_measure, enum_as_str, get_message_type_value, - parse_solana_private_key, parse_volume, ) From d999e3c0f2655c0033ec928fb90f3af5a2466b92 Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 1 Oct 2024 14:17:35 +0200 Subject: [PATCH 10/12] Fix: ruff issue --- pyproject.toml | 1 - src/aleph/sdk/utils.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f6f3b5ac..f533bfe2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,6 @@ dependencies = [ "aleph-superfluid>=0.2.1", "eth_typing==4.3.1", "web3==6.3.0", - "aiofiles>=24.1.0", "base58==2.1.1", # Needed now as default with _load_account changement "pynacl==1.5.0" # Needed now as default with _load_account changement ] diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index 6a5cd403..962672c1 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -28,8 +28,6 @@ from uuid import UUID from zipfile import BadZipFile, ZipFile -import aiofiles -import base58 from aleph_message.models import ItemHash, MessageType from aleph_message.models.execution.program import Encoding from aleph_message.models.execution.volume import MachineVolume From 2b25fe59f17129ecbada0e8f6723cbca1f172c7f Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 1 Oct 2024 14:30:28 +0200 Subject: [PATCH 11/12] fix: debug stuff remove --- src/aleph/sdk/conf.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 8cb729af..0cfe5096 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -191,4 +191,3 @@ class Config: field = field.lower() settings.CHAINS[chain].__dict__[field] = value settings.__delattr__(f"CHAINS_{fields}") -print(f"Private key file is set to: {settings.PRIVATE_KEY_FILE}") From 96bf68d0712b76c65e5ba7310055be81190fafd9 Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Wed, 2 Oct 2024 11:59:10 +0200 Subject: [PATCH 12/12] Fix: Improve code structure in pair-programming with Lyam --- .gitignore | 2 +- src/aleph/sdk/account.py | 175 +++++++------------------------- src/aleph/sdk/chains/common.py | 15 --- src/aleph/sdk/chains/solana.py | 6 +- src/aleph/sdk/conf.py | 55 +++++++++- src/aleph/sdk/types.py | 15 +-- src/aleph/sdk/utils.py | 49 +-------- tests/unit/test_chain_solana.py | 148 ++++++++++----------------- tests/unit/test_utils.py | 108 -------------------- 9 files changed, 146 insertions(+), 427 deletions(-) diff --git a/.gitignore b/.gitignore index 2896a4e6..f18f4bd6 100644 --- a/.gitignore +++ b/.gitignore @@ -50,7 +50,7 @@ MANIFEST **/device.key # environment variables -.env +.config.json .env.local .gitsigners diff --git a/src/aleph/sdk/account.py b/src/aleph/sdk/account.py index e4776d76..8c067283 100644 --- a/src/aleph/sdk/account.py +++ b/src/aleph/sdk/account.py @@ -1,102 +1,30 @@ import asyncio -import json import logging from pathlib import Path -from typing import Dict, List, Optional, Type, TypeVar, Union, overload +from typing import Dict, Optional, Type, TypeVar -import base58 from aleph_message.models import Chain from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.chains.remote import RemoteAccount -from aleph.sdk.chains.solana import ( - SOLAccount, - parse_solana_private_key, - solana_private_key_from_bytes, -) -from aleph.sdk.conf import settings +from aleph.sdk.chains.solana import SOLAccount +from aleph.sdk.conf import load_main_configuration, settings from aleph.sdk.types import AccountFromPrivateKey -from aleph.sdk.utils import load_account_key_context logger = logging.getLogger(__name__) T = TypeVar("T", bound=AccountFromPrivateKey) -CHAIN_TO_ACCOUNT_MAP: Dict[Chain, Type[AccountFromPrivateKey]] = { - Chain.ETH: ETHAccount, - Chain.AVAX: ETHAccount, - Chain.SOL: SOLAccount, - Chain.BASE: ETHAccount, -} - -def detect_chain_from_private_key(private_key: Union[str, List[int], bytes]) -> Chain: - """ - Detect the blockchain chain based on the private key format. - - Chain.ETH for Ethereum (EVM) private keys - - Chain.SOL for Solana private keys (base58 or uint8 format). - - Raises: - ValueError: If the private key format is invalid or not recognized. - """ - if isinstance(private_key, (str, bytes)) and is_valid_private_key( - private_key, ETHAccount - ): - return Chain.ETH - - elif is_valid_private_key(private_key, SOLAccount): - return Chain.SOL - - else: - raise ValueError("Unsupported private key format. Unable to detect chain.") - - -@overload -def is_valid_private_key( - private_key: Union[str, bytes], account_type: Type[ETHAccount] -) -> bool: ... - - -@overload -def is_valid_private_key( - private_key: Union[str, List[int], bytes], account_type: Type[SOLAccount] -) -> bool: ... - - -def is_valid_private_key( - private_key: Union[str, List[int], bytes], account_type: Type[T] -) -> bool: - """ - Check if the private key is valid for either Ethereum or Solana based on the account type. - """ - try: - if account_type == ETHAccount: - # Handle Ethereum private key validation - if isinstance(private_key, str): - if private_key.startswith("0x"): - private_key = private_key[2:] - private_key = bytes.fromhex(private_key) - elif isinstance(private_key, list): - raise ValueError("Ethereum keys cannot be a list of integers") - - account_type(private_key) - - elif account_type == SOLAccount: - # Handle Solana private key validation - if isinstance(private_key, bytes): - return len(private_key) == 64 - elif isinstance(private_key, str): - decoded_key = base58.b58decode(private_key) - return len(decoded_key) == 64 - elif isinstance(private_key, list): - return len(private_key) == 64 and all( - isinstance(i, int) and 0 <= i <= 255 for i in private_key - ) - - return True - except Exception: - return False +def load_chain_account_type(chain: Chain) -> Type[AccountFromPrivateKey]: + chain_account_map: Dict[Chain, Type[AccountFromPrivateKey]] = { + Chain.ETH: ETHAccount, + Chain.AVAX: ETHAccount, + Chain.SOL: SOLAccount, + Chain.BASE: ETHAccount, + } + return chain_account_map.get(chain) or ETHAccount def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T: @@ -107,72 +35,42 @@ def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T: def account_from_file(private_key_path: Path, account_type: Type[T]) -> T: private_key = private_key_path.read_bytes() - if account_type == SOLAccount: - private_key = parse_solana_private_key( - solana_private_key_from_bytes(private_key) - ) - return account_type(private_key) def _load_account( private_key_str: Optional[str] = None, private_key_path: Optional[Path] = None, - account_type: Type[AccountFromPrivateKey] = ETHAccount, + account_type: Optional[Type[AccountFromPrivateKey]] = None, ) -> AccountFromPrivateKey: """Load private key from a string or a file. takes the string argument in priority""" - - if private_key_str: - # Check Account type based on private-key string format (base58 / uint for solana) - private_key_chain = detect_chain_from_private_key(private_key=private_key_str) - if private_key_chain == Chain.SOL: - account_type = SOLAccount - logger.debug("Solana private key is detected") - parsed_key = parse_solana_private_key(private_key_str) - return account_type(parsed_key) - logger.debug("Using account from string") - return account_from_hex_string(private_key_str, account_type) - elif private_key_path and private_key_path.is_file(): - if private_key_path: - account_type = ETHAccount # Default account type - - try: - account_data = load_account_key_context(settings.CONFIG_FILE) - - if account_data: - chain = Chain(account_data.chain) - account_type = ( - CHAIN_TO_ACCOUNT_MAP.get(chain, ETHAccount) or ETHAccount - ) - logger.debug( - f"Detected {chain} account for path {private_key_path}" - ) - else: - logger.warning( - f"No account data found in {private_key_path}, defaulting to {account_type.__name__}" - ) - - except FileNotFoundError: - logger.warning( - f"{private_key_path} not found, using default account type {account_type.__name__}" + if private_key_str or (private_key_path and private_key_path.is_file()): + if account_type: + if private_key_path and private_key_path.is_file(): + return account_from_file(private_key_path, account_type) + elif private_key_str: + return account_from_hex_string(private_key_str, account_type) + else: + raise ValueError("Any private key specified") + else: + main_configuration = load_main_configuration(settings.CONFIG_FILE) + if main_configuration: + account_type = load_chain_account_type(main_configuration.chain) + logger.debug( + f"Detected {main_configuration.chain} account for path {settings.CONFIG_FILE}" ) - except json.JSONDecodeError: - logger.error( - f"Invalid format in {private_key_path}, unable to load account info." - ) - raise ValueError(f"Invalid format in {private_key_path}.") - except KeyError as e: - logger.error(f"Missing key in account config: {e}") - raise ValueError( - f"Invalid account data in {private_key_path}. Key {e} is missing." - ) - except Exception as e: - logger.error(f"Error loading account from {private_key_path}: {e}") - raise ValueError( - f"Could not load account data from {private_key_path}." + else: + account_type = ETHAccount # Defaults to ETHAccount + logger.warning( + f"No main configuration data found in {settings.CONFIG_FILE}, defaulting to {account_type.__name__}" ) + if private_key_path and private_key_path.is_file(): + return account_from_file(private_key_path, account_type) + elif private_key_str: + return account_from_hex_string(private_key_str, account_type) + else: + raise ValueError("Any private key specified") - return account_from_file(private_key_path, account_type) elif settings.REMOTE_CRYPTO_HOST: logger.debug("Using remote account") loop = asyncio.get_event_loop() @@ -183,6 +81,7 @@ def _load_account( ) ) else: + account_type = ETHAccount # Defaults to ETHAccount new_private_key = get_fallback_private_key() account = account_type(private_key=new_private_key) logger.info( diff --git a/src/aleph/sdk/chains/common.py b/src/aleph/sdk/chains/common.py index 6087ece6..0a90183c 100644 --- a/src/aleph/sdk/chains/common.py +++ b/src/aleph/sdk/chains/common.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import Dict, Optional -import nacl.signing from coincurve.keys import PrivateKey from typing_extensions import deprecated @@ -150,20 +149,6 @@ def generate_key() -> bytes: return privkey.secret -def generate_key_solana() -> bytes: - """ - Generate a new Solana private key (32 bytes) using Ed25519. - - Returns: - A bytes object representing the 32-byte Solana private key. - """ - # Generate a new signing key (this is the private key part) - private_key = nacl.signing.SigningKey.generate() - - # Return only the private key (32 bytes) - return private_key.encode() - - def get_fallback_private_key(path: Optional[Path] = None) -> bytes: path = path or settings.PRIVATE_KEY_FILE private_key: bytes diff --git a/src/aleph/sdk/chains/solana.py b/src/aleph/sdk/chains/solana.py index 394cb987..a9352489 100644 --- a/src/aleph/sdk/chains/solana.py +++ b/src/aleph/sdk/chains/solana.py @@ -22,7 +22,7 @@ class SOLAccount(BaseAccount): _private_key: PrivateKey def __init__(self, private_key: bytes): - self.private_key = private_key + self.private_key = parse_private_key(private_key_from_bytes(private_key)) self._signing_key = SigningKey(self.private_key) self._private_key = self._signing_key.to_curve25519_private_key() @@ -93,7 +93,7 @@ def verify_signature( raise BadSignatureError from e -def solana_private_key_from_bytes( +def private_key_from_bytes( private_key_bytes: bytes, output_format: str = "base58" ) -> Union[str, List[int], bytes]: """ @@ -132,7 +132,7 @@ def solana_private_key_from_bytes( raise ValueError("Invalid output format. Choose 'base58', 'list', or 'bytes'.") -def parse_solana_private_key(private_key: Union[str, List[int], bytes]) -> bytes: +def parse_private_key(private_key: Union[str, List[int], bytes]) -> bytes: """ Parse the private key which could be either: - a base58-encoded string (which may contain both private and public key) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 0cfe5096..114652b7 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -1,4 +1,5 @@ import json +import logging import os from pathlib import Path from shutil import which @@ -6,16 +7,18 @@ from aleph_message.models import Chain from aleph_message.models.execution.environment import HypervisorType -from pydantic import BaseSettings, Field +from pydantic import BaseModel, BaseSettings, Field from aleph.sdk.types import ChainInfo +logger = logging.getLogger(__name__) + class Settings(BaseSettings): CONFIG_HOME: Optional[str] = None CONFIG_FILE: Path = Field( - default=Path("chains_config.json"), + default=Path("config.json"), description="Path to the JSON file containing chain account configurations", ) @@ -145,6 +148,18 @@ class Config: env_file = ".env" +class MainConfiguration(BaseModel): + """ + Intern Chain Management with Account. + """ + + path: Path + chain: Chain + + class Config: + use_enum_values = True + + # Settings singleton settings = Settings() @@ -168,8 +183,8 @@ class Config: settings.PRIVATE_MNEMONIC_FILE = Path( settings.CONFIG_HOME, "private-keys", "substrate.mnemonic" ) -if str(settings.CONFIG_FILE) == "chains_config.json": - settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "chains_config.json") +if str(settings.CONFIG_FILE) == "config.json": + settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "config.json") # If Config file exist and well filled we update the PRIVATE_KEY_FILE default if settings.CONFIG_FILE.exists(): try: @@ -191,3 +206,35 @@ class Config: field = field.lower() settings.CHAINS[chain].__dict__[field] = value settings.__delattr__(f"CHAINS_{fields}") + + +def save_main_configuration(file_path: Path, data: MainConfiguration): + """ + Synchronously save a single ChainAccount object as JSON to a file. + """ + with file_path.open("w") as file: + data_serializable = data.dict() + data_serializable["path"] = str(data_serializable["path"]) + json.dump(data_serializable, file, indent=4) + + +def load_main_configuration(file_path: Path) -> Optional[MainConfiguration]: + """ + Synchronously load the private key and chain type from a file. + If the file does not exist or is empty, return None. + """ + if not file_path.exists() or file_path.stat().st_size == 0: + logger.debug(f"File {file_path} does not exist or is empty. Returning None.") + return None + + try: + with file_path.open("rb") as file: + content = file.read() + data = json.loads(content.decode("utf-8")) + return MainConfiguration(**data) + except UnicodeDecodeError as e: + logger.error(f"Unable to decode {file_path} as UTF-8: {e}") + except json.JSONDecodeError: + logger.error(f"Invalid JSON format in {file_path}.") + + return None diff --git a/src/aleph/sdk/types.py b/src/aleph/sdk/types.py index d86e1fdd..081a3465 100644 --- a/src/aleph/sdk/types.py +++ b/src/aleph/sdk/types.py @@ -1,13 +1,12 @@ from abc import abstractmethod from enum import Enum -from pathlib import Path from typing import Dict, Optional, Protocol, TypeVar from pydantic import BaseModel __all__ = ("StorageEnum", "Account", "AccountFromPrivateKey", "GenericMessage") -from aleph_message.models import AlephMessage, Chain +from aleph_message.models import AlephMessage class StorageEnum(str, Enum): @@ -77,15 +76,3 @@ class ChainInfo(BaseModel): token: str super_token: Optional[str] = None active: bool = True - - -class ChainAccount(BaseModel): - """ - Intern Chain Management with Account. - """ - - path: Path - chain: Chain - - class Config: - use_enum_values = True diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index 962672c1..116c7b42 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -37,7 +37,7 @@ from pydantic.json import pydantic_encoder from aleph.sdk.conf import settings -from aleph.sdk.types import ChainAccount, GenericMessage, SEVInfo, SEVMeasurement +from aleph.sdk.types import GenericMessage, SEVInfo, SEVMeasurement logger = logging.getLogger(__name__) @@ -393,50 +393,3 @@ def make_packet_header( header[20:52] = h.digest() return header - - -def load_account_key_context(file_path: Path) -> Optional[ChainAccount]: - """ - Synchronously load the private key and chain type from a file. - If the file does not exist or is empty, return None. - """ - if not file_path.exists() or file_path.stat().st_size == 0: - logger.debug(f"File {file_path} does not exist or is empty. Returning None.") - return None - - try: - with file_path.open("rb") as file: - content = file.read() - data = json.loads(content.decode("utf-8")) - return ChainAccount(**data) - except UnicodeDecodeError as e: - logger.error(f"Unable to decode {file_path} as UTF-8: {e}") - raise ValueError(f"File {file_path} is not properly encoded.") - except json.JSONDecodeError: - logger.error(f"Invalid JSON format in {file_path}.") - raise ValueError(f"Invalid format in {file_path}.") - - -def save_account_key_context(file_path: Path, data: ChainAccount): - """ - Synchronously save a single ChainAccount object as JSON to a file. - """ - with file_path.open("w") as file: - data_serializable = data.dict() - data_serializable["path"] = str(data_serializable["path"]) - json.dump(data_serializable, file, indent=4) - - -async def upsert_chain_account(new_account: ChainAccount, config_file: Path): - """ - Update the chain account in the config file, replacing the existing one if necessary. - - If the file doesn't exist, create it and add the new account. - """ - - account_data = {"path": str(new_account.path), "chain": new_account.chain} - - with config_file.open("w", encoding="utf-8") as f: - json.dump(account_data, f, indent=4) - - logger.debug(f"Replaced account in {config_file} with {new_account.path}.") diff --git a/tests/unit/test_chain_solana.py b/tests/unit/test_chain_solana.py index aeb4b55a..0fbd717e 100644 --- a/tests/unit/test_chain_solana.py +++ b/tests/unit/test_chain_solana.py @@ -5,15 +5,13 @@ import base58 import pytest -from aleph_message.models import Chain from nacl.signing import VerifyKey -from aleph.sdk.account import detect_chain_from_private_key from aleph.sdk.chains.common import get_verification_buffer from aleph.sdk.chains.solana import ( SOLAccount, get_fallback_account, - parse_solana_private_key, + parse_private_key, verify_signature, ) from aleph.sdk.exceptions import BadSignatureError @@ -145,96 +143,54 @@ async def test_sign_raw(solana_account): verify_signature(signature, solana_account.get_address(), buffer) -def test_parse_private_key_base58(): - base58_key = "9beEbjn1Md7prQbH9kk9HjTM3npbj1S49BJQpSpYJKvnfATP8Eki9ofaq19tAzpijjV4TyTtibXGBkRjFnmTkiD" - private_key_bytes = parse_solana_private_key(base58_key) - - assert isinstance(private_key_bytes, bytes) - assert len(private_key_bytes) == 32 - - account = SOLAccount(private_key_bytes) - - assert account.get_address() == "8XnzZVqAD1GUEYjQvsyURG36F7ZEhyDGpvYD68TSkSLy" - assert detect_chain_from_private_key(base58_key) == Chain.SOL - assert isinstance(account.get_address(), str) - assert len(account.get_address()) > 0 - - -def test_parse_private_key_uint8_array(): - uint8_array_key = [ - 73, - 6, - 73, - 131, - 134, - 65, - 155, - 206, - 87, - 203, - 226, - 184, - 174, - 66, - 214, - 252, - 201, - 188, - 56, - 102, - 241, - 81, - 21, - 30, - 150, - 55, - 134, - 252, - 138, - 137, - 174, - 163, - 89, - 90, - 53, - 40, - 237, - 153, - 99, - 127, - 220, - 233, - 29, - 48, - 180, - 199, - 18, - 225, - 249, - 163, - 140, - 157, - 201, - 74, - 221, - 176, - 229, - 6, - 182, - 226, - 74, - 243, - 193, - 143, - ] - private_key_bytes = parse_solana_private_key(uint8_array_key) - - assert isinstance(private_key_bytes, bytes) - assert len(private_key_bytes) == 32 - - account = SOLAccount(private_key_bytes) - - assert account.get_address() == "71o4nN2BgB8MdD771U5VAPBj8jwufxkYJZwNnCr81VwL" - assert detect_chain_from_private_key(uint8_array_key) == Chain.SOL - assert isinstance(account.get_address(), str) - assert len(account.get_address()) > 0 +def test_parse_solana_private_key_bytes(): + # Valid 32-byte private key + private_key_bytes = bytes(range(32)) + parsed_key = parse_private_key(private_key_bytes) + assert isinstance(parsed_key, bytes) + assert len(parsed_key) == 32 + assert parsed_key == private_key_bytes + + # Invalid private key (too short) + with pytest.raises( + ValueError, match="The private key in bytes must be exactly 32 bytes long." + ): + parse_private_key(bytes(range(31))) + + +def test_parse_solana_private_key_base58(): + # Valid base58 private key (32 bytes) + base58_key = base58.b58encode(bytes(range(32))).decode("utf-8") + parsed_key = parse_private_key(base58_key) + assert isinstance(parsed_key, bytes) + assert len(parsed_key) == 32 + + # Invalid base58 key (not decodable) + with pytest.raises(ValueError, match="Invalid base58 encoded private key"): + parse_private_key("invalid_base58_key") + + # Invalid base58 key (wrong length) + with pytest.raises( + ValueError, + match="The base58 decoded private key must be either 32 or 64 bytes long.", + ): + parse_private_key(base58.b58encode(bytes(range(31))).decode("utf-8")) + + +def test_parse_solana_private_key_list(): + # Valid list of uint8 integers (64 elements, but we only take the first 32 for private key) + uint8_list = list(range(64)) + parsed_key = parse_private_key(uint8_list) + assert isinstance(parsed_key, bytes) + assert len(parsed_key) == 32 + assert parsed_key == bytes(range(32)) + + # Invalid list (contains non-integers) + with pytest.raises(ValueError, match="Invalid uint8 array"): + parse_private_key([1, 2, "not an int", 4]) # type: ignore # Ignore type check for string + + # Invalid list (less than 32 elements) + with pytest.raises( + ValueError, match="The uint8 array must contain at least 32 elements." + ): + parse_private_key(list(range(31))) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index bf4c43ff..bfca23a5 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,7 +1,6 @@ import base64 import datetime -import base58 import pytest as pytest from aleph_message.models import ( AggregateMessage, @@ -20,9 +19,6 @@ PersistentVolume, ) -from aleph.sdk.account import detect_chain_from_private_key, is_valid_private_key -from aleph.sdk.chains.ethereum import ETHAccount -from aleph.sdk.chains.solana import SOLAccount, parse_solana_private_key from aleph.sdk.types import SEVInfo from aleph.sdk.utils import ( calculate_firmware_hash, @@ -239,107 +235,3 @@ def test_compute_confidential_measure(): ) == b"ls2jv10V3HVShVI/RHCo/a43WO0soLZf0huU9ZZstIw=" ) - - -def test_parse_solana_private_key_bytes(): - # Valid 32-byte private key - private_key_bytes = bytes(range(32)) - parsed_key = parse_solana_private_key(private_key_bytes) - assert isinstance(parsed_key, bytes) - assert len(parsed_key) == 32 - assert parsed_key == private_key_bytes - - # Invalid private key (too short) - with pytest.raises( - ValueError, match="The private key in bytes must be exactly 32 bytes long." - ): - parse_solana_private_key(bytes(range(31))) - - -def test_parse_solana_private_key_base58(): - # Valid base58 private key (32 bytes) - base58_key = base58.b58encode(bytes(range(32))).decode("utf-8") - parsed_key = parse_solana_private_key(base58_key) - assert isinstance(parsed_key, bytes) - assert len(parsed_key) == 32 - - # Invalid base58 key (not decodable) - with pytest.raises(ValueError, match="Invalid base58 encoded private key"): - parse_solana_private_key("invalid_base58_key") - - # Invalid base58 key (wrong length) - with pytest.raises( - ValueError, - match="The base58 decoded private key must be either 32 or 64 bytes long.", - ): - parse_solana_private_key(base58.b58encode(bytes(range(31))).decode("utf-8")) - - -def test_parse_solana_private_key_list(): - # Valid list of uint8 integers (64 elements, but we only take the first 32 for private key) - uint8_list = list(range(64)) - parsed_key = parse_solana_private_key(uint8_list) - assert isinstance(parsed_key, bytes) - assert len(parsed_key) == 32 - assert parsed_key == bytes(range(32)) - - # Invalid list (contains non-integers) - with pytest.raises(ValueError, match="Invalid uint8 array"): - parse_solana_private_key([1, 2, "not an int", 4]) # type: ignore # Ignore type check for string - - # Invalid list (less than 32 elements) - with pytest.raises( - ValueError, match="The uint8 array must contain at least 32 elements." - ): - parse_solana_private_key(list(range(31))) - - -def test_is_solana_private_key(): - sol_key = base58.b58encode(bytes(range(64))).decode("utf-8") - assert is_valid_private_key(sol_key, SOLAccount) is True - - short_sol_key = base58.b58encode(bytes(range(32))).decode("utf-8") - assert is_valid_private_key(short_sol_key, SOLAccount) is False - - sol_key_list = list(range(64)) - assert is_valid_private_key(sol_key_list, SOLAccount) is True - - short_sol_key_list = list(range(32)) - assert is_valid_private_key(short_sol_key_list, SOLAccount) is False - - sol_key_bytes = bytes(range(64)) - assert is_valid_private_key(sol_key_bytes, SOLAccount) is True - - short_sol_key_bytes = bytes(range(32)) - assert is_valid_private_key(short_sol_key_bytes, SOLAccount) is False - - -def test_detect_chain_from_private_key(): - eth_key = "0x" + "a" * 64 - assert detect_chain_from_private_key(eth_key) == Chain.ETH - - sol_key = base58.b58encode(bytes(range(64))).decode("utf-8") - assert detect_chain_from_private_key(sol_key) == Chain.SOL - - sol_key_list = list(range(64)) - assert detect_chain_from_private_key(sol_key_list) == Chain.SOL - - with pytest.raises(ValueError, match="Unsupported private key format"): - detect_chain_from_private_key("invalid_key") - - -def test_is_eth_private_key(): - eth_key = "0x" + "a" * 64 - assert is_valid_private_key(eth_key, ETHAccount) is True - - eth_key_no_prefix = "a" * 64 - assert is_valid_private_key(eth_key_no_prefix, ETHAccount) is True - - assert is_valid_private_key("a" * 63, ETHAccount) is False - - assert is_valid_private_key("zz" * 32, ETHAccount) is False - - eth_key_bytes = bytes(range(32)) - assert is_valid_private_key(eth_key_bytes, ETHAccount) is True - - assert is_valid_private_key(bytes(range(31)), ETHAccount) is False