diff --git a/dkg/modules/asset/asset.py b/dkg/modules/asset/asset.py index 90d5ccd..ec5c892 100644 --- a/dkg/modules/asset/asset.py +++ b/dkg/modules/asset/asset.py @@ -206,6 +206,7 @@ def create( minimum_number_of_node_replications = arguments.get( "minimum_number_of_node_replications" ) + local_store = arguments.get("local_store") blockchain_id = self.manager.blockchain_provider.blockchain_id dataset = {} @@ -448,6 +449,15 @@ def create( frequency, ) + local_store_result = None + if local_store: + retry = 0 + while (not (local_store_result or {}).get("status")) and retry < 6: + local_store_result = self.node_service.local_store( + dataset_root, dataset, blockchain_id, ual + ) + retry += 1 + return json.loads( Web3.to_json( { @@ -471,6 +481,7 @@ def create( }, "numberOfConfirmations": finality_status_result, "requiredConfirmations": minimum_number_of_finalization_confirmations, + **({"localStore": local_store_result} if local_store else {}), }, } ) diff --git a/dkg/modules/asset/async_asset.py b/dkg/modules/asset/async_asset.py index 939b539..d91c803 100644 --- a/dkg/modules/asset/async_asset.py +++ b/dkg/modules/asset/async_asset.py @@ -237,6 +237,7 @@ async def create( minimum_number_of_node_replications = arguments.get( "minimum_number_of_node_replications" ) + local_store = arguments.get("local_store") blockchain_id = self.manager.blockchain_provider.blockchain_id dataset = {} @@ -462,6 +463,15 @@ async def create( frequency, ) + local_store_result = None + if local_store: + retry = 0 + while (not (local_store_result or {}).get("status")) and retry < 6: + local_store_result = await self.node_service.local_store( + dataset_root, dataset, blockchain_id, ual + ) + retry += 1 + return json.loads( Web3.to_json( { @@ -483,6 +493,7 @@ async def create( }, "numberOfConfirmations": finality_status_result, "requiredConfirmations": minimum_number_of_finalization_confirmations, + **({"localStore": local_store_result} if local_store else {}), }, } ) diff --git a/dkg/services/input_service.py b/dkg/services/input_service.py index e8147ed..433d3a1 100644 --- a/dkg/services/input_service.py +++ b/dkg/services/input_service.py @@ -40,6 +40,7 @@ def get_asset_create_arguments(self, options): "minimum_number_of_node_replications": self.get_minimum_number_of_node_replications( options ), + "local_store": self.get_local_store(options), } def get_query_arguments(self, options): @@ -181,3 +182,6 @@ def get_repository(self, options): or self.config.get("repository") or DefaultParameters.REPOSITORY.value ) + + def get_local_store(self, options): + return options.get("local_store") or False diff --git a/dkg/services/node_services/async_node_service.py b/dkg/services/node_services/async_node_service.py index 1641988..71004db 100644 --- a/dkg/services/node_services/async_node_service.py +++ b/dkg/services/node_services/async_node_service.py @@ -21,6 +21,7 @@ def __init__(self, manager: AsyncRequestManager): _publish = Method(NodeRequest.publish) _get = Method(NodeRequest.get) _query = Method(NodeRequest.query) + _local_store = Method(NodeRequest.local_store) async def info(self) -> NodeResponseDict: return await self._info() @@ -182,3 +183,10 @@ async def query( paranet_ual, ): return await self._query(query, query_type, repository, paranet_ual) + + async def local_store(self, dataset_root, dataset, blockchain_id, ual): + try: + result = await self._local_store(dataset_root, dataset, blockchain_id, ual) + return result.get("data") + except Exception as e: + raise Exception(f"Unable to local store: {e}") diff --git a/dkg/services/node_services/node_service.py b/dkg/services/node_services/node_service.py index e447055..b99771f 100644 --- a/dkg/services/node_services/node_service.py +++ b/dkg/services/node_services/node_service.py @@ -22,6 +22,7 @@ def __init__(self, manager: DefaultRequestManager): _publish = Method(NodeRequest.publish) _get = Method(NodeRequest.get) _query = Method(NodeRequest.query) + _local_store = Method(NodeRequest.local_store) def get_operation_result( self, operation_id: str, operation: str, max_retries: int, frequency: int @@ -165,3 +166,10 @@ def query( paranet_ual, ): return self._query(query, query_type, repository, paranet_ual) + + def local_store(self, dataset_root, dataset, blockchain_id, ual): + try: + result = self._local_store(dataset_root, dataset, blockchain_id, ual) + return result.get("data") + except Exception as e: + raise Exception(f"Unable to local store: {e}") diff --git a/dkg/utils/node_request.py b/dkg/utils/node_request.py index 28f7972..ea67311 100644 --- a/dkg/utils/node_request.py +++ b/dkg/utils/node_request.py @@ -103,6 +103,17 @@ class NodeRequest: }, ) + local_store = NodeCall( + method=HTTPRequestMethod.POST, + path="local-store", + data={ + "datasetRoot": str, + "dataset": dict[str, list[str]], + "blockchain": str, + "UAL": UAL, + }, + ) + class LocalStoreOperationStatus(AutoStrEnumUpperCase): LOCAL_STORE_INIT_START = auto() diff --git a/examples/async_demo.py b/examples/async_demo.py index dffa96f..5fc5d7b 100644 --- a/examples/async_demo.py +++ b/examples/async_demo.py @@ -72,7 +72,7 @@ def print_json(json_dict: dict): "epochs_num": 2, "minimum_number_of_finalization_confirmations": 3, "minimum_number_of_node_replications": 1, - "token_amount": 100, + "local_store": True, }, ) print( diff --git a/examples/demo.py b/examples/demo.py index 00060d8..9f716c7 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -90,7 +90,7 @@ def print_json(json_dict: dict): "epochs_num": 2, "minimum_number_of_finalization_confirmations": 3, "minimum_number_of_node_replications": 1, - "token_amount": 100, + "local_store": True, }, ) print(