diff --git a/.github/workflows/build-publish.yml b/.github/workflows/build-publish.yml index 925cd70..9c00493 100644 --- a/.github/workflows/build-publish.yml +++ b/.github/workflows/build-publish.yml @@ -16,14 +16,8 @@ jobs: uses: actions/setup-python@v4 with: python-version: "3.9" - - name: Install pypa/build - run: >- - python3 -m - pip install - build - --user - name: Build a binary wheel and a source tarball - run: python3 -m build + run: make build - name: Store the distribution packages uses: actions/upload-artifact@v4 with: diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a9db199..a02d6aa 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -27,3 +27,19 @@ jobs: run: pip install -U setuptools - name: Run Tox run: tox + check-golang: + # The type of runner that the job will run on + runs-on: ubuntu-latest + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + - uses: actions/checkout@v2 + - name: Lint Go code + run: make check + run-build: + # The type of runner that the job will run on + runs-on: ubuntu-latest + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + - uses: actions/checkout@v2 + - name: Build python module with Go code + run: make build diff --git a/.gitignore b/.gitignore index 279c72e..573a579 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ __pycache__/* */.ipynb_checkpoints/* .DS_Store .tags +bin/ # Project files .ropeproject diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..7597310 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,7 @@ +prune build +prune docs/_build +prune docs/api +recursive-include *.go +include go.mod go.sum +graft src/etos_lib/bindings/ +global-exclude *.pyc *.o diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..335be6e --- /dev/null +++ b/Makefile @@ -0,0 +1,65 @@ +export GOBIN := $(CURDIR)/bin +export PYBUILD := $(CURDIR)/bin +export VENVDIR := $(CURDIR)/.buildenv +export BINDINGS := $(CURDIR)/src/etos_lib/bindings +export BUILD_MESSAGING := go build --buildmode=c-shared -o $(BINDINGS)/messaging/client.so ./cmd/messaging/main.go + +BUILD_PYTHON = -m build +VIRTUALENV = $(VENVDIR)/bin/python +GOLANGCI_LINT = $(GOBIN)/golangci-lint +GOLANGCI_LINT_VERSION = v1.64.6 + +.PHONY: all +all: check build + +.PHONY: check +check: staticcheck test + +.PHONY: test +test: + go test -cover -timeout 30s -race $(shell go list ./... | grep -v "etos-library/test") + +.PHONY: tidy +tidy: + go mod tidy + +.PHONY: check-dirty +check-dirty: + $(GIT) diff --exit-code HEAD + +.PHONY: staticcheck +staticcheck: $(GOLANGCI_LINT) + $(GOLANGCI_LINT) run + +.PHONY: clean +clean: + $(RM) $(GOBIN)/* + $(RM) -r $(BINDINGS) + $(RM) -r $(VENVDIR) + $(RM) -r dist + +.PHONY: build +build: build-bindings build-python + +.PHONY: build-bindings +build-bindings: + $(BUILD_MESSAGING) + +.PHONY: build-python +build-python: $(BUILD_PYTHON) + $(VIRTUALENV) $(BUILD_PYTHON) + +$(GOLANGCI_LINT): + mkdir -p $(dir $@) + curl -sfL \ + https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh \ + | sh -s -- -b $(GOBIN) $(GOLANGCI_LINT_VERSION) + +$(BUILD_PYTHON): $(VIRTUALENV) + mkdir -p $(dir $@) + $(VIRTUALENV) -m pip install build + +$(VIRTUALENV): + mkdir -p $(dir $@) + pip install virtualenv + python -m virtualenv $(VENVDIR) diff --git a/cmd/messaging/main.go b/cmd/messaging/main.go new file mode 100644 index 0000000..787807d --- /dev/null +++ b/cmd/messaging/main.go @@ -0,0 +1,27 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package main + +import ( + "C" + + // This import will make sure that we build the bindings on make build. + _ "github.com/eiffel-community/etos-library/pkg/bindings" +) + +// main is required for cgo to work. +func main() { +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5480181 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module github.com/eiffel-community/etos-library + +go 1.22.1 + +require ( + github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 + github.com/sirupsen/logrus v1.9.3 +) + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect + golang.org/x/sys v0.28.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c1a1827 --- /dev/null +++ b/go.sum @@ -0,0 +1,63 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU= +github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 h1:2UWryxhtQmYA3Bx2iajQCre3yQbARiSikpC/8iWbu3k= +github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0/go.mod h1:KDXSNVSqj4QNg6TNMBnQQ/oWHaxLjUI1520j68SyEcY= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/bindings/publisher.go b/pkg/bindings/publisher.go new file mode 100644 index 0000000..e908da1 --- /dev/null +++ b/pkg/bindings/publisher.go @@ -0,0 +1,135 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package bindings + +/* +#include +#include +*/ +import "C" + +import ( + "log/slog" + "os" + "runtime/cgo" + + "github.com/eiffel-community/etos-library/pkg/stream" +) + +// StreamHandler is a struct that holds the logger, connection string, stream name, streamer, +// publisher, and consumer. +// It is used to create a new publisher and publish messages to a stream from python. +// For using the streamers in Go code please refer to pkg/stream/stream.go. +// +// For usage examples view the python code in src/etos_lib/messaging/publisher.py. +type StreamHandler struct { + logger *slog.Logger + connectionString string + streamName string + streamer stream.Streamer + publisher stream.Publisher + consumer stream.Consumer +} + +// New creates a new publisher and returns a pointer to it. +// +//export New +func New(connectionString, streamName *C.char) (C.uintptr_t, bool) { + logger := newLogger() + streamer, err := stream.NewRabbitMQStreamer(C.GoString(streamName), C.GoString(connectionString), logger) + if err != nil { + return C.uintptr_t(0), false + } + return C.uintptr_t(cgo.NewHandle(&StreamHandler{ + logger: logger, + streamer: streamer, + })), true +} + +// Publisher creates a new publisher and starts it. +// +//export Publisher +func Publisher(p C.uintptr_t, name *C.char) bool { + h := cgo.Handle(p) + handler, ok := h.Value().(*StreamHandler) + if !ok { + // Creating a new logger here since if handler is nil, we don't have access to one. + newLogger().Error("Failed to get stream handler from memory") + return false + } + publisher, err := handler.streamer.Publisher(C.GoString(name)) + if err != nil { + handler.logger.Error("Failed to create publisher", slog.Any("Error", err)) + return false + } + handler.publisher = publisher + if err = publisher.Start(); err != nil { + handler.logger.Error("Failed to start publisher", slog.Any("Error", err)) + return false + } + return true +} + +// Publish a message to the stream. +// +//export Publish +func Publish(p C.uintptr_t, event, identifier, eventType, meta *C.char) bool { + h := cgo.Handle(p) + handler, ok := h.Value().(*StreamHandler) + if !ok { + // Creating a new logger here since if handler is nil, we don't have access to one. + newLogger().Error("Failed to get stream handler from memory") + return false + } + message := C.GoString(event) + filter := stream.Filter{ + Identifier: C.GoString(identifier), + Type: C.GoString(eventType), + Meta: C.GoString(meta), + } + handler.publisher.Publish([]byte(message), filter) + return true +} + +// Close closes the publisher and the streamer. +// +//export Close +func Close(p C.uintptr_t) bool { + h := cgo.Handle(p) + // Delete the handle here, since it shall not be reused after close. + defer h.Delete() + handler, ok := h.Value().(*StreamHandler) + if !ok { + // Creating a new logger here since if handler is nil, we don't have access to one. + newLogger().Error("Failed to get stream handler from memory") + return false + } + if handler.publisher != nil { + handler.publisher.Close() + } + if handler.consumer != nil { + handler.consumer.Close() + } + if handler.streamer != nil { + handler.streamer.Close() + } + return true +} + +// newLogger creates a new logger. +func newLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stdout, nil)) +} diff --git a/pkg/stream/rabbitmq.go b/pkg/stream/rabbitmq.go new file mode 100644 index 0000000..8ff26d9 --- /dev/null +++ b/pkg/stream/rabbitmq.go @@ -0,0 +1,329 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package stream + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" +) + +const ( + IgnoreUnfiltered = false + MaxLengthBytes = "2gb" + MaxAge = 10 * time.Second + ConfirmationTimeout = 2 * time.Second +) + +// RabbitMQStreamer is a struct representing a single RabbitMQ connection. From a new connection new streams may be +// created. Normal case is to have a single connection with multiple streams. If multiple connections are needed +// then multiple instances of the program should be run. +type RabbitMQStreamer struct { + logger *slog.Logger + environment *stream.Environment + streamName string +} + +// NewRabbitMQStreamer creates a new RabbitMQ streamer. Only a single connection should be created. +func NewRabbitMQStreamer( + streamName string, + address string, + logger *slog.Logger, +) (Streamer, error) { + options := stream.NewEnvironmentOptions().SetMaxProducersPerClient(1).SetUri(address) + env, err := stream.NewEnvironment(options) + if err != nil { + return nil, err + } + return &RabbitMQStreamer{logger: logger, environment: env, streamName: streamName}, err +} + +// CreateStream creates a new RabbitMQ stream. +func (s *RabbitMQStreamer) CreateStream(name string) error { + // This will create the stream if not already created. + return s.environment.DeclareStream(name, + &stream.StreamOptions{ + MaxLengthBytes: stream.ByteCapacity{}.From(MaxLengthBytes), + MaxAge: MaxAge, + }, + ) +} + +// Consumer returns a new Consumer. +func (s *RabbitMQStreamer) Consumer(name string) (Consumer, error) { + exists, err := s.environment.StreamExists(s.streamName) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.New("no stream exists, cannot stream events") + } + options := stream.NewConsumerOptions(). + SetClientProvidedName(name). + SetConsumerName(name). + SetCRCCheck(false). + SetOffset(stream.OffsetSpecification{}.First()) + return &RabbitMQStreamConsumer{ + logger: s.logger, + streamName: s.streamName, + environment: s.environment, + options: options, + }, nil +} + +// Publisher returns a new Publisher. +func (s *RabbitMQStreamer) Publisher(name string) (Publisher, error) { + exists, err := s.environment.StreamExists(s.streamName) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.New("no stream exists, cannot stream events") + } + options := stream.NewProducerOptions(). + SetClientProvidedName(name). + SetProducerName(name). + SetConfirmationTimeOut(ConfirmationTimeout) + return &RabbitMQStreamPublisher{ + logger: s.logger, + streamName: s.streamName, + environment: s.environment, + options: options, + }, nil +} + +// Close the RabbitMQ connection. +func (s *RabbitMQStreamer) Close() { + err := s.environment.Close() + if err != nil { + s.logger.Error("failed to close RabbitMQStreamer", slog.Any("error", err)) + } +} + +// RabbitMQStreamConsumer is a structure implementing the Consumer interface. Used to consume events +// from a RabbitMQ stream. +type RabbitMQStreamConsumer struct { + ctx context.Context + logger *slog.Logger + streamName string + environment *stream.Environment + options *stream.ConsumerOptions + consumer *stream.Consumer + channel chan<- []byte + filter []string +} + +// WithChannel adds a channel for receiving events from the stream. If no +// channel is added, then events will be logged. +func (s *RabbitMQStreamConsumer) WithChannel(ch chan<- []byte) Consumer { + s.channel = ch + return s +} + +// WithOffset adds an offset to the RabbitMQ stream. -1 means start from the beginning. +func (s *RabbitMQStreamConsumer) WithOffset(offset int) Consumer { + if offset == -1 { + s.options = s.options.SetOffset(stream.OffsetSpecification{}.First()) + } else { + s.options = s.options.SetOffset(stream.OffsetSpecification{}.Offset(int64(offset))) + } + return s +} + +// WithFilter adds a filter to the RabbitMQ stream. +func (s *RabbitMQStreamConsumer) WithFilter(filter []string) Consumer { + s.filter = filter + if len(s.filter) > 0 { + s.options = s.options.SetFilter(stream.NewConsumerFilter(filter, IgnoreUnfiltered, s.postFilter)) + } + return s +} + +// Consume will start consuming the RabbitMQ stream, non blocking. A channel is returned where +// an error is sent when the consumer closes down. +func (s *RabbitMQStreamConsumer) Consume() (<-chan error, error) { + handler := func(_ stream.ConsumerContext, message *amqp.Message) { + for _, d := range message.Data { + if s.channel != nil { + s.channel <- d + } else { + s.logger.Debug(string(d)) + } + } + } + consumer, err := s.environment.NewConsumer(s.streamName, handler, s.options) + if err != nil { + return nil, err + } + s.consumer = consumer + closed := make(chan error, 1) + go s.notifyClose(s.ctx, closed) + return closed, nil +} + +// notifyClose will keep track of context and the notify close channel from RabbitMQ and send +// error on a channel. +func (s *RabbitMQStreamConsumer) notifyClose(ctx context.Context, ch chan<- error) { + closed := s.consumer.NotifyClose() + select { + case <-ctx.Done(): + ch <- ctx.Err() + case event := <-closed: + ch <- event.Err + } +} + +// Close the RabbitMQ stream consumer. +func (s *RabbitMQStreamConsumer) Close() { + if s.consumer != nil { + if err := s.consumer.Close(); err != nil { + s.logger.Error("failed to close rabbitmq consumer", slog.Any("error", err)) + } + } +} + +// postFilter applies client side filtering on all messages received from the RabbitMQ stream. +// The RabbitMQ server-side filtering is not perfect and will let through a few messages that don't +// match the filter, this is expected as the RabbitMQ unit of delivery is the chunk and there may +// be multiple messages in a chunk and those messages are not filtered. +func (s *RabbitMQStreamConsumer) postFilter(message *amqp.Message) bool { + if s.filter == nil { + return true // Unfiltered + } + identifier := message.ApplicationProperties["identifier"] + eventType := message.ApplicationProperties["type"] + eventMeta := message.ApplicationProperties["meta"] + name := fmt.Sprintf("%s.%s.%s", identifier, eventType, eventMeta) + for _, filter := range s.filter { + if name == filter { + return true + } + } + return false +} + +// RabbitMQStreamPublisher is a structure implementing the Publisher interface. Used to publish events +// to a RabbitMQ stream. +type RabbitMQStreamPublisher struct { + logger *slog.Logger + streamName string + environment *stream.Environment + options *stream.ProducerOptions + producer *ha.ReliableProducer + unConfirmedMessages chan message.StreamMessage + unConfirmed *sync.WaitGroup + done chan struct{} + shutdown bool +} + +type Filter struct { + Identifier string + Type string + Meta string +} + +// Start will start the RabbitMQ stream publisher, non blocking. A channel is returned where +// a events can be published. +func (s *RabbitMQStreamPublisher) Start() error { + // TODO: Make a 'New' function + s.unConfirmed = &sync.WaitGroup{} + // TODO: Verify if unlimited is good or bad. + s.unConfirmedMessages = make(chan message.StreamMessage) + s.options.SetFilter(stream.NewProducerFilter(func(message message.StreamMessage) string { + p := message.GetApplicationProperties() + return fmt.Sprintf("%s.%s.%s", p["identifier"], p["type"], p["meta"]) + })) + producer, err := ha.NewReliableProducer( + s.environment, + s.streamName, + s.options, + func(messageStatus []*stream.ConfirmationStatus) { + go func() { + for _, msgStatus := range messageStatus { + if msgStatus.IsConfirmed() { + s.unConfirmed.Done() + } else { + s.logger.Warn("Unconfirmed message", slog.Any("error", msgStatus.GetError())) + s.unConfirmedMessages <- msgStatus.GetMessage() + } + } + }() + }) + if err != nil { + return err + } + s.producer = producer + s.done = make(chan struct{}) + go s.publish(s.done) + return nil +} + +// Publish an event to the RabbitMQ stream. +func (s *RabbitMQStreamPublisher) Publish(msg []byte, filter Filter) { + if s.shutdown { + s.logger.Error("Publisher is closed") + return + } + s.unConfirmed.Add(1) + message := amqp.NewMessage(msg) + if filter.Meta == "" { + filter.Meta = "*" + } + message.ApplicationProperties = map[string]interface{}{ + "identifier": filter.Identifier, + "type": filter.Type, + "meta": filter.Meta, + } + s.unConfirmedMessages <- message +} + +// publish a message from unconfirmed messages to RabbitMQ. +func (s *RabbitMQStreamPublisher) publish(done chan struct{}) { + for { + select { + case msg := <-s.unConfirmedMessages: + if err := s.producer.Send(msg); err != nil { + s.logger.Error("Failed to send message", slog.Any("error", err)) + } + case <-done: + return + } + } +} + +// Close the RabbitMQ stream publisher. +func (s *RabbitMQStreamPublisher) Close() { + if s.producer != nil { + s.logger.Info("Stopping publisher") + s.shutdown = true + s.logger.Info("Wait for unconfirmed messages") + s.unConfirmed.Wait() + s.done <- struct{}{} + s.logger.Info("Done, closing down") + if err := s.producer.Close(); err != nil { + s.logger.Error("Failed to close rabbitmq publisher", slog.Any("error", err)) + } + close(s.unConfirmedMessages) + } +} diff --git a/pkg/stream/stream.go b/pkg/stream/stream.go new file mode 100644 index 0000000..5ecc198 --- /dev/null +++ b/pkg/stream/stream.go @@ -0,0 +1,37 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package stream + +type Streamer interface { + Consumer(string) (Consumer, error) + Publisher(string) (Publisher, error) + CreateStream(string) error + Close() +} + +type Consumer interface { + WithChannel(chan<- []byte) Consumer + WithOffset(int) Consumer + WithFilter([]string) Consumer + Consume() (<-chan error, error) + Close() +} + +type Publisher interface { + Start() error + Publish([]byte, Filter) + Close() +} diff --git a/pyproject.toml b/pyproject.toml index 3b6042a..f0b57a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "eiffellib[rabbitmq]~=2.4", "requests~=2.31", "kubernetes~=26.1", - "pydantic~=2.1", + "pydantic~=2.10", "pyyaml~=6.0", "opentelemetry-api~=1.21", "opentelemetry-sdk~=1.21", diff --git a/setup.py b/setup.py index a3b1c0c..8f383cd 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,10 @@ # -*- coding: utf-8 -*- """Setup file for etos_lib.""" + +from distutils.errors import CompileError +from subprocess import call from setuptools import setup +from setuptools.command.build_ext import build_ext from setuptools_scm.version import get_local_dirty_tag @@ -38,5 +42,21 @@ def local_scheme(version) -> str: return f"+{version.node}" +class BuildGoBindings(build_ext): + """Custom command to build the go bindings in this repository.""" + + def build_extension(self, ext): + """Build the go extension.""" + raise CompileError("Testing") + cmd = ["make", "build"] + out = call(cmd) + if out != 0: + raise CompileError("Go build failed") + + if __name__ == "__main__": - setup(use_scm_version={"local_scheme": local_scheme, "version_scheme": version_scheme}) + setup( + use_scm_version={"local_scheme": local_scheme, "version_scheme": version_scheme}, + cmdclass={"build_ext": BuildGoBindings}, + zip_safe=False, + ) diff --git a/src/etos_lib/etos.py b/src/etos_lib/etos.py index 67e9f99..38ca627 100644 --- a/src/etos_lib/etos.py +++ b/src/etos_lib/etos.py @@ -14,6 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS Library module.""" + +import time +from etos_lib.messaging.publisher import Publisher +from etos_lib.messaging.v1.publisher import Publisher as V1Publisher +from etos_lib.messaging.v2alpha.publisher import Publisher as V2alphaPublisher from .eiffel.publisher import TracingRabbitMQPublisher as RabbitMQPublisher from .eiffel.subscriber import TracingRabbitMQSubscriber as RabbitMQSubscriber from .graphql.query_handler import GraphQLQueryHandler @@ -78,6 +83,34 @@ def start_subscriber(self): self.subscriber.start() self.config.set("subscriber", self.subscriber) + def messagebus_publisher(self, version: str = "v1") -> Publisher: + """Start the internal messagebus publisher using config data from ETOS library. + + :param version: Version of the messagebus protocol to use. + """ + publisher = self.config.get("internal_publisher") + if publisher is None: + connection_parameters = self.config.etos_rabbitmq_publisher_data() + if not connection_parameters: + raise PublisherConfigurationMissing + if version == "v1": + publisher = V1Publisher(**connection_parameters) + elif version == "v2alpha": + publisher = V2alphaPublisher( + self.config.etos_rabbitmq_publisher_uri(), + self.config.etos_stream_name(), + ) + else: + raise ValueError(f"Unknown version {version!r} of messagebus") + if not self.debug.disable_sending_events: + publisher.start() + # Wait for start. + # No timeout necessary since there is a built-in timeout in the publisher. + while publisher.is_alive(): + time.sleep(0.1) + self.config.set("internal_publisher", publisher) + return publisher + @property def debug(self): """Entry for debug parameters for ETOS.""" diff --git a/src/etos_lib/lib/config.py b/src/etos_lib/lib/config.py index 280ae19..633c11f 100644 --- a/src/etos_lib/lib/config.py +++ b/src/etos_lib/lib/config.py @@ -14,7 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS Library config.""" + import json +from urllib.parse import quote_plus from pprint import pprint import os @@ -92,6 +94,24 @@ def etos_rabbitmq_publisher_data(self): "ssl": ssl, } + def etos_stream_name(self): + """Get the stream name for the ETOS rabbitmq service.""" + return os.getenv("ETOS_STREAM_NAME", "etos") + + def etos_rabbitmq_publisher_uri(self): + """Get RabbitMQ URI for the ETOS rabbitmq service.""" + data = self.etos_rabbitmq_publisher_data() + port = os.getenv("ETOS_RABBITMQ_STREAM_PORT", os.getenv("ETOS_RABBITMQ_PORT", "5672")) + + if data.get("username") is not None and data.get("password") is None: + netloc = f"{data.get('username')}@{data.get('host')}:{port}" + elif data.get("username") is not None and data.get("password") is not None: + netloc = f"{data.get('username')}:{data.get('password')}@{data.get('host')}:{port}" + else: + netloc = f"{data.get('host')}:{port}" + vhost = quote_plus(data.get("vhost", "/") or "/") + return f"rabbitmq-stream://{netloc}/{vhost}" + def rabbitmq_from_environment(self): """Load RabbitMQ data from environment variables.""" ssl = os.getenv("RABBITMQ_SSL", "true") == "true" diff --git a/src/etos_lib/logging/default_config.yaml b/src/etos_lib/logging/default_config.yaml index a13e79e..b3b46a3 100644 --- a/src/etos_lib/logging/default_config.yaml +++ b/src/etos_lib/logging/default_config.yaml @@ -3,3 +3,5 @@ logging: loglevel: INFO file: loglevel: DEBUG + messagebus: + version: v1 diff --git a/src/etos_lib/logging/logger.py b/src/etos_lib/logging/logger.py index 9d37b46..cce9869 100644 --- a/src/etos_lib/logging/logger.py +++ b/src/etos_lib/logging/logger.py @@ -28,6 +28,7 @@ >>> [2020-12-16 10:35:00][cb7c8cd9-40a6-4ecc-8321-a1eae6beae35] INFO: Hello! """ + import atexit import logging import logging.config @@ -47,8 +48,10 @@ from etos_lib.logging.filter import EtosFilter from etos_lib.logging.formatter import EtosLogFormatter from etos_lib.logging.log_processors import ToStringProcessor -from etos_lib.logging.log_publisher import RabbitMQLogPublisher -from etos_lib.logging.rabbitmq_handler import RabbitMQHandler +from etos_lib.logging.messagebus_handler import MessagebusHandler +from etos_lib.messaging.publisher import Publisher +from etos_lib.messaging.v1.publisher import Publisher as V1Publisher +from etos_lib.messaging.v2alpha.publisher import Publisher as V2alphaPublisher DEFAULT_CONFIG = Path(__file__).parent.joinpath("default_config.yaml") DEFAULT_LOG_PATH = Debug().default_log_path @@ -131,34 +134,47 @@ def setup_stream_logging(config: dict, log_filter: EtosFilter) -> None: root_logger.addHandler(stream_handler) -def setup_rabbitmq_logging(log_filter: EtosFilter) -> None: - """Set up rabbitmq logging. +def setup_internal_messagebus_logging(config: dict, log_filter: EtosFilter) -> None: + """Set up internal messagebus logging. - :param log_filter: Logfilter to add to stream handler. + :param config: Internal logging configuration. + :type config: dict + :param log_filter: LogFilter to add to stream handler. :type log_filter: :obj:`EtosFilter` """ loglevel = logging.DEBUG root_logger = logging.getLogger() - # These loggers need to be prevented from propagating their logs - # to RabbitMQLogPublisher. If they aren't, this may cause a deadlock. - logging.getLogger("pika").propagate = False - logging.getLogger("eiffellib.publishers.rabbitmq_publisher").propagate = False - logging.getLogger("etos_lib.eiffel.publisher").propagate = False - logging.getLogger("base_rabbitmq").propagate = False - - rabbitmq = RabbitMQLogPublisher(**Config().etos_rabbitmq_publisher_data(), routing_key=None) + publisher = Config().get("internal_publisher") + version = config.get("version", "v1") + if publisher is None: + if version == "v1": + # These loggers need to be prevented from propagating their logs + # to V1Publisher. If they aren't, this may cause a deadlock. + logging.getLogger("pika").propagate = False + logging.getLogger("eiffellib.publishers.rabbitmq_publisher").propagate = False + logging.getLogger("etos_lib.eiffel.publisher").propagate = False + logging.getLogger("base_rabbitmq").propagate = False + publisher = V1Publisher(**Config().etos_rabbitmq_publisher_data()) + elif version == "v2alpha": + publisher = V2alphaPublisher( + Config().etos_rabbitmq_publisher_uri(), + Config().etos_stream_name(), + ) + else: + raise ValueError("Unknown version of messagebus") + Config().set("internal_publisher", publisher) if Debug().enable_sending_logs: - rabbitmq.start() - atexit.register(close_rabbit, rabbitmq) + publisher.start() + atexit.register(close_rabbit, publisher) - rabbit_handler = RabbitMQHandler(rabbitmq) - rabbit_handler.setFormatter(EtosLogFormatter()) - rabbit_handler.setLevel(loglevel) - rabbit_handler.addFilter(log_filter) + handler = MessagebusHandler(publisher) + handler.setFormatter(EtosLogFormatter()) + handler.setLevel(loglevel) + handler.addFilter(log_filter) - root_logger.addHandler(rabbit_handler) + root_logger.addHandler(handler) def setup_otel_logging( @@ -223,12 +239,13 @@ def setup_logging( setup_stream_logging(logging_config.get("stream"), log_filter) if logging_config.get("file"): setup_file_logging(logging_config.get("file"), log_filter) - setup_rabbitmq_logging(log_filter) + setup_internal_messagebus_logging(logging_config.get("messagebus"), log_filter) if otel_resource: setup_otel_logging(log_filter, otel_resource) -def close_rabbit(rabbit: RabbitMQLogPublisher) -> None: +def close_rabbit(rabbit: Publisher) -> None: """Close down a rabbitmq connection.""" - rabbit.wait_for_unpublished_events() + if isinstance(rabbit, V1Publisher): + rabbit.wait_for_unpublished_events() rabbit.close() diff --git a/src/etos_lib/logging/rabbitmq_handler.py b/src/etos_lib/logging/messagebus_handler.py similarity index 58% rename from src/etos_lib/logging/rabbitmq_handler.py rename to src/etos_lib/logging/messagebus_handler.py index 0a10dde..0fd82d0 100644 --- a/src/etos_lib/logging/rabbitmq_handler.py +++ b/src/etos_lib/logging/messagebus_handler.py @@ -14,12 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS rabbitmq handler.""" + import json import logging +from etos_lib.messaging.publisher import Publisher +from etos_lib.messaging.v2alpha.publisher import Publisher as V2alphaPublisher +from etos_lib.messaging.events import Log, Message -class RabbitMQHandler(logging.StreamHandler): - """A RabbitMQ log handler that sends logs tagged with user_log to RabbitMQ. +class MessagebusHandler(logging.StreamHandler): + """A log handler that sends logs tagged with user_log to the internal messagebus. Example:: @@ -35,34 +39,37 @@ class RabbitMQHandler(logging.StreamHandler): closing = False - def __init__(self, rabbitmq): + def __init__(self, publisher: Publisher): """Initialize.""" super().__init__() - self.rabbitmq = rabbitmq + self.publisher = publisher - def emit(self, record): - """Send user log to RabbitMQ, starting the connection if necessary. + def emit(self, record: logging.LogRecord): + """Send user log to messagebus. - The record parameter "user_log" must be set to True if a message shall be - sent to RabbitMQ. + The record parameter "user_log" must be set to True if a message shall be sent. """ if self.closing: return - try: send = record.user_log except AttributeError: - send = False + # If it's the v2alpha protocol, always set to True unless explicitly + # disabled with `user_log=False`. + send = isinstance(self.publisher, V2alphaPublisher) + if not send: + return msg = self.format(record) + if not isinstance(msg, dict): + msg = json.loads(msg) + try: identifier = record.identifier except AttributeError: - identifier = json.loads(msg).get("identifier", "Unknown") - # An unknown identifier will never be picked up by user log handler - # so it is unnecessary to send it. - routing_key = f"{identifier}.log.{record.levelname}" - if send and self.rabbitmq.is_alive(): - if identifier == "Unknown": - raise ValueError("Trying to send a user log when identifier is not set") - self.rabbitmq.send_event(msg, routing_key=routing_key) + identifier = msg.get("identifier") + if identifier is None or identifier == "Unknown": + return + + if self.publisher.is_alive(): + self.publisher.publish(identifier, Message(data=Log(**msg))) diff --git a/src/etos_lib/messaging/__init__.py b/src/etos_lib/messaging/__init__.py new file mode 100644 index 0000000..217dd7a --- /dev/null +++ b/src/etos_lib/messaging/__init__.py @@ -0,0 +1,16 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" diff --git a/src/etos_lib/messaging/events.py b/src/etos_lib/messaging/events.py new file mode 100644 index 0000000..118f609 --- /dev/null +++ b/src/etos_lib/messaging/events.py @@ -0,0 +1,126 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal messaging events.""" + +import inspect +import sys +from typing import Optional, Any +from pydantic import BaseModel, Field +from .types import File, Log, Result + + +class Event(BaseModel): + """Base internal messaging event.""" + + id: Optional[int] = None + event: str = "Unknown" + data: Any + meta: str = "*" + + def __str__(self) -> str: + """Return the string representation of an event.""" + return f"{self.event}({self.id}): {self.data}" + + def __eq__(self, other: "Event") -> bool: # type:ignore + """Check if the event is the same by testing the IDs.""" + if self.id is None or other.id is None: + return super().__eq__(other) + return self.id == other.id + + +class ServerEvent(Event): + """Events to be handled by the client.""" + + +class UserEvent(Event): + """Events to be handled by the user.""" + + +class Ping(ServerEvent): + """A ping event. Sent to keep connection between server and client alive.""" + + event: str = "Ping" + data: Any = None + + +class Error(ServerEvent): + """An error from the messaging server.""" + + event: str = "Error" + data: Any = None + + +class Unknown(UserEvent): + """An unknown event.""" + + event: str = "Unknown" + data: Any = None + + +class Shutdown(UserEvent): + """A shutdown event from ETOS.""" + + event: str = "Shutdown" + data: Result + + def __str__(self) -> str: + """Return the string representation of a shutdown.""" + return ( + f"Result(conclusion={self.data.conclusion}, " + f"verdict={self.data.verdict}, description={self.data.description})" + ) + + +class Message(UserEvent): + """An ETOS user log event.""" + + event: str = "Message" + data: Log + meta: str = Field(default_factory=lambda data: data["data"].level) + + def __str__(self) -> str: + """Return the string representation of a user log.""" + return self.data.message + + +class Report(UserEvent): + """An ETOS test case report file event.""" + + event: str = "Report" + data: File + + def __str__(self) -> str: + """Return the string representation of a file.""" + return f"[{self.data.name}]({self.data.url})" + + +class Artifact(UserEvent): + """An ETOS test case artifact file event.""" + + event: str = "Artifact" + data: File + + def __str__(self) -> str: + """Return the string representation of a file.""" + return f"[{self.data.name}]({self.data.url})" + + +def parse(event: dict) -> Event: + """Parse an event dict and return a corresponding Event class.""" + for name, obj in inspect.getmembers(sys.modules[__name__]): + if event.get("event", "").lower() == name.lower(): + return obj.model_validate(event) + return Unknown(**event) diff --git a/src/etos_lib/messaging/publisher.py b/src/etos_lib/messaging/publisher.py new file mode 100644 index 0000000..34a14a8 --- /dev/null +++ b/src/etos_lib/messaging/publisher.py @@ -0,0 +1,38 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS publisher interface.""" + +from .events import Event + + +class Publisher: + """Publisher interface for ETOS.""" + + def publish(self, testrun_id: str, event: Event): + """Publish an event to the internal messagebus.""" + raise NotImplementedError + + def start(self): + """Start the connection to the server.""" + raise NotImplementedError + + def close(self): + """Close the connection to the server.""" + raise NotImplementedError + + def is_alive(self): + """Check if the connection is alive.""" + raise NotImplementedError diff --git a/src/etos_lib/messaging/types.py b/src/etos_lib/messaging/types.py new file mode 100644 index 0000000..39a3d17 --- /dev/null +++ b/src/etos_lib/messaging/types.py @@ -0,0 +1,54 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Types used by events but are not events themselves.""" + +from datetime import datetime +from typing import Optional +from typing_extensions import Annotated +from pydantic import AliasChoices, BaseModel, Field, StringConstraints + + +class File(BaseModel): + """An ETOS file event.""" + + url: str + name: str + directory: Optional[str] = None + checksums: dict = {} + + +class Log(BaseModel): + """An ETOS log.""" + + message: str + level: Annotated[str, StringConstraints(to_lower=True)] = Field("info", alias="levelname") + name: str + # The datestring field is, by default, generated as '@timestamp' but since + # that is illegal in python we convert the name over to 'datestring'. Using + # an aliased Field. + # The '@timestamp' key is necessary for logstash, which we support, so we + # cannot update the formatter that creates the '@timestamp' key. + datestring: datetime = Field( + serialization_alias="datestring", validation_alias=AliasChoices("@timestamp", "datestring") + ) + + +class Result(BaseModel): + """Shutdown result.""" + + conclusion: str + verdict: str + description: str = "" diff --git a/src/etos_lib/messaging/v1/__init__.py b/src/etos_lib/messaging/v1/__init__.py new file mode 100644 index 0000000..217dd7a --- /dev/null +++ b/src/etos_lib/messaging/v1/__init__.py @@ -0,0 +1,16 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" diff --git a/src/etos_lib/logging/log_publisher.py b/src/etos_lib/messaging/v1/publisher.py similarity index 82% rename from src/etos_lib/logging/log_publisher.py rename to src/etos_lib/messaging/v1/publisher.py index b7089d3..6fa7f72 100644 --- a/src/etos_lib/logging/log_publisher.py +++ b/src/etos_lib/messaging/v1/publisher.py @@ -14,16 +14,27 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS rabbitmq log publisher.""" + import time import json import pika from eiffellib.publishers import RabbitMQPublisher +from ..events import Event +from ..publisher import Publisher as PublisherInterface + -class RabbitMQLogPublisher(RabbitMQPublisher): +class Publisher(RabbitMQPublisher, PublisherInterface): """A RabbitMQ publisher that can send JSON strings instead of Eiffel events.""" + def publish(self, testrun_id, event: Event): + """Publish an event to the internal messagebus.""" + self.send_event( + event=event.model_dump_json(), + routing_key=f"{testrun_id}.{event.event.lower()}.{event.meta}", + ) + def send_event(self, event, block=True, routing_key="#"): """Overload the send_event from the eiffellib rabbitmq publisher to send strings. diff --git a/src/etos_lib/messaging/v2alpha/__init__.py b/src/etos_lib/messaging/v2alpha/__init__.py new file mode 100644 index 0000000..217dd7a --- /dev/null +++ b/src/etos_lib/messaging/v2alpha/__init__.py @@ -0,0 +1,16 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" diff --git a/src/etos_lib/messaging/v2alpha/publisher.py b/src/etos_lib/messaging/v2alpha/publisher.py new file mode 100644 index 0000000..5624b60 --- /dev/null +++ b/src/etos_lib/messaging/v2alpha/publisher.py @@ -0,0 +1,158 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" + +import os +import ctypes +import logging +from pathlib import Path +from ..events import Event +from ..publisher import Publisher as PublisherInterface + + +# This package: etos_lib/messaging/v2alpha/publisher.py +# Root directory: etos_lib/ +# Path: ../../ +ROOT = str(Path(__file__).parent.parent.parent.joinpath("bindings")) +LIBRARY_PATH = Path(os.getenv("BINDINGS", ROOT)).joinpath("messaging/client.so") + + +class Publisher(PublisherInterface): # pylint:disable=too-many-instance-attributes + """Internal messaging publisher for ETOS. + + This publisher is running in a shared C library created from + Go because the Python library that we have access to is not + fast enough and we could not create reliable connections using + it. + + start() - Start up the publisher + publish() - Publish an event + close() - Stop the publisher and wait for outstanding events. + is_alive() - Check if publisher is alive. + + Example usage: + + # Make sure that the ETOS_RABBITMQ_* and ETOS_STREAM_NAME environment variables are set. + from etos_lib.etos import ETOS as ETOS_LIBRARY + from etos_lib.messaging.protocol import Shutdown, Result + ETOS = ETOS_LIBRARY(...) + with ETOS.messagebus_publisher() as PUBLISHER: + PUBLISHER.publish("identifier", Shutdown(data=Result( + conclusion="SUCCESSFUL", + verdict="PASSED", + description="Hello world", + ))) + + Example usage with logger: + + # Make sure that the ETOS_RABBITMQ_* and ETOS_STREAM_NAME environment variables are set. + import logging + from etos_lib.logging.logger import setup_logging, FORMAT_CONFIG + setup_logging("name", "version", "environment", None) + FORMAT_CONFIG.identifier = "identifier" + LOGGER = logging.getLogger(__name__) + LOGGER.info("Hello world", extra={"user_log": True}) + """ + + __connected = False + logger = logging.getLogger(__name__) + + def __init__(self, connection_string: str, stream_name: str): + """Initialize the Publisher object.""" + self.stream_name = stream_name + self.__connection_string = connection_string + self.__setup_bindings(LIBRARY_PATH) + self.__handler = self.__library.New( + self.__connection_string.encode("utf-8"), + self.stream_name.encode("utf-8"), + ) + + def __setup_bindings(self, library_path: Path): + """Set up the bindings for the Publisher.""" + self.__library = ctypes.cdll.LoadLibrary(str(library_path)) + self.__connect = self.__library.Publisher + self.__connect.argtypes = [ + ctypes.c_int, # pointer to stream handler + ctypes.c_char_p, # connectionString + ctypes.c_char_p, # streamName + ] + self.__publish = self.__library.Publish + self.__publish.argtypes = [ + ctypes.c_int, # pointer to stream handler + ctypes.c_char_p, # event + ctypes.c_char_p, # identifier + ctypes.c_char_p, # eventType + ctypes.c_char_p, # meta + ] + self.__close = self.__library.Close + self.__close.argtypes = [ctypes.c_int] # pointer to stream handler + + def __enter__(self): + """Connect to the server.""" + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Close the connection to the server.""" + self.close() + + def __del__(self): + """Close the connection to the server.""" + self.close() + + def start(self): + """Start the connection to the server.""" + if not self.__connected: + success = self.__connect( + self.__handler, + self.__connection_string.encode("utf-8"), + self.stream_name.encode("utf-8"), + ) + if not success: + raise ConnectionError("Failed to connect to stream") + self.__connected = True + + def publish(self, testrun_id: str, event: Event): + """Publish an event to the internal messagebus.""" + assert self.__connected, "Not connected to the server" + self.logger.debug( + "Publishing event '%s' to id %r", + event, + testrun_id, + extra={"user_log": False}, + ) + return self.__publish( + self.__handler, + event.model_dump_json().encode("utf-8"), + testrun_id.encode("utf-8"), + event.event.lower().encode("utf-8"), + event.meta.encode("utf-8"), + ) + + def is_alive(self): + """Check if the connection is alive.""" + return self.__connected + + def close(self): + """Close the connection to the server.""" + if self.__connected: + self.__close(self.__handler) + self.__connected = False + self.__handler = None + + +if __name__ == "__main__": + PUBLISHER = Publisher("amqp://kalle", "kula")