Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @leoparente @ltucker @mfiedorowicz
* @jajeffries @leoparente @ltucker @mfiedorowicz
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pip install netboxlabs-diode-sdk
* `DIODE_SENTRY_DSN` - Optional Sentry DSN for error reporting
* `DIODE_CLIENT_ID` - Client ID for OAuth2 authentication
* `DIODE_CLIENT_SECRET` - Client Secret for OAuth2 authentication
* `DIODE_DRY_RUN_OUTPUT_DIR` - Directory where `DiodeDryRunClient` will write JSON files

### Example

Expand Down Expand Up @@ -75,6 +76,34 @@ if __name__ == "__main__":

```

### Dry run mode

`DiodeDryRunClient` generates ingestion requests without contacting a Diode server. Requests are printed to stdout by default, or written to JSON files when `output_dir` (or the `DIODE_DRY_RUN_OUTPUT_DIR` environment variable) is specified. The `app_name` parameter serves as the filename prefix; if not provided, `dryrun` is used as the default prefix. The file name is suffixed with a nanosecond-precision timestamp, resulting in the format `<app_name>_<timestamp_ns>.json`.

```python
from netboxlabs.diode.sdk import DiodeDryRunClient

with DiodeDryRunClient(app_name="my_app", output_dir="/tmp") as client:
client.ingest([
Entity(device="Device A"),
])
```

The produced file can later be ingested by a real Diode instance using
`load_dryrun_entities` with a standard `DiodeClient`:

```python
from netboxlabs.diode.sdk import DiodeClient, load_dryrun_entities

with DiodeClient(
target="grpc://localhost:8080/diode",
app_name="my-test-app",
app_version="0.0.1",
) as client:
entities = list(load_dryrun_entities("my_app_92722156890707.json"))
client.ingest(entities=entities)
```

## Supported entities (object types)

* ASN
Expand Down
8 changes: 7 additions & 1 deletion netboxlabs/diode/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
# Copyright 2024 NetBox Labs Inc
"""NetBox Labs, Diode - SDK."""

from netboxlabs.diode.sdk.client import DiodeClient
from netboxlabs.diode.sdk.client import (
DiodeClient,
DiodeDryRunClient,
load_dryrun_entities,
)

assert DiodeClient
assert DiodeDryRunClient
assert load_dryrun_entities
94 changes: 93 additions & 1 deletion netboxlabs/diode/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
import os
import platform
import ssl
import sys
import time
import uuid
from collections.abc import Iterable
from pathlib import Path
from urllib.parse import urlencode, urlparse

import certifi
import grpc
import sentry_sdk
from google.protobuf.json_format import MessageToJson, ParseDict

from netboxlabs.diode.sdk.diode.v1 import ingester_pb2, ingester_pb2_grpc
from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError
Expand All @@ -27,11 +31,28 @@
_DIODE_SENTRY_DSN_ENVVAR_NAME = "DIODE_SENTRY_DSN"
_CLIENT_ID_ENVVAR_NAME = "DIODE_CLIENT_ID"
_CLIENT_SECRET_ENVVAR_NAME = "DIODE_CLIENT_SECRET"
_DRY_RUN_OUTPUT_DIR_ENVVAR_NAME = "DIODE_DRY_RUN_OUTPUT_DIR"
_INGEST_SCOPE = "diode:ingest"
_DEFAULT_STREAM = "latest"
_LOGGER = logging.getLogger(__name__)


def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]:
"""Yield entities from a file with concatenated JSON messages."""
path = Path(file_path)
with path.open("r") as fh:
request = json.load(fh)
req_pb = ingester_pb2.IngestRequest()
ParseDict(request, req_pb)
yield from req_pb.entities


class DiodeClientInterface:
"""Runtime placeholder for the Diode client interface."""

pass


def _load_certs() -> bytes:
"""Loads cacert.pem."""
with open(certifi.where(), "rb") as f:
Expand Down Expand Up @@ -82,7 +103,7 @@ def _get_optional_config_value(
return value


class DiodeClient:
class DiodeClient(DiodeClientInterface):
"""Diode Client."""

_name = "diode-sdk-python"
Expand Down Expand Up @@ -287,6 +308,77 @@ def _authenticate(self, scope: str):
) + [("authorization", f"Bearer {access_token}")]


class DiodeDryRunClient(DiodeClientInterface):
"""Client that outputs ingestion requests instead of sending them."""

_name = "diode-sdk-python-dry-run"
_version = version_semver()
_app_name = None
_app_version = None

def __init__(self, app_name: str = "dryrun", output_dir: str | None = None):
"""Initiate a new dry run client."""
self._output_dir = os.getenv(_DRY_RUN_OUTPUT_DIR_ENVVAR_NAME, output_dir)
self._app_name = app_name

@property
def name(self) -> str:
"""Retrieve the name."""
return self._name

@property
def version(self) -> str:
"""Retrieve the version."""
return self._version

@property
def app_name(self) -> str:
"""Retrieve the app name."""
return self._app_name

@property
def output_dir(self) -> str | None:
"""Retrieve the dry run output dir."""
return self._output_dir

def __enter__(self):
"""Enters the runtime context related to the channel object."""
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
"""Exits the runtime context related to the channel object."""

def ingest(
self,
entities: Iterable[Entity | ingester_pb2.Entity | None],
stream: str | None = _DEFAULT_STREAM,
) -> ingester_pb2.IngestResponse:
"""Ingest entities in dry run mode."""
request = ingester_pb2.IngestRequest(
stream=stream,
id=str(uuid.uuid4()),
producer_app_name=self._app_name,
entities=entities,
sdk_name=self.name,
sdk_version=self.version,
)

output = MessageToJson(request, preserving_proto_field_name=True)
if self._output_dir:
timestamp = time.perf_counter_ns()
path = Path(self._output_dir)
path.mkdir(parents=True, exist_ok=True)
filename = "".join(
c if c.isalnum() or c in ("_", "-") else "_" for c in self._app_name
)
file_path = path / f"{filename}_{timestamp}.json"
with file_path.open("w") as fh:
fh.write(output)
else:
print(output, file=sys.stdout)
return ingester_pb2.IngestResponse()


class _DiodeAuthentication:
def __init__(
self,
Expand Down
25 changes: 25 additions & 0 deletions netboxlabs/diode/sdk/client.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

from collections.abc import Iterable
from typing import Protocol, runtime_checkable

from netboxlabs.diode.sdk.diode.v1 import ingester_pb2
from netboxlabs.diode.sdk.ingester import Entity

_DEFAULT_STREAM: str

@runtime_checkable
class DiodeClientInterface(Protocol):
"""Interface implemented by diode clients."""

@property
def name(self) -> str: ...
@property
def version(self) -> str: ...
def ingest(
self,
entities: Iterable[Entity | ingester_pb2.Entity | None],
stream: str | None = _DEFAULT_STREAM,
) -> ingester_pb2.IngestResponse: ...
def __enter__(self) -> DiodeClientInterface: ...
def __exit__(self, exc_type, exc_value, exc_traceback) -> None: ...
Loading