Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ See [Helm Chart README](helm-chart/eoapi-notifier/README.md) for configuration o

#### Outputs
- `mqtt`: Publish events to MQTT broker
- `cloudevents`: Send events as CloudEvents

## Development

Expand Down
110 changes: 89 additions & 21 deletions eoapi_notifier/outputs/cloudevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,66 @@
import httpx
from cloudevents.conversion import to_binary
from cloudevents.http import CloudEvent
from pydantic import field_validator, model_validator
from pydantic import BaseModel, field_validator, model_validator

from ..core.event import NotificationEvent
from ..core.plugin import BaseOutput, BasePluginConfig, PluginMetadata


class RefConfig(BaseModel):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm expecting:

  • The code in eoAPI only expects to have K_SINK defined in an environment variable
  • The ref vs url logic should be outside the code, and should only reside in the helm chart
  • The Helm chart, based on the configuration, should set K_SINK only is url is set, otherwise it should create a SinkBinding which should match the deployment/stefullset and inject the environment variable. This last part is the responsibility of KNative and no logic, besides the expectation on the presence ok K_SINK, should reside in the application code.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Python side of code should only care of K_SINK. If it's set then it has to POST to that endpoint.
A special attention should be given to the env K_CE_OVERRIDES. This is a JSON object that specifies overrides to the outbound event.

See this:

"""Kubernetes resource reference configuration."""

apiVersion: str
kind: str
name: str
namespace: str | None = None


class DestinationConfig(BaseModel):
"""Destination configuration - either ref or url."""

ref: RefConfig | None = None
url: str | None = None

@model_validator(mode="after")
def validate_mutually_exclusive(self) -> "DestinationConfig":
if self.ref and self.url:
raise ValueError(
"destination.ref and destination.url are mutually exclusive"
)
if not self.ref and not self.url:
raise ValueError(
"Either destination.ref or destination.url must be specified"
)
return self

@field_validator("url")
@classmethod
def validate_url(cls, v: str | None) -> str | None:
if v and not v.startswith(("http://", "https://")):
raise ValueError("destination.url must start with http:// or https://")
return v


class CloudEventsConfig(BasePluginConfig):
"""Configuration for CloudEvents output adapter with environment variable
support."""

endpoint: str | None = None
destination: DestinationConfig
source: str = "/eoapi/stac"
event_type: str = "org.eoapi.stac"
timeout: float = 30.0
max_retries: int = 3
retry_backoff: float = 1.0

@field_validator("endpoint")
@classmethod
def validate_endpoint(cls, v: str | None) -> str | None:
if v and not v.startswith(("http://", "https://")):
raise ValueError("endpoint must start with http:// or https://")
return v

@model_validator(mode="after")
def apply_knative_overrides(self) -> "CloudEventsConfig":
"""Apply KNative SinkBinding environment variables as special case."""
# K_SINK overrides endpoint (KNative SinkBinding)
# K_SINK overrides destination for ref-based configs (KNative SinkBinding)
if k_sink := os.getenv("K_SINK"):
self.endpoint = k_sink
if self.destination.ref:
# For ref-based destinations, K_SINK provides the resolved URL
self.destination = DestinationConfig(url=k_sink)

# K_SOURCE overrides source
if k_source := os.getenv("K_SOURCE"):
Expand All @@ -57,7 +87,15 @@ def apply_knative_overrides(self) -> "CloudEventsConfig":
@classmethod
def get_sample_config(cls) -> dict[str, Any]:
return {
"endpoint": None, # Uses K_SINK env var if not set
"destination": {
"ref": {
"apiVersion": "messaging.knative.dev/v1",
"kind": "Broker",
"name": "eoapi-broker",
"namespace": "serverless",
}
# "url": "https://example.com/webhook" # mutually exclusive with ref
},
"source": "/eoapi/stac",
"event_type": "org.eoapi.stac",
"timeout": 30.0,
Expand All @@ -76,12 +114,27 @@ def get_metadata(cls) -> PluginMetadata:
)

def get_connection_info(self) -> str:
url = self.endpoint or os.getenv("K_SINK", "K_SINK env var")
if self.destination.url:
url = self.destination.url
elif self.destination.ref:
ref_name = f"{self.destination.ref.kind}/{self.destination.ref.name}"
url = os.getenv("K_SINK", f"K_SINK env var -> {ref_name}")
else:
url = "unresolved"
return f"POST {url}"

def get_status_info(self) -> dict[str, Any]:
if self.destination.url:
endpoint_info = self.destination.url
elif self.destination.ref:
endpoint_info = (
f"{self.destination.ref.kind}/{self.destination.ref.name} (via K_SINK)"
)
else:
endpoint_info = "unresolved"

return {
"Endpoint": self.endpoint or "K_SINK env var",
"Destination": endpoint_info,
"Source": self.source,
"Event Type": self.event_type,
"Timeout": f"{self.timeout}s",
Expand All @@ -107,12 +160,19 @@ async def start(self) -> None:
f"max_retries={self.config.max_retries}"
)

endpoint = self.config.endpoint
if not endpoint:
raise ValueError(
"endpoint configuration required (can be set via config, K_SINK, "
"or CLOUDEVENTS_ENDPOINT env vars)"
)
# Get endpoint URL
if self.config.destination.url:
endpoint = self.config.destination.url
elif self.config.destination.ref:
k_sink = os.getenv("K_SINK")
if not k_sink:
raise ValueError(
f"K_SINK environment variable required for ref destination "
f"{self.config.destination.ref.kind}/{self.config.destination.ref.name}"
)
endpoint = k_sink
else:
raise ValueError("destination.ref or destination.url must be configured")

self.logger.debug(f"Step 1: Resolved endpoint: {endpoint}")

Expand Down Expand Up @@ -151,7 +211,15 @@ async def send_event(self, event: NotificationEvent) -> bool:
return False

try:
endpoint = self.config.endpoint
# Get endpoint URL
if self.config.destination.url:
endpoint = self.config.destination.url
else:
k_sink = os.getenv("K_SINK")
if not k_sink:
self.logger.error("K_SINK not available for ref destination")
return False
endpoint = k_sink

# Convert to CloudEvent
self.logger.debug(f"Converting event {event.id} to CloudEvent format...")
Expand Down
21 changes: 13 additions & 8 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,22 @@ outputs:
# topic: "eoapi/" # MQTT_TOPIC
# qos: 1 # MQTT_QOS

# CloudEvents HTTP output for sending events as CloudEvents
#
# Besides the regular overwrite, this plugin also supports K_SINK
# https://knative.dev/docs/eventing/custom-event-source/sinkbinding/
# CloudEvents output for sending events as CloudEvents
- type: cloudevents
config:
endpoint: https://example.com/webhook # CLOUDEVENTS_ENDPOINT or K_SINK
source: "/eoapi/pgstac" # CLOUDEVENTS_SOURCE or K_SOURCE
event_type: "org.eoapi.stac.item" # CLOUDEVENTS_EVENT_TYPE or K_TYPE

# Optional: CloudEventattributes
# source: "/eoapi/stac" # CLOUDEVENTS_SOURCE or K_SOURCE
# event_type: "org.eoapi.stac" # CLOUDEVENTS_EVENT_TYPE or K_TYPE
destination:
# Option 1: Kubernetes resource reference (uses SinkBinding)
ref:
apiVersion: messaging.knative.dev/v1
kind: Broker
name: eoapi-broker
namespace: serverless

# Option 2: Direct HTTP endpoint (alternative to ref)
# url: https://example.com/webhook

# Optional: HTTP settings
# timeout: 30.0 # CLOUDEVENTS_TIMEOUT
Expand Down
24 changes: 24 additions & 0 deletions helm-chart/eoapi-notifier/templates/sinkbinding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{{- range .Values.config.outputs }}
{{- if and (eq .type "cloudevents") .config.destination.ref }}
apiVersion: sources.knative.dev/v1beta1
kind: SinkBinding
metadata:
name: {{ include "eoapi-notifier.fullname" $ }}-binding
labels:
{{- include "eoapi-notifier.labels" $ | nindent 4 }}
spec:
subject:
apiVersion: apps/v1
kind: Deployment
name: {{ include "eoapi-notifier.fullname" $ }}
sink:
ref:
apiVersion: {{ .config.destination.ref.apiVersion }}
kind: {{ .config.destination.ref.kind }}
name: {{ .config.destination.ref.name }}
{{- if .config.destination.ref.namespace }}
namespace: {{ .config.destination.ref.namespace }}
{{- end }}
{{- break }}
{{- end }}
{{- end }}
46 changes: 15 additions & 31 deletions helm-chart/eoapi-notifier/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,17 @@ config:

- type: cloudevents
config:
# Will be overridden by K_SINK from KNative SinkBinding
endpoint: https://example.com/webhook
source: /eoapi/stac
event_type: org.eoapi.stac
source: /eoapi/pgstac
event_type: org.eoapi.stac.item
destination:
# Use ref for Knative resources
ref:
apiVersion: messaging.knative.dev/v1
kind: Broker
name: eoapi-broker
namespace: serverless
# Alternatively, use url for direct HTTP endpoints
# url: https://example.com/webhook

# Secrets
secrets:
Expand All @@ -75,27 +82,9 @@ secrets:
# These will be injected as environment variables and automatically override config values
# Use plugin-prefixed variables: PGSTAC_PASSWORD, MQTT_USERNAME, CLOUDEVENTS_ENDPOINT, etc
#
# KNative Support:
# The cloudevents plugin supports K_SINK variables for KNative SinkBinding:
# - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding)
# - K_SOURCE: Overrides CLOUDEVENTS_SOURCE
# - K_TYPE: Overrides CLOUDEVENTS_EVENT_TYPE
#
# For KNative integration, use SinkBinding to automatically inject K_SINK:
# apiVersion: sources.knative.dev/v1beta1
# kind: SinkBinding
# metadata:
# name: eoapi-notifier-binding
# spec:
# subject:
# apiVersion: apps/v1
# kind: Deployment
# name: eoapi-notifier
# sink:
# ref:
# apiVersion: serving.knative.dev/v1
# kind: Service
# name: my-knative-service
# CloudEvents destination options:
# - destination.ref: Kubernetes resource reference (uses SinkBinding)
# - destination.url: Direct HTTP endpoint
env: {}
# Examples - Standard environment variables:
# PGSTAC_HOST: postgresql-service
Expand All @@ -105,11 +94,6 @@ env: {}
# MQTT_USE_TLS: "true"
#
# CloudEvents examples:
# CLOUDEVENTS_ENDPOINT: https://my-webhook-url
# CLOUDEVENTS_SOURCE: /eoapi/stac/production
# CLOUDEVENTS_EVENT_TYPE: org.eoapi.stac.item
#
# KNative examples (typically set by SinkBinding):
# K_SINK: https://my-knative-service.default.svc.cluster.local
# K_SOURCE: /eoapi/stac/pgstac
# K_TYPE: org.eoapi.stac
# K_SINK: https://my-service.default.svc.cluster.local (set by SinkBinding)
Loading