Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ dependencies = [
"aleph-superfluid>=0.2.1",
"eth_typing==4.3.1",
"web3==6.3.0",
"base58==2.1.1", # Needed now as default with _load_account changement
"pynacl==1.5.0" # Needed now as default with _load_account changement
]

[project.optional-dependencies]
Expand Down
139 changes: 137 additions & 2 deletions src/aleph/sdk/account.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,103 @@
import asyncio
import json
import logging
from pathlib import Path
from typing import Optional, Type, TypeVar
from typing import Dict, List, Optional, Type, TypeVar, Union, overload

import base58
from aleph_message.models import Chain

from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.chains.remote import RemoteAccount
from aleph.sdk.chains.solana import (
SOLAccount,
parse_solana_private_key,
solana_private_key_from_bytes,
)
from aleph.sdk.conf import settings
from aleph.sdk.types import AccountFromPrivateKey
from aleph.sdk.utils import load_account_key_context

logger = logging.getLogger(__name__)

T = TypeVar("T", bound=AccountFromPrivateKey)

CHAIN_TO_ACCOUNT_MAP: Dict[Chain, Type[AccountFromPrivateKey]] = {
Chain.ETH: ETHAccount,
Chain.AVAX: ETHAccount,
Chain.SOL: SOLAccount,
Chain.BASE: ETHAccount,
}


def detect_chain_from_private_key(private_key: Union[str, List[int], bytes]) -> Chain:
"""
Detect the blockchain chain based on the private key format.
- Chain.ETH for Ethereum (EVM) private keys
- Chain.SOL for Solana private keys (base58 or uint8 format).

Raises:
ValueError: If the private key format is invalid or not recognized.
"""
if isinstance(private_key, (str, bytes)) and is_valid_private_key(
private_key, ETHAccount
):
return Chain.ETH

elif is_valid_private_key(private_key, SOLAccount):
return Chain.SOL

else:
raise ValueError("Unsupported private key format. Unable to detect chain.")


@overload
def is_valid_private_key(
private_key: Union[str, bytes], account_type: Type[ETHAccount]
) -> bool: ...


@overload
def is_valid_private_key(
private_key: Union[str, List[int], bytes], account_type: Type[SOLAccount]
) -> bool: ...


def is_valid_private_key(
private_key: Union[str, List[int], bytes], account_type: Type[T]
) -> bool:
"""
Check if the private key is valid for either Ethereum or Solana based on the account type.
"""
try:
if account_type == ETHAccount:
# Handle Ethereum private key validation
if isinstance(private_key, str):
if private_key.startswith("0x"):
private_key = private_key[2:]
private_key = bytes.fromhex(private_key)
elif isinstance(private_key, list):
raise ValueError("Ethereum keys cannot be a list of integers")

account_type(private_key)

elif account_type == SOLAccount:
# Handle Solana private key validation
if isinstance(private_key, bytes):
return len(private_key) == 64
elif isinstance(private_key, str):
decoded_key = base58.b58decode(private_key)
return len(decoded_key) == 64
elif isinstance(private_key, list):
return len(private_key) == 64 and all(
isinstance(i, int) and 0 <= i <= 255 for i in private_key
)

return True
except Exception:
return False


def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T:
if private_key_str.startswith("0x"):
Expand All @@ -22,6 +107,11 @@ def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T:

def account_from_file(private_key_path: Path, account_type: Type[T]) -> T:
private_key = private_key_path.read_bytes()
if account_type == SOLAccount:
private_key = parse_solana_private_key(
solana_private_key_from_bytes(private_key)
)

return account_type(private_key)


Expand All @@ -33,10 +123,55 @@ def _load_account(
"""Load private key from a string or a file. takes the string argument in priority"""

if private_key_str:
# Check Account type based on private-key string format (base58 / uint for solana)
private_key_chain = detect_chain_from_private_key(private_key=private_key_str)
if private_key_chain == Chain.SOL:
account_type = SOLAccount
logger.debug("Solana private key is detected")
parsed_key = parse_solana_private_key(private_key_str)
return account_type(parsed_key)
logger.debug("Using account from string")
return account_from_hex_string(private_key_str, account_type)
elif private_key_path and private_key_path.is_file():
logger.debug("Using account from file")
if private_key_path:
account_type = ETHAccount # Default account type

try:
account_data = load_account_key_context(settings.CONFIG_FILE)

if account_data:
chain = Chain(account_data.chain)
account_type = (
CHAIN_TO_ACCOUNT_MAP.get(chain, ETHAccount) or ETHAccount
)
logger.debug(
f"Detected {chain} account for path {private_key_path}"
)
else:
logger.warning(
f"No account data found in {private_key_path}, defaulting to {account_type.__name__}"
)

except FileNotFoundError:
logger.warning(
f"{private_key_path} not found, using default account type {account_type.__name__}"
)
except json.JSONDecodeError:
logger.error(
f"Invalid format in {private_key_path}, unable to load account info."
)
raise ValueError(f"Invalid format in {private_key_path}.")
except KeyError as e:
logger.error(f"Missing key in account config: {e}")
raise ValueError(
f"Invalid account data in {private_key_path}. Key {e} is missing."
)
except Exception as e:
logger.error(f"Error loading account from {private_key_path}: {e}")
raise ValueError(
f"Could not load account data from {private_key_path}."
)

return account_from_file(private_key_path, account_type)
elif settings.REMOTE_CRYPTO_HOST:
logger.debug("Using remote account")
Expand Down
15 changes: 15 additions & 0 deletions src/aleph/sdk/chains/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from typing import Dict, Optional

import nacl.signing
from coincurve.keys import PrivateKey
from typing_extensions import deprecated

Expand Down Expand Up @@ -149,6 +150,20 @@ def generate_key() -> bytes:
return privkey.secret


def generate_key_solana() -> bytes:
"""
Generate a new Solana private key (32 bytes) using Ed25519.

Returns:
A bytes object representing the 32-byte Solana private key.
"""
# Generate a new signing key (this is the private key part)
private_key = nacl.signing.SigningKey.generate()

# Return only the private key (32 bytes)
return private_key.encode()


def get_fallback_private_key(path: Optional[Path] = None) -> bytes:
path = path or settings.PRIVATE_KEY_FILE
private_key: bytes
Expand Down
92 changes: 90 additions & 2 deletions src/aleph/sdk/chains/solana.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union

import base58
from nacl.exceptions import BadSignatureError as NaclBadSignatureError
Expand Down Expand Up @@ -79,7 +79,7 @@ def verify_signature(
public_key: The public key to use for verification. Can be a base58 encoded string or bytes.
message: The message to verify. Can be an utf-8 string or bytes.
Raises:
BadSignatureError: If the signature is invalid.
BadSignatureError: If the signature is invalid.!
"""
if isinstance(signature, str):
signature = base58.b58decode(signature)
Expand All @@ -91,3 +91,91 @@ def verify_signature(
VerifyKey(public_key).verify(message, signature)
except NaclBadSignatureError as e:
raise BadSignatureError from e


def solana_private_key_from_bytes(
private_key_bytes: bytes, output_format: str = "base58"
) -> Union[str, List[int], bytes]:
"""
Convert a Solana private key in bytes back to different formats (base58 string, uint8 list, or raw bytes).

- For base58 string: Encode the bytes into a base58 string.
- For uint8 list: Convert the bytes into a list of integers.
- For raw bytes: Return as-is.

Args:
private_key_bytes (bytes): The private key in byte format.
output_format (str): The format to return ('base58', 'list', 'bytes').

Returns:
The private key in the requested format.

Raises:
ValueError: If the output_format is not recognized or the private key length is invalid.
"""
if not isinstance(private_key_bytes, bytes):
raise ValueError("Expected the private key in bytes.")

if len(private_key_bytes) != 32:
raise ValueError("Solana private key must be exactly 32 bytes long.")

if output_format == "base58":
return base58.b58encode(private_key_bytes).decode("utf-8")

elif output_format == "list":
return list(private_key_bytes)

elif output_format == "bytes":
return private_key_bytes

else:
raise ValueError("Invalid output format. Choose 'base58', 'list', or 'bytes'.")


def parse_solana_private_key(private_key: Union[str, List[int], bytes]) -> bytes:
"""
Parse the private key which could be either:
- a base58-encoded string (which may contain both private and public key)
- a list of uint8 integers (which may contain both private and public key)
- a byte array (exactly 32 bytes)

Returns:
bytes: The private key in byte format (32 bytes).

Raises:
ValueError: If the private key format is invalid or the length is incorrect.
"""
# If the private key is already in byte format
if isinstance(private_key, bytes):
if len(private_key) != 32:
raise ValueError("The private key in bytes must be exactly 32 bytes long.")
return private_key

# If the private key is a base58-encoded string
elif isinstance(private_key, str):
try:
decoded_key = base58.b58decode(private_key)
if len(decoded_key) not in [32, 64]:
raise ValueError(
"The base58 decoded private key must be either 32 or 64 bytes long."
)
return decoded_key[:32]
except Exception as e:
raise ValueError(f"Invalid base58 encoded private key: {e}")

# If the private key is a list of uint8 integers
elif isinstance(private_key, list):
if all(isinstance(i, int) and 0 <= i <= 255 for i in private_key):
byte_key = bytes(private_key)
if len(byte_key) < 32:
raise ValueError("The uint8 array must contain at least 32 elements.")
return byte_key[:32] # Take the first 32 bytes (private key)
else:
raise ValueError(
"Invalid uint8 array, must contain integers between 0 and 255."
)

else:
raise ValueError(
"Unsupported private key format. Must be a base58 string, bytes, or a list of uint8 integers."
)
19 changes: 19 additions & 0 deletions src/aleph/sdk/conf.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
from pathlib import Path
from shutil import which
Expand All @@ -13,6 +14,11 @@
class Settings(BaseSettings):
CONFIG_HOME: Optional[str] = None

CONFIG_FILE: Path = Field(
default=Path("chains_config.json"),
description="Path to the JSON file containing chain account configurations",
)

# In case the user does not want to bother with handling private keys himself,
# do an ugly and insecure write and read from disk to this file.
PRIVATE_KEY_FILE: Path = Field(
Expand Down Expand Up @@ -162,6 +168,19 @@ class Config:
settings.PRIVATE_MNEMONIC_FILE = Path(
settings.CONFIG_HOME, "private-keys", "substrate.mnemonic"
)
if str(settings.CONFIG_FILE) == "chains_config.json":
settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "chains_config.json")
# If Config file exist and well filled we update the PRIVATE_KEY_FILE default
if settings.CONFIG_FILE.exists():
try:
with open(settings.CONFIG_FILE, "r", encoding="utf-8") as f:
config_data = json.load(f)

if "path" in config_data:
settings.PRIVATE_KEY_FILE = Path(config_data["path"])
except json.JSONDecodeError:
pass


# Update CHAINS settings and remove placeholders
CHAINS_ENV = [(key[7:], value) for key, value in settings if key.startswith("CHAINS_")]
Expand Down
15 changes: 14 additions & 1 deletion src/aleph/sdk/types.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from abc import abstractmethod
from enum import Enum
from pathlib import Path
from typing import Dict, Optional, Protocol, TypeVar

from pydantic import BaseModel

__all__ = ("StorageEnum", "Account", "AccountFromPrivateKey", "GenericMessage")

from aleph_message.models import AlephMessage
from aleph_message.models import AlephMessage, Chain


class StorageEnum(str, Enum):
Expand Down Expand Up @@ -76,3 +77,15 @@ class ChainInfo(BaseModel):
token: str
super_token: Optional[str] = None
active: bool = True


class ChainAccount(BaseModel):
"""
Intern Chain Management with Account.
"""

path: Path
chain: Chain

class Config:
use_enum_values = True
Loading
Loading