Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ keys.json

# Pycharm
.idea

# Vscode
.vscode/
26 changes: 25 additions & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from hathor.nanocontracts.sorter.types import NCSorterCallable
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peers_whitelist import PeersWhitelist
from hathor.pubsub import PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.storage import RocksDBStorage
Expand Down Expand Up @@ -191,6 +192,7 @@ def __init__(self) -> None:
self._enable_ipv6: bool = False
self._disable_ipv4: bool = False

self._peers_whitelist: PeersWhitelist | None = None
self._nc_anti_mev: bool = True

self._nc_storage_factory: NCStorageFactory | None = None
Expand Down Expand Up @@ -349,6 +351,23 @@ def set_peer(self, peer: PrivatePeer) -> 'Builder':
self._peer = peer
return self

def set_url_whitelist(self, reactor: Reactor, url: str) -> 'Builder':
"""Sets the peers whitelist to a URLPeersWhitelist.

Args:
reactor: The Twisted reactor
url: The URL to fetch the whitelist from (required)
"""
self.check_if_can_modify()
if not url:
raise ValueError('url is required for set_url_whitelist')
from hathor.p2p.peers_whitelist import URLPeersWhitelist
url_peers_whitelist = URLPeersWhitelist(reactor, url, False)
url_peers_whitelist.enable()
# We do not start the URLPeersWhitelist here, as it is started by the ConnectionsManager
self._peers_whitelist = url_peers_whitelist
return self

def _get_or_create_settings(self) -> HathorSettingsType:
"""Return the HathorSettings instance set on this builder, or a new one if not set."""
if self._settings is None:
Expand Down Expand Up @@ -474,7 +493,7 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager:
my_peer=my_peer,
pubsub=self._get_or_create_pubsub(),
ssl=enable_ssl,
whitelist_only=False,
peers_whitelist=self._peers_whitelist,
rng=self._rng,
enable_ipv6=self._enable_ipv6,
disable_ipv4=self._disable_ipv4,
Expand Down Expand Up @@ -779,6 +798,11 @@ def enable_event_queue(self) -> 'Builder':
self._enable_event_queue = True
return self

def set_whitelist(self, peers_wl: PeersWhitelist | None) -> 'Builder':
self.check_if_can_modify()
self._peers_whitelist = peers_wl
return self

def set_tx_storage(self, tx_storage: TransactionStorage) -> 'Builder':
self.check_if_can_modify()
self._tx_storage = tx_storage
Expand Down
1 change: 0 additions & 1 deletion hathor/conf/mainnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
MULTISIG_VERSION_BYTE=b'\x64',
NETWORK_NAME='mainnet',
BOOTSTRAP_DNS=['mainnet.hathor.network'],
ENABLE_PEER_WHITELIST=True,
WHITELIST_URL='https://hathor-public-files.s3.amazonaws.com/whitelist_peer_ids',
# Genesis stuff
# output addr: HJB2yxxsHtudGGy3jmVeadwMfRi2zNCKKD
Expand Down
1 change: 0 additions & 1 deletion hathor/conf/mainnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ MULTISIG_VERSION_BYTE: x64
NETWORK_NAME: mainnet
BOOTSTRAP_DNS:
- mainnet.hathor.network
ENABLE_PEER_WHITELIST: true
WHITELIST_URL: https://hathor-public-files.s3.amazonaws.com/whitelist_peer_ids

# Genesis stuff
Expand Down
3 changes: 0 additions & 3 deletions hathor/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ class HathorSettings(NamedTuple):
# Initial bootstrap servers
BOOTSTRAP_DNS: list[str] = []

# enable peer whitelist
ENABLE_PEER_WHITELIST: bool = False

# weather to use the whitelist with sync-v2 peers, does not affect whether the whitelist is enabled or not, it will
# always be enabled for sync-v1 if it is enabled
USE_PEER_WHITELIST_ON_SYNC_V2: bool = True
Expand Down
22 changes: 0 additions & 22 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from hathor.nanocontracts.storage import NCBlockStorage, NCContractStorage
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_id import PeerId
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.reward_lock import is_spent_reward_locked
Expand Down Expand Up @@ -233,9 +232,6 @@ def __init__(
# Thread pool used to resolve pow when sending tokens
self.pow_thread_pool = ThreadPool(minthreads=0, maxthreads=settings.MAX_POW_THREADS, name='Pow thread pool')

# List of whitelisted peers
self.peers_whitelist: list[PeerId] = []

# List of capabilities of the peer
if capabilities is not None:
self.capabilities = capabilities
Expand Down Expand Up @@ -887,24 +883,6 @@ def on_new_tx(
def has_sync_version_capability(self) -> bool:
return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities

def add_peer_to_whitelist(self, peer_id: PeerId) -> None:
if not self._settings.ENABLE_PEER_WHITELIST:
return

if peer_id in self.peers_whitelist:
self.log.info('peer already in whitelist', peer_id=peer_id)
else:
self.peers_whitelist.append(peer_id)

def remove_peer_from_whitelist_and_disconnect(self, peer_id: PeerId) -> None:
if not self._settings.ENABLE_PEER_WHITELIST:
return

if peer_id in self.peers_whitelist:
self.peers_whitelist.remove(peer_id)
# disconnect from node
self.connections.drop_connection_by_peer_id(peer_id)

def has_recent_activity(self) -> bool:
current_timestamp = time.time()
latest_blockchain_timestamp = self.tx_storage.latest_timestamp
Expand Down
136 changes: 63 additions & 73 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint
from hathor.p2p.peer_id import PeerId
from hathor.p2p.peer_storage import VerifiedPeerStorage
from hathor.p2p.peers_whitelist import PeersWhitelist
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.rate_limiter import RateLimiter
from hathor.p2p.states.ready import ReadyState
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import parse_whitelist
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.transaction import BaseTransaction
Expand All @@ -48,9 +48,6 @@

logger = get_logger()

# The timeout in seconds for the whitelist GET request
WHITELIST_REQUEST_TIMEOUT = 45


class _SyncRotateInfo(NamedTuple):
candidates: list[PeerId]
Expand Down Expand Up @@ -85,10 +82,10 @@ class GlobalRateLimiter:
new_connection_from_queue: deque[PeerId]
connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer]
handshaking_peers: set[HathorProtocol]
whitelist_only: bool
verified_peer_storage: VerifiedPeerStorage
_sync_factories: dict[SyncVersion, SyncAgentFactory]
_enabled_sync_versions: set[SyncVersion]
peers_whitelist: Optional[PeersWhitelist]

rate_limiter: RateLimiter

Expand All @@ -100,7 +97,7 @@ def __init__(
pubsub: PubSubManager,
ssl: bool,
rng: Random,
whitelist_only: bool,
peers_whitelist: PeersWhitelist | None,
enable_ipv6: bool,
disable_ipv4: bool,
) -> None:
Expand Down Expand Up @@ -187,17 +184,12 @@ def __init__(
self.lc_connect.clock = self.reactor
self.lc_connect_interval = 0.2 # seconds

# A timer to try to reconnect to the disconnect known peers.
if self._settings.ENABLE_PEER_WHITELIST:
self.wl_reconnect = LoopingCall(self.update_whitelist)
self.wl_reconnect.clock = self.reactor
# Whitelisted peers.
self.peers_whitelist: PeersWhitelist | None = peers_whitelist

# Pubsub object to publish events
self.pubsub = pubsub

# Parameter to explicitly enable whitelist-only mode, when False it will still check the whitelist for sync-v1
self.whitelist_only = whitelist_only

# Parameter to enable IPv6 connections
self.enable_ipv6 = enable_ipv6

Expand Down Expand Up @@ -273,7 +265,13 @@ def do_discovery(self) -> None:
Do a discovery and connect on all discovery strategies.
"""
for peer_discovery in self.peer_discoveries:
coro = peer_discovery.discover_and_connect(self.connect_to_endpoint)
# Wrap connect_to_endpoint to register bootstrap peer IDs
def connect_with_bootstrap_registration(entrypoint: PeerEndpoint) -> None:
if self.peers_whitelist and entrypoint.peer_id is not None:
self.peers_whitelist.add_bootstrap_peer(entrypoint.peer_id)
self.connect_to_endpoint(entrypoint)

coro = peer_discovery.discover_and_connect(connect_with_bootstrap_registration)
Deferred.fromCoroutine(coro)

def disable_rate_limiter(self) -> None:
Expand All @@ -298,29 +296,14 @@ def start(self) -> None:
self.lc_reconnect.start(5, now=False)
self.lc_sync_update.start(self.lc_sync_update_interval, now=False)

if self._settings.ENABLE_PEER_WHITELIST:
self._start_whitelist_reconnect()
if self.peers_whitelist:
self.peers_whitelist.start(self.drop_connection_by_peer_id)

for description in self.listen_address_descriptions:
self.listen(description)

self.do_discovery()

def _start_whitelist_reconnect(self) -> None:
# The deferred returned by the LoopingCall start method
# executes when the looping call stops running
# https://docs.twistedmatrix.com/en/stable/api/twisted.internet.task.LoopingCall.html
d = self.wl_reconnect.start(30)
d.addErrback(self._handle_whitelist_reconnect_err)

def _handle_whitelist_reconnect_err(self, *args: Any, **kwargs: Any) -> None:
""" This method will be called when an exception happens inside the whitelist update
and ends up stopping the looping call.
We log the error and start the looping call again.
"""
self.log.error('whitelist reconnect had an exception. Start looping call again.', args=args, kwargs=kwargs)
self.reactor.callLater(30, self._start_whitelist_reconnect)

def _start_peer_connect_loop(self) -> None:
# The deferred returned by the LoopingCall start method
# executes when the looping call stops running
Expand Down Expand Up @@ -349,6 +332,9 @@ def stop(self) -> None:
if self.lc_sync_update.running:
self.lc_sync_update.stop()

if self.peers_whitelist:
self.peers_whitelist.stop()

def _get_peers_count(self) -> PeerConnectionsMetrics:
"""Get a dict containing the count of peers in each state"""

Expand Down Expand Up @@ -416,9 +402,9 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:
self.log.warn('reached maximum number of connections', max_connections=self.max_connections)
protocol.disconnect(force=True)
return

self.connections.add(protocol)
self.handshaking_peers.add(protocol)

self.pubsub.publish(
HathorEvents.NETWORK_PEER_CONNECTED,
protocol=protocol,
Expand Down Expand Up @@ -597,47 +583,6 @@ def reconnect_to_all(self) -> None:
for peer in list(self.verified_peer_storage.values()):
self.connect_to_peer(peer, int(now))

def update_whitelist(self) -> Deferred[None]:
from twisted.web.client import readBody
from twisted.web.http_headers import Headers
assert self._settings.WHITELIST_URL is not None
self.log.info('update whitelist')
d = self._http_agent.request(
b'GET',
self._settings.WHITELIST_URL.encode(),
Headers({'User-Agent': ['hathor-core']}),
None)
d.addCallback(readBody)
d.addTimeout(WHITELIST_REQUEST_TIMEOUT, self.reactor)
d.addCallback(self._update_whitelist_cb)
d.addErrback(self._update_whitelist_err)

return d

def _update_whitelist_err(self, *args: Any, **kwargs: Any) -> None:
self.log.error('update whitelist failed', args=args, kwargs=kwargs)

def _update_whitelist_cb(self, body: bytes) -> None:
assert self.manager is not None
self.log.info('update whitelist got response')
try:
text = body.decode()
new_whitelist = parse_whitelist(text)
except Exception:
self.log.exception('failed to parse whitelist')
return
current_whitelist = set(self.manager.peers_whitelist)
peers_to_add = new_whitelist - current_whitelist
if peers_to_add:
self.log.info('add new peers to whitelist', peers=peers_to_add)
peers_to_remove = current_whitelist - new_whitelist
if peers_to_remove:
self.log.info('remove peers peers from whitelist', peers=peers_to_remove)
for peer_id in peers_to_add:
self.manager.add_peer_to_whitelist(peer_id)
for peer_id in peers_to_remove:
self.manager.remove_peer_from_whitelist_and_disconnect(peer_id)

def connect_to_peer(self, peer: UnverifiedPeer | PublicPeer, now: int) -> None:
""" Attempts to connect if it is not connected to the peer.
"""
Expand Down Expand Up @@ -935,3 +880,48 @@ def reload_entrypoints_and_connections(self) -> None:
self.log.warn('Killing all connections and resetting entrypoints...')
self.disconnect_all_peers(force=True)
self.my_peer.reload_entrypoints_from_source_file()

def whitelist_swap(self, wl_object: PeersWhitelist) -> None:
"""
Altering whitelist (URL/PATH) during full-node runtime.
"""

if not wl_object:
return

if self.peers_whitelist:
self.peers_whitelist.stop()

# Sysctl may only update to another URL or Path, not None.
self.log.info("Swapping whitelists...")
self.peers_whitelist = wl_object

self.whitelist_toggle(wl_object.is_enabled())

def whitelist_toggle(self, wl_toggle: bool) -> None:
"""
Called if whitelist is turned "on" or "off".
Called via sysctl methods.
"""

if not self.peers_whitelist:
return

# Pass toggle option to whitelist
self.peers_whitelist.enable(wl_toggle)

if wl_toggle:
self.log.info('Whitelist ON: Node starts following it...')
# All connections not in the whitelist are severed.
# Use a snapshot to avoid modifying the set while iterating.
connections_snapshot = list(self.connections)
for conn in connections_snapshot:
peer_id = conn.get_peer_id()
# Skip connections that don't have a peer_id yet (still in handshake state)
if peer_id is None:
continue
if not self.peers_whitelist.is_peer_whitelisted(peer_id):
conn.disconnect(reason='Whitelist turned on', force=True)
return

self.log.info("Whitelist OFF - Node starts ignoring it...")
Loading
Loading