diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 05539a1..dd0ef4f 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -32,7 +32,11 @@ from etos_lib.opentelemetry.semconv import Attributes as SemConvAttributes from etos_lib.kubernetes import Kubernetes, Environment, Provider from etos_lib.kubernetes.schemas.common import OwnerReference -from etos_lib.kubernetes.schemas import Environment as EnvironmentSchema, EnvironmentSpec, Metadata +from etos_lib.kubernetes.schemas import ( + Environment as EnvironmentSchema, + EnvironmentSpec, + Metadata, +) from etos_lib.kubernetes.schemas import Test from etos_lib.kubernetes.schemas import Provider as ProviderSchema from etos_lib.kubernetes.schemas import EnvironmentRequest as EnvironmentRequestSchema @@ -45,6 +49,7 @@ from log_area_provider.log_area import LogArea from .lib.config import Config +from .lib.otel_tracing import get_current_context from .lib.encrypt import Encrypt from .lib.graphql import request_main_suite from .lib.join import Join @@ -86,7 +91,9 @@ def __init__(self, suite_runner_ids: Optional[list[str]] = None) -> None: :param suite_runner_ids: IDs from the suite runner to correlate sub suites. """ self.etos = ETOS( - "ETOS Environment Provider", os.getenv("HOSTNAME", "Unknown"), "Environment Provider" + "ETOS Environment Provider", + os.getenv("HOSTNAME", "Unknown"), + "Environment Provider", ) self.kubernetes = Kubernetes() self.environment_provider_config = Config(self.etos, self.kubernetes, suite_runner_ids) @@ -376,7 +383,11 @@ def checkout(self, request: EnvironmentRequestSchema) -> None: for test in request.spec.splitter.tests: test_runners.setdefault( test.execution.testRunner, - {"docker": test.execution.testRunner, "priority": 1, "unsplit_recipes": []}, + { + "docker": test.execution.testRunner, + "priority": 1, + "unsplit_recipes": [], + }, ) test_runners[test.execution.testRunner]["unsplit_recipes"].append(test) @@ -467,7 +478,11 @@ def checkout(self, request: EnvironmentRequestSchema) -> None: # Add sub suites to test suite structure and send environment events to the ESR. for iut, suite in test_runners[test_runner].get("iuts", {}).items(): sub_suite = test_suite.add( - request, test_runner, iut, suite, test_runners[test_runner]["priority"] + request, + test_runner, + iut, + suite, + test_runners[test_runner]["priority"], ) if self.environment_provider_config.etos_controller: self.send_environment_events( @@ -561,6 +576,13 @@ def _run(self, request: EnvironmentRequestSchema) -> None: if triggered is not None: self.etos.events.send_activity_finished(triggered, outcome) + def _run_with_span(self, request: EnvironmentRequestSchema) -> None: + """Run the environment provider task with an attached span.""" + with self.tracer.start_as_current_span( + "request_environment", context=get_current_context() + ): + self._run(request) + def _configure_provider(self, provider_db: ETCDPath, provider_spec: dict, name: str): """Configure a single provider for a testrun.""" self.logger.info("Saving provider with name %r in %r", name, provider_db) @@ -616,14 +638,19 @@ def get_environment(self) -> dict: for request in self.environment_provider_config.requests: if self.environment_provider_config.etos_controller: self.configure_environment_provider(request) - self.configure(request) - self._run(request) + self.configure(request) + self._run_with_span(request) + else: + self.configure(request) + self._run(request) return {"error": None} except Exception as exception: # pylint:disable=broad-except self.cleanup() traceback.print_exc() self.logger.error( - "Failed creating environment for test. %r", exception, extra={"user_log": True} + "Failed creating environment for test. %r", + exception, + extra={"user_log": True}, ) raise finally: @@ -651,7 +678,10 @@ def get_environment(): """Entrypoint for getting an environment.""" logformat = "[%(asctime)s] %(levelname)s:%(message)s" logging.basicConfig( - level=logging.INFO, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" + level=logging.INFO, + stream=sys.stdout, + format=logformat, + datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("gql").setLevel(logging.WARNING) try: diff --git a/src/environment_provider/lib/otel_tracing.py b/src/environment_provider/lib/otel_tracing.py new file mode 100644 index 0000000..ab550ce --- /dev/null +++ b/src/environment_provider/lib/otel_tracing.py @@ -0,0 +1,54 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""OpenTelemetry-related code.""" +import logging +import os + +import opentelemetry +from opentelemetry.propagators.textmap import Getter +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + + +LOGGER = logging.getLogger(__name__) + + +class EnvVarContextGetter(Getter): + """OpenTelemetry context getter class for environment variables.""" + + def get(self, carrier, key): + """Get value using the given carrier variable and key.""" + value = os.getenv(carrier) + if value is not None and value != "": + pairs = value.split(",") + for pair in pairs: + k, v = pair.split("=", 1) + if k == key: + return [v] + return [] + + def keys(self, carrier): + """Get keys of the given carrier variable.""" + value = os.getenv(carrier) + if value is not None: + return [pair.split("=")[0] for pair in value.split(",") if "=" in pair] + return [] + + +def get_current_context() -> opentelemetry.context.context.Context: + """Get current context propagated via environment variable OTEL_CONTEXT.""" + propagator = TraceContextTextMapPropagator() + ctx = propagator.extract(carrier="OTEL_CONTEXT", getter=EnvVarContextGetter()) + return ctx diff --git a/src/environment_provider/lib/releaser.py b/src/environment_provider/lib/releaser.py index a2ea10f..f9c1282 100644 --- a/src/environment_provider/lib/releaser.py +++ b/src/environment_provider/lib/releaser.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Releaser of environments.""" +import os import logging from jsontas.jsontas import JsonTas from opentelemetry import trace @@ -31,6 +32,7 @@ from log_area_provider import LogAreaProvider from log_area_provider.exceptions import LogAreaCheckinFailed from log_area_provider.log_area import LogArea as LogAreaSpec +from .otel_tracing import get_current_context TRACER = trace.get_tracer(__name__) @@ -233,13 +235,17 @@ class EnvironmentReleaser: logger = logging.getLogger(__name__) + def __init__(self): + """Initialize the opentelemetry tracer.""" + self.tracer = trace.get_tracer(__name__) + def environment(self, environment_id: str) -> EnvironmentSchema: """Environment gets an environment from kubernetes with environment_id as name.""" client = Environment(Kubernetes()) environment = client.get(environment_id).to_dict() # type: ignore return EnvironmentSchema.model_validate(environment) - def run(self, environment_id: str): + def _run(self, environment_id: str): """Run the releaser. It will check which type of environment and release it.""" self.logger.info("Running the environment releaser") etos = ETOS("", "", "") @@ -261,7 +267,11 @@ def run(self, environment_id: str): ) return etos.config.set("SUITE_ID", environment.spec.suite_id) - tasks = [Iut(etos, environment), LogArea(etos, environment), Executor(etos, environment)] + tasks = [ + Iut(etos, environment), + LogArea(etos, environment), + Executor(etos, environment), + ] exceptions = [] for task in tasks: @@ -273,3 +283,18 @@ def run(self, environment_id: str): exceptions.append(exception) if exceptions: raise ReleaseError("Some or all release tasks failed") + + def _run_with_span(self, environment_id: str): + """Run the release with an attached span.""" + with trace.get_tracer(__name__).start_as_current_span( + "release_environment", context=get_current_context() + ): + self._run(environment_id) + + def run(self, environment_id: str): + """Run the releaser. It will check which type of environment and release it.""" + # The REQUEST environment variable is set by the environment_controller. + if os.getenv("REQUEST") is not None: + self._run_with_span(environment_id) + else: + self._run(environment_id)