From 27b0735faef45b0e690d99c1ff4c1388ecdf4bc1 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Thu, 27 Feb 2025 11:53:33 +0100 Subject: [PATCH 01/11] Add a client for sending internal events This is the client that should be used together with the SSE v2 server Also updated the rabbitmq log handler to send using this client instead The logger will now publish all logs, not only user logs. Note that the client is written in go, exported as a shared C library and then loaded into python. The client can be built using make build and is stored in the bindings directory. To enable these bindings when testing locally an environment variable called BINDINGS must be set to the bindings directory. The reason for deciding to use Go -> C -> Python instead of using the python library that exists is because the Python library is lackluster in my opinion. Error messages were unclear and it was quite slow. It is also written with asyncio which causes it to infect everything else and made the client a lot more difficult to create. With Go we get a reliable, and fast, publisher and we can use it for our Go code as well and since we are going the route (at some point) in re-writing ETOS into Go I don't see this as a problem. This solution is not ready and there are TODOs. I just want feedback on the approach. --- Makefile | 43 +++ cmd/messaging/main.go | 27 ++ go.mod | 21 ++ go.sum | 63 ++++ pkg/bindings/publisher.go | 98 ++++++ pkg/stream/rabbitmq.go | 329 ++++++++++++++++++ pkg/stream/stream.go | 37 ++ pyproject.toml | 3 +- src/etos_lib/etos.py | 23 ++ src/etos_lib/lib/config.py | 18 + src/etos_lib/logging/log_publisher.py | 52 --- src/etos_lib/logging/logger.py | 48 ++- ...bitmq_handler.py => messagebus_handler.py} | 42 +-- src/etos_lib/messaging/__init__.py | 16 + src/etos_lib/messaging/events.py | 126 +++++++ src/etos_lib/messaging/publisher.py | 129 +++++++ src/etos_lib/messaging/types.py | 48 +++ 17 files changed, 1022 insertions(+), 101 deletions(-) create mode 100644 Makefile create mode 100644 cmd/messaging/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/bindings/publisher.go create mode 100644 pkg/stream/rabbitmq.go create mode 100644 pkg/stream/stream.go delete mode 100644 src/etos_lib/logging/log_publisher.py rename src/etos_lib/logging/{rabbitmq_handler.py => messagebus_handler.py} (56%) create mode 100644 src/etos_lib/messaging/__init__.py create mode 100644 src/etos_lib/messaging/events.py create mode 100644 src/etos_lib/messaging/publisher.py create mode 100644 src/etos_lib/messaging/types.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..85322e6 --- /dev/null +++ b/Makefile @@ -0,0 +1,43 @@ +export GOBIN := $(CURDIR)/bin +export BINDINGS := $(CURDIR)/bindings +export BUILD_MESSAGING := go build --buildmode=c-shared -o $(BINDINGS)/stream/client.so ./cmd/messaging/main.go + +GOLANGCI_LINT = $(GOBIN)/golangci-lint +GOLANGCI_LINT_VERSION = v1.52.2 + +.PHONY: all +all: check build + +.PHONY: check +check: staticcheck test + +.PHONY: test +test: + go test -cover -timeout 30s -race $(shell go list ./... | grep -v "etos-library/test") + +.PHONY: tidy +tidy: + go mod tidy + +.PHONY: check-dirty +check-dirty: + $(GIT) diff --exit-code HEAD + +.PHONY: staticcheck +staticcheck: $(GOLANGCI_LINT) + $(GOLANGCI_LINT) run + +.PHONY: clean +clean: + $(RM) $(GOBIN)/* + $(RM) -r $(BINDINGS)/* + +.PHONY: build +build: + $(BUILD_MESSAGING) + +$(GOLANGCI_LINT): + mkdir -p $(dir $@) + curl -sfL \ + https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh \ + | sh -s -- -b $(GOBIN) $(GOLANGCI_LINT_VERSION) diff --git a/cmd/messaging/main.go b/cmd/messaging/main.go new file mode 100644 index 0000000..29c691c --- /dev/null +++ b/cmd/messaging/main.go @@ -0,0 +1,27 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package main + +import ( + "C" + + // This import will make sure that we build the bindings on make build + _ "github.com/eiffel-community/etos-library/pkg/bindings" +) + +// main is required for cgo to work +func main() { +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5480181 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module github.com/eiffel-community/etos-library + +go 1.22.1 + +require ( + github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 + github.com/sirupsen/logrus v1.9.3 +) + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect + golang.org/x/sys v0.28.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c1a1827 --- /dev/null +++ b/go.sum @@ -0,0 +1,63 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU= +github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 h1:2UWryxhtQmYA3Bx2iajQCre3yQbARiSikpC/8iWbu3k= +github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0/go.mod h1:KDXSNVSqj4QNg6TNMBnQQ/oWHaxLjUI1520j68SyEcY= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/bindings/publisher.go b/pkg/bindings/publisher.go new file mode 100644 index 0000000..724feb5 --- /dev/null +++ b/pkg/bindings/publisher.go @@ -0,0 +1,98 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package bindings + +import ( + "C" + "log/slog" + "os" + + "github.com/eiffel-community/etos-library/pkg/stream" +) + +var ( + Streamer stream.Streamer + Publisher stream.Publisher + Consumer stream.Consumer +) +var logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) + +// Connect to the stream. +// +//export Connect +func Connect(connectionString, streamName *C.char) bool { + if err := setupPublisher(C.GoString(connectionString), C.GoString(streamName)); err != nil { + logger.Error("Failed to setup publisher", slog.Any("Error", err)) + return false + } + return true +} + +// Publish a message to the stream. +// +//export Publish +func Publish(event, identifier, eventType, meta *C.char) bool { + message := C.GoString(event) + filter := stream.Filter{ + Identifier: C.GoString(identifier), + Type: C.GoString(eventType), + Meta: C.GoString(meta), + } + Publisher.Publish([]byte(message), filter) + return true +} + +// Close any active publisher, streamer and consumer. +// +//export Close +func Close() { + if Publisher != nil { + Publisher.Close() + } + if Streamer != nil { + Streamer.Close() + } + if Consumer != nil { + Consumer.Close() + } +} + +// setupPublisher sets up a publisher for the stream. +func setupPublisher(connectionString, streamName string) error { + var err error + if Streamer == nil { + if err = setupStreamer(connectionString, streamName); err != nil { + return err + } + } + // TODO: This name + Publisher, err = Streamer.Publisher("publisher-1") + if err != nil { + return err + } + if err = Publisher.Start(); err != nil { + return err + } + return nil +} + +// setupStreamer sets up a streamer for the stream. +func setupStreamer(connectionString, streamName string) error { + var err error + addresses := []string{connectionString} + Streamer, err = stream.NewRabbitMQStreamer(streamName, addresses, logger) + return err +} diff --git a/pkg/stream/rabbitmq.go b/pkg/stream/rabbitmq.go new file mode 100644 index 0000000..d84f92a --- /dev/null +++ b/pkg/stream/rabbitmq.go @@ -0,0 +1,329 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package stream + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" +) + +const ( + IgnoreUnfiltered = false + MaxLengthBytes = "2gb" + MaxAge = 10 * time.Second + ConfirmationTimeout = 2 * time.Second +) + +// RabbitMQStreamer is a struct representing a single RabbitMQ connection. From a new connection new streams may be +// created. Normal case is to have a single connection with multiple streams. If multiple connections are needed +// then multiple instances of the program should be run. +type RabbitMQStreamer struct { + logger *slog.Logger + environment *stream.Environment + streamName string +} + +// NewRabbitMQStreamer creates a new RabbitMQ streamer. Only a single connection should be created. +func NewRabbitMQStreamer( + streamName string, + addresses []string, + logger *slog.Logger, +) (Streamer, error) { + options := stream.NewEnvironmentOptions().SetMaxProducersPerClient(1).SetUris(addresses) + env, err := stream.NewEnvironment(options) + if err != nil { + return nil, err + } + return &RabbitMQStreamer{logger: logger, environment: env, streamName: streamName}, err +} + +// CreateStream creates a new RabbitMQ stream. +func (s *RabbitMQStreamer) CreateStream(name string) error { + // This will create the stream if not already created. + return s.environment.DeclareStream(name, + &stream.StreamOptions{ + MaxLengthBytes: stream.ByteCapacity{}.From(MaxLengthBytes), + MaxAge: MaxAge, + }, + ) +} + +// Consumer returns a new Consumer. +func (s *RabbitMQStreamer) Consumer(name string) (Consumer, error) { + exists, err := s.environment.StreamExists(s.streamName) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.New("no stream exists, cannot stream events") + } + options := stream.NewConsumerOptions(). + SetClientProvidedName(name). + SetConsumerName(name). + SetCRCCheck(false). + SetOffset(stream.OffsetSpecification{}.First()) + return &RabbitMQStreamConsumer{ + logger: s.logger, + streamName: s.streamName, + environment: s.environment, + options: options, + }, nil +} + +// Publisher returns a new Publisher. +func (s *RabbitMQStreamer) Publisher(name string) (Publisher, error) { + exists, err := s.environment.StreamExists(s.streamName) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.New("no stream exists, cannot stream events") + } + options := stream.NewProducerOptions(). + SetClientProvidedName(name). + SetProducerName(name). + SetConfirmationTimeOut(ConfirmationTimeout) + return &RabbitMQStreamPublisher{ + logger: s.logger, + streamName: s.streamName, + environment: s.environment, + options: options, + }, nil +} + +// Close the RabbitMQ connection. +func (s *RabbitMQStreamer) Close() { + err := s.environment.Close() + if err != nil { + s.logger.Error("failed to close RabbitMQStreamer", slog.Any("error", err)) + } +} + +// RabbitMQStreamConsumer is a structure implementing the Consumer interface. Used to consume events +// from a RabbitMQ stream. +type RabbitMQStreamConsumer struct { + ctx context.Context + logger *slog.Logger + streamName string + environment *stream.Environment + options *stream.ConsumerOptions + consumer *stream.Consumer + channel chan<- []byte + filter []string +} + +// WithChannel adds a channel for receiving events from the stream. If no +// channel is added, then events will be logged. +func (s *RabbitMQStreamConsumer) WithChannel(ch chan<- []byte) Consumer { + s.channel = ch + return s +} + +// WithOffset adds an offset to the RabbitMQ stream. -1 means start from the beginning. +func (s *RabbitMQStreamConsumer) WithOffset(offset int) Consumer { + if offset == -1 { + s.options = s.options.SetOffset(stream.OffsetSpecification{}.First()) + } else { + s.options = s.options.SetOffset(stream.OffsetSpecification{}.Offset(int64(offset))) + } + return s +} + +// WithFilter adds a filter to the RabbitMQ stream. +func (s *RabbitMQStreamConsumer) WithFilter(filter []string) Consumer { + s.filter = filter + if len(s.filter) > 0 { + s.options = s.options.SetFilter(stream.NewConsumerFilter(filter, IgnoreUnfiltered, s.postFilter)) + } + return s +} + +// Consume will start consuming the RabbitMQ stream, non blocking. A channel is returned where +// an error is sent when the consumer closes down. +func (s *RabbitMQStreamConsumer) Consume() (<-chan error, error) { + handler := func(_ stream.ConsumerContext, message *amqp.Message) { + for _, d := range message.Data { + if s.channel != nil { + s.channel <- d + } else { + s.logger.Debug(string(d)) + } + } + } + consumer, err := s.environment.NewConsumer(s.streamName, handler, s.options) + if err != nil { + return nil, err + } + s.consumer = consumer + closed := make(chan error, 1) + go s.notifyClose(s.ctx, closed) + return closed, nil +} + +// notifyClose will keep track of context and the notify close channel from RabbitMQ and send +// error on a channel. +func (s *RabbitMQStreamConsumer) notifyClose(ctx context.Context, ch chan<- error) { + closed := s.consumer.NotifyClose() + select { + case <-ctx.Done(): + ch <- ctx.Err() + case event := <-closed: + ch <- event.Err + } +} + +// Close the RabbitMQ stream consumer. +func (s *RabbitMQStreamConsumer) Close() { + if s.consumer != nil { + if err := s.consumer.Close(); err != nil { + s.logger.Error("failed to close rabbitmq consumer", slog.Any("error", err)) + } + } +} + +// postFilter applies client side filtering on all messages received from the RabbitMQ stream. +// The RabbitMQ server-side filtering is not perfect and will let through a few messages that don't +// match the filter, this is expected as the RabbitMQ unit of delivery is the chunk and there may +// be multiple messages in a chunk and those messages are not filtered. +func (s *RabbitMQStreamConsumer) postFilter(message *amqp.Message) bool { + if s.filter == nil { + return true // Unfiltered + } + identifier := message.ApplicationProperties["identifier"] + eventType := message.ApplicationProperties["type"] + eventMeta := message.ApplicationProperties["meta"] + name := fmt.Sprintf("%s.%s.%s", identifier, eventType, eventMeta) + for _, filter := range s.filter { + if name == filter { + return true + } + } + return false +} + +// RabbitMQStreamPublisher is a structure implementing the Publisher interface. Used to publish events +// to a RabbitMQ stream. +type RabbitMQStreamPublisher struct { + logger *slog.Logger + streamName string + environment *stream.Environment + options *stream.ProducerOptions + producer *ha.ReliableProducer + unConfirmedMessages chan message.StreamMessage + unConfirmed *sync.WaitGroup + done chan struct{} + shutdown bool +} + +type Filter struct { + Identifier string + Type string + Meta string +} + +// Start will start the RabbitMQ stream publisher, non blocking. A channel is returned where +// a events can be published. +func (s *RabbitMQStreamPublisher) Start() error { + // TODO: Make a 'New' function + s.unConfirmed = &sync.WaitGroup{} + // TODO: Verify if unlimited is good or bad. + s.unConfirmedMessages = make(chan message.StreamMessage) + s.options.SetFilter(stream.NewProducerFilter(func(message message.StreamMessage) string { + p := message.GetApplicationProperties() + return fmt.Sprintf("%s.%s.%s", p["identifier"], p["type"], p["meta"]) + })) + producer, err := ha.NewReliableProducer( + s.environment, + s.streamName, + s.options, + func(messageStatus []*stream.ConfirmationStatus) { + go func() { + for _, msgStatus := range messageStatus { + if msgStatus.IsConfirmed() { + s.unConfirmed.Done() + } else { + s.logger.Warn("Unconfirmed message", slog.Any("error", msgStatus.GetError())) + s.unConfirmedMessages <- msgStatus.GetMessage() + } + } + }() + }) + if err != nil { + return err + } + s.producer = producer + s.done = make(chan struct{}) + go s.publish(s.done) + return nil +} + +// Publish an event to the RabbitMQ stream. +func (s *RabbitMQStreamPublisher) Publish(msg []byte, filter Filter) { + if s.shutdown { + s.logger.Error("Publisher is closed") + return + } + s.unConfirmed.Add(1) + message := amqp.NewMessage(msg) + if filter.Meta == "" { + filter.Meta = "*" + } + message.ApplicationProperties = map[string]interface{}{ + "identifier": filter.Identifier, + "type": filter.Type, + "meta": filter.Meta, + } + s.unConfirmedMessages <- message +} + +// publish a message from unconfirmed messages to RabbitMQ. +func (s *RabbitMQStreamPublisher) publish(done chan struct{}) { + for { + select { + case msg := <-s.unConfirmedMessages: + if err := s.producer.Send(msg); err != nil { + s.logger.Error("Failed to send message", slog.Any("error", err)) + } + case <-done: + return + } + } +} + +// Close the RabbitMQ stream publisher. +func (s *RabbitMQStreamPublisher) Close() { + if s.producer != nil { + s.logger.Info("Stopping publisher") + s.shutdown = true + s.logger.Info("Wait for unconfirmed messages") + s.unConfirmed.Wait() + s.done <- struct{}{} + s.logger.Info("Done, closing down") + if err := s.producer.Close(); err != nil { + s.logger.Error("Failed to close rabbitmq publisher", slog.Any("error", err)) + } + close(s.unConfirmedMessages) + } +} diff --git a/pkg/stream/stream.go b/pkg/stream/stream.go new file mode 100644 index 0000000..5ecc198 --- /dev/null +++ b/pkg/stream/stream.go @@ -0,0 +1,37 @@ +// Copyright Axis Communications AB. +// +// For a full list of individual contributors, please see the commit history. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package stream + +type Streamer interface { + Consumer(string) (Consumer, error) + Publisher(string) (Publisher, error) + CreateStream(string) error + Close() +} + +type Consumer interface { + WithChannel(chan<- []byte) Consumer + WithOffset(int) Consumer + WithFilter([]string) Consumer + Consume() (<-chan error, error) + Close() +} + +type Publisher interface { + Start() error + Publish([]byte, Filter) + Close() +} diff --git a/pyproject.toml b/pyproject.toml index 3b6042a..587f3f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,8 +20,9 @@ dependencies = [ "eiffellib[rabbitmq]~=2.4", "requests~=2.31", "kubernetes~=26.1", - "pydantic~=2.1", + "pydantic~=2.10", "pyyaml~=6.0", + "rstream~=0.20", "opentelemetry-api~=1.21", "opentelemetry-sdk~=1.21", "opentelemetry-exporter-otlp~=1.21", diff --git a/src/etos_lib/etos.py b/src/etos_lib/etos.py index 67e9f99..9c2d04c 100644 --- a/src/etos_lib/etos.py +++ b/src/etos_lib/etos.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS Library module.""" + +import time +from etos_lib.messaging.publisher import Publisher from .eiffel.publisher import TracingRabbitMQPublisher as RabbitMQPublisher from .eiffel.subscriber import TracingRabbitMQSubscriber as RabbitMQSubscriber from .graphql.query_handler import GraphQLQueryHandler @@ -78,6 +81,26 @@ def start_subscriber(self): self.subscriber.start() self.config.set("subscriber", self.subscriber) + def messagebus_publisher(self) -> Publisher: + """Start the internal messagebus publisher using config data from ETOS library.""" + publisher = self.config.get("internal_publisher") + if publisher is None: + connection_parameters = self.config.etos_rabbitmq_publisher_data() + if not connection_parameters: + raise PublisherConfigurationMissing + publisher = Publisher( + self.config.etos_rabbitmq_publisher_uri(), + self.config.etos_stream_name(), + ) + if not self.debug.disable_sending_events: + publisher.start() + # Wait for start. + # No timeout necessary since there is a built-in timeout in the publisher. + while publisher.is_alive(): + time.sleep(0.1) + self.config.set("internal_publisher", publisher) + return publisher + @property def debug(self): """Entry for debug parameters for ETOS.""" diff --git a/src/etos_lib/lib/config.py b/src/etos_lib/lib/config.py index 280ae19..1be7d5a 100644 --- a/src/etos_lib/lib/config.py +++ b/src/etos_lib/lib/config.py @@ -14,7 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS Library config.""" + import json +from urllib.parse import quote_plus from pprint import pprint import os @@ -92,6 +94,22 @@ def etos_rabbitmq_publisher_data(self): "ssl": ssl, } + def etos_stream_name(self): + """Get the stream name for the ETOS rabbitmq service.""" + return os.getenv("ETOS_STREAM_NAME", "etos") + + def etos_rabbitmq_publisher_uri(self): + """Get RabbitMQ URI for the ETOS rabbitmq service.""" + data = self.etos_rabbitmq_publisher_data() + if data.get("username") is not None and data.get("password") is None: + netloc = f"{data.get('username')}@{data.get('host')}:{data.get('port')}" + elif data.get("username") is not None and data.get("password") is not None: + netloc = f"{data.get('username')}:{data.get('password')}@{data.get('host')}:{data.get('port')}" + else: + netloc = f"{data.get('host')}:{data.get('port')}" + vhost = quote_plus(data.get("vhost", "/") or "/") + return f"rabbitmq-stream://{netloc}/{vhost}" + def rabbitmq_from_environment(self): """Load RabbitMQ data from environment variables.""" ssl = os.getenv("RABBITMQ_SSL", "true") == "true" diff --git a/src/etos_lib/logging/log_publisher.py b/src/etos_lib/logging/log_publisher.py deleted file mode 100644 index b7089d3..0000000 --- a/src/etos_lib/logging/log_publisher.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""ETOS rabbitmq log publisher.""" -import time -import json -import pika - -from eiffellib.publishers import RabbitMQPublisher - - -class RabbitMQLogPublisher(RabbitMQPublisher): - """A RabbitMQ publisher that can send JSON strings instead of Eiffel events.""" - - def send_event(self, event, block=True, routing_key="#"): - """Overload the send_event from the eiffellib rabbitmq publisher to send strings. - - This method differs slightly from its parent in that it takes the routing_key as input. - """ - if block: - self.wait_start() - while self._channel is None or not self._channel.is_open: - time.sleep(0.1) - properties = pika.BasicProperties(content_type="application/json", delivery_mode=2) - if not isinstance(event, str): - event = json.dumps(event) - - with self._lock: - try: - self._channel.basic_publish( - self.exchange, - routing_key, - event, - properties, - ) - except: # pylint:disable=bare-except - self._nacked_deliveries.append(event) - return - self._delivered += 1 - self._deliveries[self._delivered] = event diff --git a/src/etos_lib/logging/logger.py b/src/etos_lib/logging/logger.py index 9d37b46..f9cb0bd 100644 --- a/src/etos_lib/logging/logger.py +++ b/src/etos_lib/logging/logger.py @@ -28,6 +28,7 @@ >>> [2020-12-16 10:35:00][cb7c8cd9-40a6-4ecc-8321-a1eae6beae35] INFO: Hello! """ + import atexit import logging import logging.config @@ -47,8 +48,8 @@ from etos_lib.logging.filter import EtosFilter from etos_lib.logging.formatter import EtosLogFormatter from etos_lib.logging.log_processors import ToStringProcessor -from etos_lib.logging.log_publisher import RabbitMQLogPublisher -from etos_lib.logging.rabbitmq_handler import RabbitMQHandler +from etos_lib.logging.messagebus_handler import MessagebusHandler +from etos_lib.messaging.publisher import Publisher DEFAULT_CONFIG = Path(__file__).parent.joinpath("default_config.yaml") DEFAULT_LOG_PATH = Debug().default_log_path @@ -131,34 +132,33 @@ def setup_stream_logging(config: dict, log_filter: EtosFilter) -> None: root_logger.addHandler(stream_handler) -def setup_rabbitmq_logging(log_filter: EtosFilter) -> None: - """Set up rabbitmq logging. +def setup_internal_messagebus_logging(log_filter: EtosFilter) -> None: + """Set up internal messagebus logging. - :param log_filter: Logfilter to add to stream handler. + :param log_filter: LogFilter to add to stream handler. :type log_filter: :obj:`EtosFilter` """ loglevel = logging.DEBUG root_logger = logging.getLogger() - # These loggers need to be prevented from propagating their logs - # to RabbitMQLogPublisher. If they aren't, this may cause a deadlock. - logging.getLogger("pika").propagate = False - logging.getLogger("eiffellib.publishers.rabbitmq_publisher").propagate = False - logging.getLogger("etos_lib.eiffel.publisher").propagate = False - logging.getLogger("base_rabbitmq").propagate = False - - rabbitmq = RabbitMQLogPublisher(**Config().etos_rabbitmq_publisher_data(), routing_key=None) + publisher = Config().get("internal_publisher") + if publisher is None: + publisher = Publisher( + Config().etos_rabbitmq_publisher_uri(), + Config().etos_stream_name(), + ) + Config().set("internal_publisher", publisher) if Debug().enable_sending_logs: - rabbitmq.start() - atexit.register(close_rabbit, rabbitmq) + publisher.start() + atexit.register(publisher.close) - rabbit_handler = RabbitMQHandler(rabbitmq) - rabbit_handler.setFormatter(EtosLogFormatter()) - rabbit_handler.setLevel(loglevel) - rabbit_handler.addFilter(log_filter) + handler = MessagebusHandler(publisher) + handler.setFormatter(EtosLogFormatter()) + handler.setLevel(loglevel) + handler.addFilter(log_filter) - root_logger.addHandler(rabbit_handler) + root_logger.addHandler(handler) def setup_otel_logging( @@ -223,12 +223,6 @@ def setup_logging( setup_stream_logging(logging_config.get("stream"), log_filter) if logging_config.get("file"): setup_file_logging(logging_config.get("file"), log_filter) - setup_rabbitmq_logging(log_filter) + setup_internal_messagebus_logging(log_filter) if otel_resource: setup_otel_logging(log_filter, otel_resource) - - -def close_rabbit(rabbit: RabbitMQLogPublisher) -> None: - """Close down a rabbitmq connection.""" - rabbit.wait_for_unpublished_events() - rabbit.close() diff --git a/src/etos_lib/logging/rabbitmq_handler.py b/src/etos_lib/logging/messagebus_handler.py similarity index 56% rename from src/etos_lib/logging/rabbitmq_handler.py rename to src/etos_lib/logging/messagebus_handler.py index 0a10dde..3010738 100644 --- a/src/etos_lib/logging/rabbitmq_handler.py +++ b/src/etos_lib/logging/messagebus_handler.py @@ -14,12 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS rabbitmq handler.""" + import json import logging +from etos_lib.messaging.publisher import Publisher +from etos_lib.messaging.events import Log, Message -class RabbitMQHandler(logging.StreamHandler): - """A RabbitMQ log handler that sends logs tagged with user_log to RabbitMQ. +class MessagebusHandler(logging.StreamHandler): + """A log handler that sends logs tagged with user_log to the internal messagebus. Example:: @@ -35,34 +38,31 @@ class RabbitMQHandler(logging.StreamHandler): closing = False - def __init__(self, rabbitmq): + def __init__(self, publisher: Publisher): """Initialize.""" super().__init__() - self.rabbitmq = rabbitmq + self.publisher = publisher - def emit(self, record): - """Send user log to RabbitMQ, starting the connection if necessary. + def emit(self, record: logging.LogRecord): + """Send user log to messagebus. - The record parameter "user_log" must be set to True if a message shall be - sent to RabbitMQ. + The record parameter "user_log" must be set to True if a message shall be sent. """ if self.closing: return - - try: - send = record.user_log - except AttributeError: - send = False + # This feels volatile, but is necessary to avoid infinite recursion and + # still have access to the actual log prints. + # Maybe we should add this check to the record instead, but that would require + # us to change the logging calls in the code. + if record.name == "etos_lib.messaging.publisher": + return msg = self.format(record) + if not isinstance(msg, dict): + msg = json.loads(msg) try: identifier = record.identifier except AttributeError: - identifier = json.loads(msg).get("identifier", "Unknown") - # An unknown identifier will never be picked up by user log handler - # so it is unnecessary to send it. - routing_key = f"{identifier}.log.{record.levelname}" - if send and self.rabbitmq.is_alive(): - if identifier == "Unknown": - raise ValueError("Trying to send a user log when identifier is not set") - self.rabbitmq.send_event(msg, routing_key=routing_key) + identifier = msg.get("identifier") + if self.publisher.is_alive() and identifier is not None and identifier != "Unknown": + self.publisher.publish(identifier, Message(data=Log(**msg))) diff --git a/src/etos_lib/messaging/__init__.py b/src/etos_lib/messaging/__init__.py new file mode 100644 index 0000000..217dd7a --- /dev/null +++ b/src/etos_lib/messaging/__init__.py @@ -0,0 +1,16 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" diff --git a/src/etos_lib/messaging/events.py b/src/etos_lib/messaging/events.py new file mode 100644 index 0000000..118f609 --- /dev/null +++ b/src/etos_lib/messaging/events.py @@ -0,0 +1,126 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal messaging events.""" + +import inspect +import sys +from typing import Optional, Any +from pydantic import BaseModel, Field +from .types import File, Log, Result + + +class Event(BaseModel): + """Base internal messaging event.""" + + id: Optional[int] = None + event: str = "Unknown" + data: Any + meta: str = "*" + + def __str__(self) -> str: + """Return the string representation of an event.""" + return f"{self.event}({self.id}): {self.data}" + + def __eq__(self, other: "Event") -> bool: # type:ignore + """Check if the event is the same by testing the IDs.""" + if self.id is None or other.id is None: + return super().__eq__(other) + return self.id == other.id + + +class ServerEvent(Event): + """Events to be handled by the client.""" + + +class UserEvent(Event): + """Events to be handled by the user.""" + + +class Ping(ServerEvent): + """A ping event. Sent to keep connection between server and client alive.""" + + event: str = "Ping" + data: Any = None + + +class Error(ServerEvent): + """An error from the messaging server.""" + + event: str = "Error" + data: Any = None + + +class Unknown(UserEvent): + """An unknown event.""" + + event: str = "Unknown" + data: Any = None + + +class Shutdown(UserEvent): + """A shutdown event from ETOS.""" + + event: str = "Shutdown" + data: Result + + def __str__(self) -> str: + """Return the string representation of a shutdown.""" + return ( + f"Result(conclusion={self.data.conclusion}, " + f"verdict={self.data.verdict}, description={self.data.description})" + ) + + +class Message(UserEvent): + """An ETOS user log event.""" + + event: str = "Message" + data: Log + meta: str = Field(default_factory=lambda data: data["data"].level) + + def __str__(self) -> str: + """Return the string representation of a user log.""" + return self.data.message + + +class Report(UserEvent): + """An ETOS test case report file event.""" + + event: str = "Report" + data: File + + def __str__(self) -> str: + """Return the string representation of a file.""" + return f"[{self.data.name}]({self.data.url})" + + +class Artifact(UserEvent): + """An ETOS test case artifact file event.""" + + event: str = "Artifact" + data: File + + def __str__(self) -> str: + """Return the string representation of a file.""" + return f"[{self.data.name}]({self.data.url})" + + +def parse(event: dict) -> Event: + """Parse an event dict and return a corresponding Event class.""" + for name, obj in inspect.getmembers(sys.modules[__name__]): + if event.get("event", "").lower() == name.lower(): + return obj.model_validate(event) + return Unknown(**event) diff --git a/src/etos_lib/messaging/publisher.py b/src/etos_lib/messaging/publisher.py new file mode 100644 index 0000000..0940eba --- /dev/null +++ b/src/etos_lib/messaging/publisher.py @@ -0,0 +1,129 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" + +import os +import ctypes +import logging +from pathlib import Path +from .events import Event + + +LIBRARY_PATH = Path(os.getenv("BINDINGS", Path(__file__).parent)).joinpath("stream/client.so") + + +class Publisher: + """Internal messaging publisher for ETOS. + + This publisher is running in a shared C library created from + Go because the Python library that we have access to is not + fast enough and we could not create reliable connections using + it. + + start() - Start up the publisher + publish() - Publish an event + stop() - Stop the publisher and wait for outstanding events. + + Example usage: + + # Make sure that the ETOS_RABBITMQ_* environment variables are set. + from etos_lib.etos import ETOS as ETOS_LIBRARY + from etos_lib.messaging.protocol import Shutdown, Result + ETOS = ETOS_LIBRARY(...) + with ETOS.messagebus_publisher() as PUBLISHER: + PUBLISHER.publish("identifier", Shutdown(data=Result( + conclusion="SUCCESSFUL", + verdict="PASSED", + description="Hello world", + ))) + + Example usage with logger: + + # Make sure that the ETOS_RABBITMQ_* environment variables are set. + import logging + from etos_lib.logging.logger import setup_logging, FORMAT_CONFIG + setup_logging("name", "version", "environment", None) + FORMAT_CONFIG.identifier = "identifier" + LOGGER = logging.getLogger(__name__) + LOGGER.info("Hello world", extra={"user_log": True}) + """ + + __connected = False + logger = logging.getLogger(__name__) + + def __init__(self, connection_string: str, stream_name: str): + """Initialize the Publisher object.""" + self.stream_name = stream_name + self.__connection_string = connection_string + self.__library = ctypes.cdll.LoadLibrary(str(LIBRARY_PATH)) + self.__connect = self.__library.Connect + self.__connect.argtypes = [ + ctypes.c_char_p, # connectionString + ctypes.c_char_p, # streamName + ] + self.__publish = self.__library.Publish + self.__publish.argtypes = [ + ctypes.c_char_p, # event + ctypes.c_char_p, # identifier + ctypes.c_char_p, # eventType + ctypes.c_char_p, # meta + ] + self.__close = self.__library.Close + + def __enter__(self): + """Connect to the server.""" + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Close the connection to the server.""" + self.close() + + def start(self): + """Start the connection to the server.""" + if not self.__connected: + success = self.__connect( + self.__connection_string.encode("utf-8"), + self.stream_name.encode("utf-8"), + ) + if not success: + raise Exception("bruh") + self.__connected = True + + def publish(self, testrun_id: str, event: Event): + """Publish an event to the internal messagebus.""" + assert self.__connected, "Not connected to the server" + self.logger.debug( + "Publishing event '%s' to id %r", + event, + testrun_id, + ) + return self.__publish( + event.model_dump_json().encode("utf-8"), + testrun_id.encode("utf-8"), + event.event.lower().encode("utf-8"), + event.meta.encode("utf-8"), + ) + + def is_alive(self): + """Check if the connection is alive.""" + return self.__connected + + def close(self): + """Close the connection to the server.""" + if self.__connected: + self.__close() + self.__connected = False diff --git a/src/etos_lib/messaging/types.py b/src/etos_lib/messaging/types.py new file mode 100644 index 0000000..068de6c --- /dev/null +++ b/src/etos_lib/messaging/types.py @@ -0,0 +1,48 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Types used by events but are not events themselves.""" +from typing_extensions import Annotated +from pydantic import BaseModel, Field, StringConstraints + + +class File(BaseModel): + """An ETOS file event.""" + + url: str + name: str + checksums: dict = {} + + +class Log(BaseModel): + """An ETOS log.""" + + message: str + level: Annotated[str, StringConstraints(to_lower=True)] = Field("info", alias="levelname") + name: str + # The datestring field is, by default, generated as '@timestamp' but since + # that is illegal in python we convert the name over to 'datestring'. Using + # an aliased Field. + # The '@timestamp' key is necessary for logstash, which we support, so we + # cannot update the formatter that creates the '@timestamp' key. + datestring: str = Field("datestring", alias="@timestamp") + + +class Result(BaseModel): + """Shutdown result.""" + + conclusion: str + verdict: str + description: str = "" From 5c325853d5e014c56d9a8e7f5f24c74965b6e745 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Tue, 4 Mar 2025 08:56:32 +0100 Subject: [PATCH 02/11] Clean up the Go code a lot With more knowledge comes cleaner code. Instead of managing the state in global variables we now pass a pointer to a handler back to the Python code. The Python code is expected to pass this pointer back when it calls subsequent methods. --- pkg/bindings/publisher.go | 130 ++++++++++++++++++---------- src/etos_lib/messaging/publisher.py | 19 +++- 2 files changed, 100 insertions(+), 49 deletions(-) diff --git a/pkg/bindings/publisher.go b/pkg/bindings/publisher.go index 724feb5..96255a0 100644 --- a/pkg/bindings/publisher.go +++ b/pkg/bindings/publisher.go @@ -15,27 +15,70 @@ // limitations under the License. package bindings +/* +#include +#include +*/ +import "C" + import ( - "C" "log/slog" "os" + "runtime/cgo" "github.com/eiffel-community/etos-library/pkg/stream" ) -var ( - Streamer stream.Streamer - Publisher stream.Publisher - Consumer stream.Consumer -) -var logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) +// StreamHandler is a struct that holds the logger, connection string, stream name, streamer, +// publisher, and consumer. +// It is used to create a new publisher and publish messages to a stream from python. +// For using the streamers in Go code please refer to pkg/stream/stream.go. +// +// For usage examples view the python code in src/etos_lib/messaging/publisher.py. +type StreamHandler struct { + logger *slog.Logger + connectionString string + streamName string + streamer stream.Streamer + publisher stream.Publisher + consumer stream.Consumer +} + +// New creates a new publisher and returns a pointer to it. +// +//export New +func New(connectionString, streamName *C.char) (C.uintptr_t, bool) { + logger := newLogger() + addresses := []string{C.GoString(connectionString)} + streamer, err := stream.NewRabbitMQStreamer(C.GoString(streamName), addresses, logger) + if err != nil { + return C.uintptr_t(0), false + } + return C.uintptr_t(cgo.NewHandle(&StreamHandler{ + logger: logger, + streamer: streamer, + })), true +} -// Connect to the stream. +// Publisher creates a new publisher and starts it. // -//export Connect -func Connect(connectionString, streamName *C.char) bool { - if err := setupPublisher(C.GoString(connectionString), C.GoString(streamName)); err != nil { - logger.Error("Failed to setup publisher", slog.Any("Error", err)) +//export Publisher +func Publisher(p C.uintptr_t, name *C.char) bool { + h := cgo.Handle(p) + handler, ok := h.Value().(*StreamHandler) + if !ok { + // Creating a new logger here since if handler is nil, we don't have access to one. + newLogger().Error("Failed to get stream handler from memory") + return false + } + publisher, err := handler.streamer.Publisher(C.GoString(name)) + if err != nil { + handler.logger.Error("Failed to create publisher", slog.Any("Error", err)) + return false + } + handler.publisher = publisher + if err = publisher.Start(); err != nil { + handler.logger.Error("Failed to start publisher", slog.Any("Error", err)) return false } return true @@ -44,55 +87,50 @@ func Connect(connectionString, streamName *C.char) bool { // Publish a message to the stream. // //export Publish -func Publish(event, identifier, eventType, meta *C.char) bool { +func Publish(p C.uintptr_t, event, identifier, eventType, meta *C.char) bool { + h := cgo.Handle(p) + handler, ok := h.Value().(*StreamHandler) + if !ok { + // Creating a new logger here since if handler is nil, we don't have access to one. + newLogger().Error("Failed to get stream handler from memory") + return false + } message := C.GoString(event) filter := stream.Filter{ Identifier: C.GoString(identifier), Type: C.GoString(eventType), Meta: C.GoString(meta), } - Publisher.Publish([]byte(message), filter) + handler.publisher.Publish([]byte(message), filter) return true } -// Close any active publisher, streamer and consumer. +// Close closes the publisher and the streamer. // //export Close -func Close() { - if Publisher != nil { - Publisher.Close() - } - if Streamer != nil { - Streamer.Close() - } - if Consumer != nil { - Consumer.Close() +func Close(p C.uintptr_t) bool { + h := cgo.Handle(p) + // Delete the handle here, since it shall not be reused after close. + defer h.Delete() + handler, ok := h.Value().(*StreamHandler) + if !ok { + // Creating a new logger here since if handler is nil, we don't have access to one. + newLogger().Error("Failed to get stream handler from memory") + return false } -} - -// setupPublisher sets up a publisher for the stream. -func setupPublisher(connectionString, streamName string) error { - var err error - if Streamer == nil { - if err = setupStreamer(connectionString, streamName); err != nil { - return err - } + if handler.publisher != nil { + handler.publisher.Close() } - // TODO: This name - Publisher, err = Streamer.Publisher("publisher-1") - if err != nil { - return err + if handler.consumer != nil { + handler.consumer.Close() } - if err = Publisher.Start(); err != nil { - return err + if handler.streamer != nil { + handler.streamer.Close() } - return nil + return true } -// setupStreamer sets up a streamer for the stream. -func setupStreamer(connectionString, streamName string) error { - var err error - addresses := []string{connectionString} - Streamer, err = stream.NewRabbitMQStreamer(streamName, addresses, logger) - return err +// newLogger creates a new logger. +func newLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stdout, nil)) } diff --git a/src/etos_lib/messaging/publisher.py b/src/etos_lib/messaging/publisher.py index 0940eba..80423a3 100644 --- a/src/etos_lib/messaging/publisher.py +++ b/src/etos_lib/messaging/publisher.py @@ -68,20 +68,31 @@ def __init__(self, connection_string: str, stream_name: str): """Initialize the Publisher object.""" self.stream_name = stream_name self.__connection_string = connection_string - self.__library = ctypes.cdll.LoadLibrary(str(LIBRARY_PATH)) - self.__connect = self.__library.Connect + self.__setup_bindings(LIBRARY_PATH) + + def __setup_bindings(self, library_path: Path): + """Setup the bindings for the Publisher.""" + self.__library = ctypes.cdll.LoadLibrary(str(library_path)) + self.__handler = self.__library.New( + self.__connection_string.encode("utf-8"), + self.stream_name.encode("utf-8"), + ) + self.__connect = self.__library.Publisher self.__connect.argtypes = [ + ctypes.c_int, # pointer to stream handler ctypes.c_char_p, # connectionString ctypes.c_char_p, # streamName ] self.__publish = self.__library.Publish self.__publish.argtypes = [ + ctypes.c_int, # pointer to stream handler ctypes.c_char_p, # event ctypes.c_char_p, # identifier ctypes.c_char_p, # eventType ctypes.c_char_p, # meta ] self.__close = self.__library.Close + self.__close.argtypes = [ctypes.c_int] # pointer to stream handler def __enter__(self): """Connect to the server.""" @@ -96,6 +107,7 @@ def start(self): """Start the connection to the server.""" if not self.__connected: success = self.__connect( + self.__handler, self.__connection_string.encode("utf-8"), self.stream_name.encode("utf-8"), ) @@ -112,6 +124,7 @@ def publish(self, testrun_id: str, event: Event): testrun_id, ) return self.__publish( + self.__handler, event.model_dump_json().encode("utf-8"), testrun_id.encode("utf-8"), event.event.lower().encode("utf-8"), @@ -125,5 +138,5 @@ def is_alive(self): def close(self): """Close the connection to the server.""" if self.__connected: - self.__close() + self.__close(self.__handler) self.__connected = False From bb15fc65569bbeb64b6915d47d5bdc65dedae3f2 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Tue, 4 Mar 2025 12:28:31 +0100 Subject: [PATCH 03/11] Make sure we can publish to both v2alpha and v1 Added the code back for the v1 protocol. Created a common interface that all publisher clients must adhere to. Also added the ability for the etos library to publish events on the v1 protocol. --- src/etos_lib/etos.py | 22 ++- src/etos_lib/logging/default_config.yaml | 2 + src/etos_lib/logging/logger.py | 37 ++++- src/etos_lib/logging/messagebus_handler.py | 19 ++- src/etos_lib/messaging/publisher.py | 124 ++-------------- src/etos_lib/messaging/v1/__init__.py | 16 +++ src/etos_lib/messaging/v1/publisher.py | 63 ++++++++ src/etos_lib/messaging/v2alpha/__init__.py | 16 +++ src/etos_lib/messaging/v2alpha/publisher.py | 150 ++++++++++++++++++++ 9 files changed, 316 insertions(+), 133 deletions(-) create mode 100644 src/etos_lib/messaging/v1/__init__.py create mode 100644 src/etos_lib/messaging/v1/publisher.py create mode 100644 src/etos_lib/messaging/v2alpha/__init__.py create mode 100644 src/etos_lib/messaging/v2alpha/publisher.py diff --git a/src/etos_lib/etos.py b/src/etos_lib/etos.py index 9c2d04c..38ca627 100644 --- a/src/etos_lib/etos.py +++ b/src/etos_lib/etos.py @@ -17,6 +17,8 @@ import time from etos_lib.messaging.publisher import Publisher +from etos_lib.messaging.v1.publisher import Publisher as V1Publisher +from etos_lib.messaging.v2alpha.publisher import Publisher as V2alphaPublisher from .eiffel.publisher import TracingRabbitMQPublisher as RabbitMQPublisher from .eiffel.subscriber import TracingRabbitMQSubscriber as RabbitMQSubscriber from .graphql.query_handler import GraphQLQueryHandler @@ -81,17 +83,25 @@ def start_subscriber(self): self.subscriber.start() self.config.set("subscriber", self.subscriber) - def messagebus_publisher(self) -> Publisher: - """Start the internal messagebus publisher using config data from ETOS library.""" + def messagebus_publisher(self, version: str = "v1") -> Publisher: + """Start the internal messagebus publisher using config data from ETOS library. + + :param version: Version of the messagebus protocol to use. + """ publisher = self.config.get("internal_publisher") if publisher is None: connection_parameters = self.config.etos_rabbitmq_publisher_data() if not connection_parameters: raise PublisherConfigurationMissing - publisher = Publisher( - self.config.etos_rabbitmq_publisher_uri(), - self.config.etos_stream_name(), - ) + if version == "v1": + publisher = V1Publisher(**connection_parameters) + elif version == "v2alpha": + publisher = V2alphaPublisher( + self.config.etos_rabbitmq_publisher_uri(), + self.config.etos_stream_name(), + ) + else: + raise ValueError(f"Unknown version {version!r} of messagebus") if not self.debug.disable_sending_events: publisher.start() # Wait for start. diff --git a/src/etos_lib/logging/default_config.yaml b/src/etos_lib/logging/default_config.yaml index a13e79e..b3b46a3 100644 --- a/src/etos_lib/logging/default_config.yaml +++ b/src/etos_lib/logging/default_config.yaml @@ -3,3 +3,5 @@ logging: loglevel: INFO file: loglevel: DEBUG + messagebus: + version: v1 diff --git a/src/etos_lib/logging/logger.py b/src/etos_lib/logging/logger.py index f9cb0bd..a7057fd 100644 --- a/src/etos_lib/logging/logger.py +++ b/src/etos_lib/logging/logger.py @@ -50,6 +50,8 @@ from etos_lib.logging.log_processors import ToStringProcessor from etos_lib.logging.messagebus_handler import MessagebusHandler from etos_lib.messaging.publisher import Publisher +from etos_lib.messaging.v1.publisher import Publisher as V1Publisher +from etos_lib.messaging.v2alpha.publisher import Publisher as V2alphaPublisher DEFAULT_CONFIG = Path(__file__).parent.joinpath("default_config.yaml") DEFAULT_LOG_PATH = Debug().default_log_path @@ -132,9 +134,11 @@ def setup_stream_logging(config: dict, log_filter: EtosFilter) -> None: root_logger.addHandler(stream_handler) -def setup_internal_messagebus_logging(log_filter: EtosFilter) -> None: +def setup_internal_messagebus_logging(config: dict, log_filter: EtosFilter) -> None: """Set up internal messagebus logging. + :param config: Internal logging configuration. + :type config: dict :param log_filter: LogFilter to add to stream handler. :type log_filter: :obj:`EtosFilter` """ @@ -143,15 +147,27 @@ def setup_internal_messagebus_logging(log_filter: EtosFilter) -> None: root_logger = logging.getLogger() publisher = Config().get("internal_publisher") + version = config.get("version", "v1") if publisher is None: - publisher = Publisher( - Config().etos_rabbitmq_publisher_uri(), - Config().etos_stream_name(), - ) + if version == "v1": + # These loggers need to be prevented from propagating their logs + # to V1Publisher. If they aren't, this may cause a deadlock. + logging.getLogger("pika").propagate = False + logging.getLogger("eiffellib.publishers.rabbitmq_publisher").propagate = False + logging.getLogger("etos_lib.eiffel.publisher").propagate = False + logging.getLogger("base_rabbitmq").propagate = False + publisher = V1Publisher(**Config().etos_rabbitmq_publisher_data()) + elif version == "v2alpha": + publisher = V2alphaPublisher( + Config().etos_rabbitmq_publisher_uri(), + Config().etos_stream_name(), + ) + else: + raise Exception("Unknown version of messagebus") Config().set("internal_publisher", publisher) if Debug().enable_sending_logs: publisher.start() - atexit.register(publisher.close) + atexit.register(close_rabbit, publisher) handler = MessagebusHandler(publisher) handler.setFormatter(EtosLogFormatter()) @@ -223,6 +239,13 @@ def setup_logging( setup_stream_logging(logging_config.get("stream"), log_filter) if logging_config.get("file"): setup_file_logging(logging_config.get("file"), log_filter) - setup_internal_messagebus_logging(log_filter) + setup_internal_messagebus_logging(logging_config.get("messagebus"), log_filter) if otel_resource: setup_otel_logging(log_filter, otel_resource) + + +def close_rabbit(rabbit: Publisher) -> None: + """Close down a rabbitmq connection.""" + if isinstance(rabbit, V1Publisher): + rabbit.wait_for_unpublished_events() + rabbit.close() diff --git a/src/etos_lib/logging/messagebus_handler.py b/src/etos_lib/logging/messagebus_handler.py index 3010738..0fd82d0 100644 --- a/src/etos_lib/logging/messagebus_handler.py +++ b/src/etos_lib/logging/messagebus_handler.py @@ -18,6 +18,7 @@ import json import logging from etos_lib.messaging.publisher import Publisher +from etos_lib.messaging.v2alpha.publisher import Publisher as V2alphaPublisher from etos_lib.messaging.events import Log, Message @@ -50,19 +51,25 @@ def emit(self, record: logging.LogRecord): """ if self.closing: return - # This feels volatile, but is necessary to avoid infinite recursion and - # still have access to the actual log prints. - # Maybe we should add this check to the record instead, but that would require - # us to change the logging calls in the code. - if record.name == "etos_lib.messaging.publisher": + try: + send = record.user_log + except AttributeError: + # If it's the v2alpha protocol, always set to True unless explicitly + # disabled with `user_log=False`. + send = isinstance(self.publisher, V2alphaPublisher) + if not send: return msg = self.format(record) if not isinstance(msg, dict): msg = json.loads(msg) + try: identifier = record.identifier except AttributeError: identifier = msg.get("identifier") - if self.publisher.is_alive() and identifier is not None and identifier != "Unknown": + if identifier is None or identifier == "Unknown": + return + + if self.publisher.is_alive(): self.publisher.publish(identifier, Message(data=Log(**msg))) diff --git a/src/etos_lib/messaging/publisher.py b/src/etos_lib/messaging/publisher.py index 80423a3..34a14a8 100644 --- a/src/etos_lib/messaging/publisher.py +++ b/src/etos_lib/messaging/publisher.py @@ -13,130 +13,26 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""ETOS internal event publisher.""" +"""ETOS publisher interface.""" -import os -import ctypes -import logging -from pathlib import Path from .events import Event -LIBRARY_PATH = Path(os.getenv("BINDINGS", Path(__file__).parent)).joinpath("stream/client.so") - - class Publisher: - """Internal messaging publisher for ETOS. - - This publisher is running in a shared C library created from - Go because the Python library that we have access to is not - fast enough and we could not create reliable connections using - it. - - start() - Start up the publisher - publish() - Publish an event - stop() - Stop the publisher and wait for outstanding events. - - Example usage: - - # Make sure that the ETOS_RABBITMQ_* environment variables are set. - from etos_lib.etos import ETOS as ETOS_LIBRARY - from etos_lib.messaging.protocol import Shutdown, Result - ETOS = ETOS_LIBRARY(...) - with ETOS.messagebus_publisher() as PUBLISHER: - PUBLISHER.publish("identifier", Shutdown(data=Result( - conclusion="SUCCESSFUL", - verdict="PASSED", - description="Hello world", - ))) - - Example usage with logger: - - # Make sure that the ETOS_RABBITMQ_* environment variables are set. - import logging - from etos_lib.logging.logger import setup_logging, FORMAT_CONFIG - setup_logging("name", "version", "environment", None) - FORMAT_CONFIG.identifier = "identifier" - LOGGER = logging.getLogger(__name__) - LOGGER.info("Hello world", extra={"user_log": True}) - """ - - __connected = False - logger = logging.getLogger(__name__) - - def __init__(self, connection_string: str, stream_name: str): - """Initialize the Publisher object.""" - self.stream_name = stream_name - self.__connection_string = connection_string - self.__setup_bindings(LIBRARY_PATH) - - def __setup_bindings(self, library_path: Path): - """Setup the bindings for the Publisher.""" - self.__library = ctypes.cdll.LoadLibrary(str(library_path)) - self.__handler = self.__library.New( - self.__connection_string.encode("utf-8"), - self.stream_name.encode("utf-8"), - ) - self.__connect = self.__library.Publisher - self.__connect.argtypes = [ - ctypes.c_int, # pointer to stream handler - ctypes.c_char_p, # connectionString - ctypes.c_char_p, # streamName - ] - self.__publish = self.__library.Publish - self.__publish.argtypes = [ - ctypes.c_int, # pointer to stream handler - ctypes.c_char_p, # event - ctypes.c_char_p, # identifier - ctypes.c_char_p, # eventType - ctypes.c_char_p, # meta - ] - self.__close = self.__library.Close - self.__close.argtypes = [ctypes.c_int] # pointer to stream handler + """Publisher interface for ETOS.""" - def __enter__(self): - """Connect to the server.""" - self.start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Close the connection to the server.""" - self.close() + def publish(self, testrun_id: str, event: Event): + """Publish an event to the internal messagebus.""" + raise NotImplementedError def start(self): """Start the connection to the server.""" - if not self.__connected: - success = self.__connect( - self.__handler, - self.__connection_string.encode("utf-8"), - self.stream_name.encode("utf-8"), - ) - if not success: - raise Exception("bruh") - self.__connected = True + raise NotImplementedError - def publish(self, testrun_id: str, event: Event): - """Publish an event to the internal messagebus.""" - assert self.__connected, "Not connected to the server" - self.logger.debug( - "Publishing event '%s' to id %r", - event, - testrun_id, - ) - return self.__publish( - self.__handler, - event.model_dump_json().encode("utf-8"), - testrun_id.encode("utf-8"), - event.event.lower().encode("utf-8"), - event.meta.encode("utf-8"), - ) + def close(self): + """Close the connection to the server.""" + raise NotImplementedError def is_alive(self): """Check if the connection is alive.""" - return self.__connected - - def close(self): - """Close the connection to the server.""" - if self.__connected: - self.__close(self.__handler) - self.__connected = False + raise NotImplementedError diff --git a/src/etos_lib/messaging/v1/__init__.py b/src/etos_lib/messaging/v1/__init__.py new file mode 100644 index 0000000..217dd7a --- /dev/null +++ b/src/etos_lib/messaging/v1/__init__.py @@ -0,0 +1,16 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" diff --git a/src/etos_lib/messaging/v1/publisher.py b/src/etos_lib/messaging/v1/publisher.py new file mode 100644 index 0000000..6fa7f72 --- /dev/null +++ b/src/etos_lib/messaging/v1/publisher.py @@ -0,0 +1,63 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS rabbitmq log publisher.""" + +import time +import json +import pika + +from eiffellib.publishers import RabbitMQPublisher + +from ..events import Event +from ..publisher import Publisher as PublisherInterface + + +class Publisher(RabbitMQPublisher, PublisherInterface): + """A RabbitMQ publisher that can send JSON strings instead of Eiffel events.""" + + def publish(self, testrun_id, event: Event): + """Publish an event to the internal messagebus.""" + self.send_event( + event=event.model_dump_json(), + routing_key=f"{testrun_id}.{event.event.lower()}.{event.meta}", + ) + + def send_event(self, event, block=True, routing_key="#"): + """Overload the send_event from the eiffellib rabbitmq publisher to send strings. + + This method differs slightly from its parent in that it takes the routing_key as input. + """ + if block: + self.wait_start() + while self._channel is None or not self._channel.is_open: + time.sleep(0.1) + properties = pika.BasicProperties(content_type="application/json", delivery_mode=2) + if not isinstance(event, str): + event = json.dumps(event) + + with self._lock: + try: + self._channel.basic_publish( + self.exchange, + routing_key, + event, + properties, + ) + except: # pylint:disable=bare-except + self._nacked_deliveries.append(event) + return + self._delivered += 1 + self._deliveries[self._delivered] = event diff --git a/src/etos_lib/messaging/v2alpha/__init__.py b/src/etos_lib/messaging/v2alpha/__init__.py new file mode 100644 index 0000000..217dd7a --- /dev/null +++ b/src/etos_lib/messaging/v2alpha/__init__.py @@ -0,0 +1,16 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" diff --git a/src/etos_lib/messaging/v2alpha/publisher.py b/src/etos_lib/messaging/v2alpha/publisher.py new file mode 100644 index 0000000..b533730 --- /dev/null +++ b/src/etos_lib/messaging/v2alpha/publisher.py @@ -0,0 +1,150 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS internal event publisher.""" + +import os +import ctypes +import logging +from pathlib import Path +from ..events import Event +from ..publisher import Publisher as PublisherInterface + + +LIBRARY_PATH = Path(os.getenv("BINDINGS", Path(__file__).parent)).joinpath("stream/client.so") + + +class Publisher(PublisherInterface): + """Internal messaging publisher for ETOS. + + This publisher is running in a shared C library created from + Go because the Python library that we have access to is not + fast enough and we could not create reliable connections using + it. + + start() - Start up the publisher + publish() - Publish an event + close() - Stop the publisher and wait for outstanding events. + is_alive() - Check if publisher is alive. + + Example usage: + + # Make sure that the ETOS_RABBITMQ_* and ETOS_STREAM_NAME environment variables are set. + from etos_lib.etos import ETOS as ETOS_LIBRARY + from etos_lib.messaging.protocol import Shutdown, Result + ETOS = ETOS_LIBRARY(...) + with ETOS.messagebus_publisher() as PUBLISHER: + PUBLISHER.publish("identifier", Shutdown(data=Result( + conclusion="SUCCESSFUL", + verdict="PASSED", + description="Hello world", + ))) + + Example usage with logger: + + # Make sure that the ETOS_RABBITMQ_* and ETOS_STREAM_NAME environment variables are set. + import logging + from etos_lib.logging.logger import setup_logging, FORMAT_CONFIG + setup_logging("name", "version", "environment", None) + FORMAT_CONFIG.identifier = "identifier" + LOGGER = logging.getLogger(__name__) + LOGGER.info("Hello world", extra={"user_log": True}) + """ + + __connected = False + logger = logging.getLogger(__name__) + + def __init__(self, connection_string: str, stream_name: str): + """Initialize the Publisher object.""" + self.stream_name = stream_name + self.__connection_string = connection_string + self.__setup_bindings(LIBRARY_PATH) + + def __setup_bindings(self, library_path: Path): + """Setup the bindings for the Publisher.""" + self.__library = ctypes.cdll.LoadLibrary(str(library_path)) + self.__handler = self.__library.New( + self.__connection_string.encode("utf-8"), + self.stream_name.encode("utf-8"), + ) + self.__connect = self.__library.Publisher + self.__connect.argtypes = [ + ctypes.c_int, # pointer to stream handler + ctypes.c_char_p, # connectionString + ctypes.c_char_p, # streamName + ] + self.__publish = self.__library.Publish + self.__publish.argtypes = [ + ctypes.c_int, # pointer to stream handler + ctypes.c_char_p, # event + ctypes.c_char_p, # identifier + ctypes.c_char_p, # eventType + ctypes.c_char_p, # meta + ] + self.__close = self.__library.Close + self.__close.argtypes = [ctypes.c_int] # pointer to stream handler + + def __enter__(self): + """Connect to the server.""" + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Close the connection to the server.""" + self.close() + + def __del__(self): + """Close the connection to the server.""" + self.close() + + def start(self): + """Start the connection to the server.""" + if not self.__connected: + success = self.__connect( + self.__handler, + self.__connection_string.encode("utf-8"), + self.stream_name.encode("utf-8"), + ) + if not success: + raise Exception("Failed to connect to stream") + self.__connected = True + + def publish(self, testrun_id: str, event: Event): + """Publish an event to the internal messagebus.""" + assert self.__connected, "Not connected to the server" + self.logger.debug( + "Publishing event '%s' to id %r", + event, + testrun_id, + extra={"user_log": False}, + ) + return self.__publish( + self.__handler, + event.model_dump_json().encode("utf-8"), + testrun_id.encode("utf-8"), + event.event.lower().encode("utf-8"), + event.meta.encode("utf-8"), + ) + + def is_alive(self): + """Check if the connection is alive.""" + return self.__connected + + def close(self): + """Close the connection to the server.""" + if self.__connected: + self.__close(self.__handler) + self.__connected = False + self.__handler = None From 75fb57629a1ddb089d5c22305c775967e04ab56d Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Tue, 4 Mar 2025 12:30:53 +0100 Subject: [PATCH 04/11] Rstream is unused, remove it --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 587f3f3..f0b57a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,6 @@ dependencies = [ "kubernetes~=26.1", "pydantic~=2.10", "pyyaml~=6.0", - "rstream~=0.20", "opentelemetry-api~=1.21", "opentelemetry-sdk~=1.21", "opentelemetry-exporter-otlp~=1.21", From 77e96f96e5a70cdb31a9321855ed35af59ebc281 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Wed, 5 Mar 2025 14:53:49 +0100 Subject: [PATCH 05/11] Get port from another environment variable --- src/etos_lib/lib/config.py | 8 +++++--- src/etos_lib/messaging/v2alpha/publisher.py | 6 +++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/etos_lib/lib/config.py b/src/etos_lib/lib/config.py index 1be7d5a..633c11f 100644 --- a/src/etos_lib/lib/config.py +++ b/src/etos_lib/lib/config.py @@ -101,12 +101,14 @@ def etos_stream_name(self): def etos_rabbitmq_publisher_uri(self): """Get RabbitMQ URI for the ETOS rabbitmq service.""" data = self.etos_rabbitmq_publisher_data() + port = os.getenv("ETOS_RABBITMQ_STREAM_PORT", os.getenv("ETOS_RABBITMQ_PORT", "5672")) + if data.get("username") is not None and data.get("password") is None: - netloc = f"{data.get('username')}@{data.get('host')}:{data.get('port')}" + netloc = f"{data.get('username')}@{data.get('host')}:{port}" elif data.get("username") is not None and data.get("password") is not None: - netloc = f"{data.get('username')}:{data.get('password')}@{data.get('host')}:{data.get('port')}" + netloc = f"{data.get('username')}:{data.get('password')}@{data.get('host')}:{port}" else: - netloc = f"{data.get('host')}:{data.get('port')}" + netloc = f"{data.get('host')}:{port}" vhost = quote_plus(data.get("vhost", "/") or "/") return f"rabbitmq-stream://{netloc}/{vhost}" diff --git a/src/etos_lib/messaging/v2alpha/publisher.py b/src/etos_lib/messaging/v2alpha/publisher.py index b533730..766e4ea 100644 --- a/src/etos_lib/messaging/v2alpha/publisher.py +++ b/src/etos_lib/messaging/v2alpha/publisher.py @@ -23,7 +23,11 @@ from ..publisher import Publisher as PublisherInterface -LIBRARY_PATH = Path(os.getenv("BINDINGS", Path(__file__).parent)).joinpath("stream/client.so") +# This package: etos_lib/messaging/v2alpha/publisher.py +# Root directory: etos_lib/ +# Path: ../../ +ROOT = Path(__file__).parent.parent.parent.joinpath("bindings") +LIBRARY_PATH = Path(os.getenv("BINDINGS", ROOT)).joinpath("stream/client.so") class Publisher(PublisherInterface): From d79c8b79e730c99b4981bdcd493ad7c5648885b3 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Thu, 6 Mar 2025 15:38:14 +0100 Subject: [PATCH 06/11] Fix a few problems Pass connection string as string instead if []string Add directory to reports and artifacts Make datestring a datetime object instead of string --- .gitignore | 1 + MANIFEST.in | 5 +++++ pkg/bindings/publisher.go | 3 +-- pkg/stream/rabbitmq.go | 4 ++-- src/etos_lib/messaging/types.py | 10 ++++++++-- 5 files changed, 17 insertions(+), 6 deletions(-) create mode 100644 MANIFEST.in diff --git a/.gitignore b/.gitignore index 279c72e..573a579 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ __pycache__/* */.ipynb_checkpoints/* .DS_Store .tags +bin/ # Project files .ropeproject diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..2bd2f85 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,5 @@ +prune build +prune docs/_build +prune docs/api +graft src/etos_lib/bindings/ +global-exclude *.pyc *.o diff --git a/pkg/bindings/publisher.go b/pkg/bindings/publisher.go index 96255a0..e908da1 100644 --- a/pkg/bindings/publisher.go +++ b/pkg/bindings/publisher.go @@ -49,8 +49,7 @@ type StreamHandler struct { //export New func New(connectionString, streamName *C.char) (C.uintptr_t, bool) { logger := newLogger() - addresses := []string{C.GoString(connectionString)} - streamer, err := stream.NewRabbitMQStreamer(C.GoString(streamName), addresses, logger) + streamer, err := stream.NewRabbitMQStreamer(C.GoString(streamName), C.GoString(connectionString), logger) if err != nil { return C.uintptr_t(0), false } diff --git a/pkg/stream/rabbitmq.go b/pkg/stream/rabbitmq.go index d84f92a..8ff26d9 100644 --- a/pkg/stream/rabbitmq.go +++ b/pkg/stream/rabbitmq.go @@ -48,10 +48,10 @@ type RabbitMQStreamer struct { // NewRabbitMQStreamer creates a new RabbitMQ streamer. Only a single connection should be created. func NewRabbitMQStreamer( streamName string, - addresses []string, + address string, logger *slog.Logger, ) (Streamer, error) { - options := stream.NewEnvironmentOptions().SetMaxProducersPerClient(1).SetUris(addresses) + options := stream.NewEnvironmentOptions().SetMaxProducersPerClient(1).SetUri(address) env, err := stream.NewEnvironment(options) if err != nil { return nil, err diff --git a/src/etos_lib/messaging/types.py b/src/etos_lib/messaging/types.py index 068de6c..39a3d17 100644 --- a/src/etos_lib/messaging/types.py +++ b/src/etos_lib/messaging/types.py @@ -14,8 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. """Types used by events but are not events themselves.""" + +from datetime import datetime +from typing import Optional from typing_extensions import Annotated -from pydantic import BaseModel, Field, StringConstraints +from pydantic import AliasChoices, BaseModel, Field, StringConstraints class File(BaseModel): @@ -23,6 +26,7 @@ class File(BaseModel): url: str name: str + directory: Optional[str] = None checksums: dict = {} @@ -37,7 +41,9 @@ class Log(BaseModel): # an aliased Field. # The '@timestamp' key is necessary for logstash, which we support, so we # cannot update the formatter that creates the '@timestamp' key. - datestring: str = Field("datestring", alias="@timestamp") + datestring: datetime = Field( + serialization_alias="datestring", validation_alias=AliasChoices("@timestamp", "datestring") + ) class Result(BaseModel): From c2f629a81216d78fa0a4253cd150abbd82b1ad59 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Thu, 6 Mar 2025 15:44:50 +0100 Subject: [PATCH 07/11] Make staticchecks happy --- src/etos_lib/logging/logger.py | 2 +- src/etos_lib/messaging/v2alpha/publisher.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/etos_lib/logging/logger.py b/src/etos_lib/logging/logger.py index a7057fd..cce9869 100644 --- a/src/etos_lib/logging/logger.py +++ b/src/etos_lib/logging/logger.py @@ -163,7 +163,7 @@ def setup_internal_messagebus_logging(config: dict, log_filter: EtosFilter) -> N Config().etos_stream_name(), ) else: - raise Exception("Unknown version of messagebus") + raise ValueError("Unknown version of messagebus") Config().set("internal_publisher", publisher) if Debug().enable_sending_logs: publisher.start() diff --git a/src/etos_lib/messaging/v2alpha/publisher.py b/src/etos_lib/messaging/v2alpha/publisher.py index 766e4ea..d6ba6c4 100644 --- a/src/etos_lib/messaging/v2alpha/publisher.py +++ b/src/etos_lib/messaging/v2alpha/publisher.py @@ -26,11 +26,11 @@ # This package: etos_lib/messaging/v2alpha/publisher.py # Root directory: etos_lib/ # Path: ../../ -ROOT = Path(__file__).parent.parent.parent.joinpath("bindings") +ROOT = str(Path(__file__).parent.parent.parent.joinpath("bindings")) LIBRARY_PATH = Path(os.getenv("BINDINGS", ROOT)).joinpath("stream/client.so") -class Publisher(PublisherInterface): +class Publisher(PublisherInterface): # pylint:disable=too-many-instance-attributes """Internal messaging publisher for ETOS. This publisher is running in a shared C library created from @@ -74,15 +74,15 @@ def __init__(self, connection_string: str, stream_name: str): """Initialize the Publisher object.""" self.stream_name = stream_name self.__connection_string = connection_string - self.__setup_bindings(LIBRARY_PATH) - - def __setup_bindings(self, library_path: Path): - """Setup the bindings for the Publisher.""" - self.__library = ctypes.cdll.LoadLibrary(str(library_path)) self.__handler = self.__library.New( self.__connection_string.encode("utf-8"), self.stream_name.encode("utf-8"), ) + self.__setup_bindings(LIBRARY_PATH) + + def __setup_bindings(self, library_path: Path): + """Set up the bindings for the Publisher.""" + self.__library = ctypes.cdll.LoadLibrary(str(library_path)) self.__connect = self.__library.Publisher self.__connect.argtypes = [ ctypes.c_int, # pointer to stream handler @@ -122,7 +122,7 @@ def start(self): self.stream_name.encode("utf-8"), ) if not success: - raise Exception("Failed to connect to stream") + raise ConnectionError("Failed to connect to stream") self.__connected = True def publish(self, testrun_id: str, event: Event): From 69b1ff820194c015638d5686eb37f4b37d72ad52 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Mon, 10 Mar 2025 13:07:38 +0100 Subject: [PATCH 08/11] Build go and python from makefile --- .github/workflows/build-publish.yml | 8 +----- .github/workflows/main.yml | 8 ++++++ MANIFEST.in | 2 ++ Makefile | 31 +++++++++++++++++---- cmd/messaging/main.go | 4 +-- setup.py | 22 ++++++++++++++- src/etos_lib/messaging/v2alpha/publisher.py | 8 ++++-- 7 files changed, 66 insertions(+), 17 deletions(-) diff --git a/.github/workflows/build-publish.yml b/.github/workflows/build-publish.yml index 925cd70..9c00493 100644 --- a/.github/workflows/build-publish.yml +++ b/.github/workflows/build-publish.yml @@ -16,14 +16,8 @@ jobs: uses: actions/setup-python@v4 with: python-version: "3.9" - - name: Install pypa/build - run: >- - python3 -m - pip install - build - --user - name: Build a binary wheel and a source tarball - run: python3 -m build + run: make build - name: Store the distribution packages uses: actions/upload-artifact@v4 with: diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a9db199..a4d4baf 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -27,3 +27,11 @@ jobs: run: pip install -U setuptools - name: Run Tox run: tox + check-golang: + # The type of runner that the job will run on + runs-on: ubuntu-latest + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + - uses: actions/checkout@v2 + - name: Lint Go code + run: make check diff --git a/MANIFEST.in b/MANIFEST.in index 2bd2f85..7597310 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,7 @@ prune build prune docs/_build prune docs/api +recursive-include *.go +include go.mod go.sum graft src/etos_lib/bindings/ global-exclude *.pyc *.o diff --git a/Makefile b/Makefile index 85322e6..dcf435e 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,13 @@ export GOBIN := $(CURDIR)/bin -export BINDINGS := $(CURDIR)/bindings -export BUILD_MESSAGING := go build --buildmode=c-shared -o $(BINDINGS)/stream/client.so ./cmd/messaging/main.go +export PYBUILD := $(CURDIR)/bin +export VENVDIR := $(CURDIR)/.buildenv +export BINDINGS := $(CURDIR)/src/etos_lib/bindings +export BUILD_MESSAGING := go build --buildmode=c-shared -o $(BINDINGS)/messaging/client.so ./cmd/messaging/main.go +BUILD_PYTHON = python -m build +VIRTUALENV = $(VENVDIR)/bin/python GOLANGCI_LINT = $(GOBIN)/golangci-lint -GOLANGCI_LINT_VERSION = v1.52.2 +GOLANGCI_LINT_VERSION = v1.64.6 .PHONY: all all: check build @@ -30,14 +34,31 @@ staticcheck: $(GOLANGCI_LINT) .PHONY: clean clean: $(RM) $(GOBIN)/* - $(RM) -r $(BINDINGS)/* + $(RM) -r $(BINDINGS) + $(RM) -r $(VENVDIR) + $(RM) -r dist .PHONY: build -build: +build: build-bindings build-python + +.PHONY: build-bindings +build-bindings: $(BUILD_MESSAGING) +.PHONY: build-python +build-python: $(BUILD_PYTHON) + $(BUILD_PYTHON) + $(GOLANGCI_LINT): mkdir -p $(dir $@) curl -sfL \ https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh \ | sh -s -- -b $(GOBIN) $(GOLANGCI_LINT_VERSION) + +$(BUILD_PYTHON): $(VIRTUALENV) + mkdir -p $(dir $@) + $(VIRTUALENV) -m pip install build + +$(VIRTUALENV): + mkdir -p $(dir $@) + python -m virtualenv $(VENVDIR) diff --git a/cmd/messaging/main.go b/cmd/messaging/main.go index 29c691c..787807d 100644 --- a/cmd/messaging/main.go +++ b/cmd/messaging/main.go @@ -18,10 +18,10 @@ package main import ( "C" - // This import will make sure that we build the bindings on make build + // This import will make sure that we build the bindings on make build. _ "github.com/eiffel-community/etos-library/pkg/bindings" ) -// main is required for cgo to work +// main is required for cgo to work. func main() { } diff --git a/setup.py b/setup.py index a3b1c0c..8f383cd 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,10 @@ # -*- coding: utf-8 -*- """Setup file for etos_lib.""" + +from distutils.errors import CompileError +from subprocess import call from setuptools import setup +from setuptools.command.build_ext import build_ext from setuptools_scm.version import get_local_dirty_tag @@ -38,5 +42,21 @@ def local_scheme(version) -> str: return f"+{version.node}" +class BuildGoBindings(build_ext): + """Custom command to build the go bindings in this repository.""" + + def build_extension(self, ext): + """Build the go extension.""" + raise CompileError("Testing") + cmd = ["make", "build"] + out = call(cmd) + if out != 0: + raise CompileError("Go build failed") + + if __name__ == "__main__": - setup(use_scm_version={"local_scheme": local_scheme, "version_scheme": version_scheme}) + setup( + use_scm_version={"local_scheme": local_scheme, "version_scheme": version_scheme}, + cmdclass={"build_ext": BuildGoBindings}, + zip_safe=False, + ) diff --git a/src/etos_lib/messaging/v2alpha/publisher.py b/src/etos_lib/messaging/v2alpha/publisher.py index d6ba6c4..5624b60 100644 --- a/src/etos_lib/messaging/v2alpha/publisher.py +++ b/src/etos_lib/messaging/v2alpha/publisher.py @@ -27,7 +27,7 @@ # Root directory: etos_lib/ # Path: ../../ ROOT = str(Path(__file__).parent.parent.parent.joinpath("bindings")) -LIBRARY_PATH = Path(os.getenv("BINDINGS", ROOT)).joinpath("stream/client.so") +LIBRARY_PATH = Path(os.getenv("BINDINGS", ROOT)).joinpath("messaging/client.so") class Publisher(PublisherInterface): # pylint:disable=too-many-instance-attributes @@ -74,11 +74,11 @@ def __init__(self, connection_string: str, stream_name: str): """Initialize the Publisher object.""" self.stream_name = stream_name self.__connection_string = connection_string + self.__setup_bindings(LIBRARY_PATH) self.__handler = self.__library.New( self.__connection_string.encode("utf-8"), self.stream_name.encode("utf-8"), ) - self.__setup_bindings(LIBRARY_PATH) def __setup_bindings(self, library_path: Path): """Set up the bindings for the Publisher.""" @@ -152,3 +152,7 @@ def close(self): self.__close(self.__handler) self.__connected = False self.__handler = None + + +if __name__ == "__main__": + PUBLISHER = Publisher("amqp://kalle", "kula") From 995e10c504a73ad590f97173389fd8d8fd0a4cd5 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Mon, 10 Mar 2025 14:40:59 +0100 Subject: [PATCH 09/11] Add a build check as well --- .github/workflows/main.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a4d4baf..a02d6aa 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -35,3 +35,11 @@ jobs: - uses: actions/checkout@v2 - name: Lint Go code run: make check + run-build: + # The type of runner that the job will run on + runs-on: ubuntu-latest + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + - uses: actions/checkout@v2 + - name: Build python module with Go code + run: make build From e30e8477e7e8b8354e13cd636c2453cdbb9e4431 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Mon, 10 Mar 2025 14:46:05 +0100 Subject: [PATCH 10/11] Install virtualenv --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index dcf435e..ede1ad3 100644 --- a/Makefile +++ b/Makefile @@ -61,4 +61,5 @@ $(BUILD_PYTHON): $(VIRTUALENV) $(VIRTUALENV): mkdir -p $(dir $@) + pip install virtualenv python -m virtualenv $(VENVDIR) From f8394690fa3c59aa367dc76811a946c5a57ee455 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Mon, 10 Mar 2025 14:50:13 +0100 Subject: [PATCH 11/11] And make sure we build from the venv --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index ede1ad3..335be6e 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ export VENVDIR := $(CURDIR)/.buildenv export BINDINGS := $(CURDIR)/src/etos_lib/bindings export BUILD_MESSAGING := go build --buildmode=c-shared -o $(BINDINGS)/messaging/client.so ./cmd/messaging/main.go -BUILD_PYTHON = python -m build +BUILD_PYTHON = -m build VIRTUALENV = $(VENVDIR)/bin/python GOLANGCI_LINT = $(GOBIN)/golangci-lint GOLANGCI_LINT_VERSION = v1.64.6 @@ -47,7 +47,7 @@ build-bindings: .PHONY: build-python build-python: $(BUILD_PYTHON) - $(BUILD_PYTHON) + $(VIRTUALENV) $(BUILD_PYTHON) $(GOLANGCI_LINT): mkdir -p $(dir $@)