Skip to content

Commit e244bdb

Browse files
committed
more things
1 parent ab333d6 commit e244bdb

File tree

8 files changed

+996
-68
lines changed

8 files changed

+996
-68
lines changed

contrib/haproxy/stream-processing-offload/cmd/spoa/log.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func NewLogger() *Logger {
2525
// Info logs an informational message
2626
func (l *Logger) Info(format string, v ...interface{}) {
2727
l.SetPrefix("INFO: ")
28+
//l.Printf(format, v...)
2829
l.Printf(format, v...)
2930
}
3031

@@ -40,6 +41,10 @@ func (l *Logger) Error(format string, v ...interface{}) {
4041
l.Printf(format, v...)
4142
}
4243

44+
func (l *Logger) Errorf(format string, v ...interface{}) {
45+
l.Error("haproxy_spoa: "+format, v...)
46+
}
47+
4348
// Debug logs a debug message
4449
func (l *Logger) Debug(format string, v ...interface{}) {
4550
l.SetPrefix("DEBUG: ")

contrib/haproxy/stream-processing-offload/cmd/spoa/main.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ import (
1111
"net"
1212
"os"
1313

14-
"github.com/negasus/haproxy-spoe-go/agent"
15-
"github.com/negasus/haproxy-spoe-go/logger"
16-
1714
"github.com/DataDog/dd-trace-go/contrib/haproxy/stream-processing-offload/v2"
15+
"github.com/negasus/haproxy-spoe-go/agent"
1816
)
1917

2018
var log = NewLogger()
@@ -36,10 +34,10 @@ func main() {
3634
Context: context.Background(),
3735
})
3836

39-
a := agent.New(appsecHAProxy.Handler, logger.NewDefaultLog())
37+
a := agent.New(appsecHAProxy.Handler, log)
4038

4139
log.Info("haproxy_spoa: started\n")
4240
if err := a.Serve(listener); err != nil {
43-
log.Printf("haproxy_spoa: error agent serve: %+v\n", err)
41+
log.Error("haproxy_spoa: error agent serve: %+v\n", err)
4442
}
4543
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2025 Datadog, Inc.
5+
6+
package streamprocessingoffload
7+
8+
import (
9+
"context"
10+
"log"
11+
"net"
12+
13+
"github.com/negasus/haproxy-spoe-go/agent"
14+
"github.com/negasus/haproxy-spoe-go/logger"
15+
)
16+
17+
func Example_server() {
18+
// Create a listener for the server.
19+
ln, err := net.Listen("tcp4", "127.0.0.1:3000")
20+
if err != nil {
21+
log.Fatal(err)
22+
}
23+
24+
// Initialize the SPOA agent server with the configuration
25+
appsecHAProxy := NewHAProxySPOA(AppsecHAProxyConfig{
26+
BlockingUnavailable: false,
27+
BodyParsingSizeLimit: 1000000, // 1MB
28+
Context: context.Background(),
29+
})
30+
31+
a := agent.New(appsecHAProxy.Handler, logger.NewDefaultLog())
32+
33+
// Start serving incoming connections.
34+
if err := a.Serve(ln); err != nil {
35+
log.Fatalf("failed to serve: %v", err)
36+
}
37+
}

contrib/haproxy/stream-processing-offload/haproxy.go

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,6 @@ func NewHAProxySPOA(config AppsecHAProxyConfig) *HAProxySPOA {
4545
func (s *HAProxySPOA) Handler(req *request.Request) {
4646
instr.Logger().Debug("haproxy_spoa: handle request EngineID: '%s', StreamID: '%d', FrameID: '%d' with %d messages", req.EngineID, req.StreamID, req.FrameID, req.Messages.Len())
4747

48-
mp := message_processor.NewMessageProcessor(
49-
message_processor.MessageProcessorConfig{
50-
BlockingUnavailable: false,
51-
BodyParsingSizeLimit: 1024,
52-
},
53-
instr,
54-
)
55-
5648
// Process each message
5749
for i := 0; i < req.Messages.Len(); i++ {
5850
msg, err := req.Messages.GetByIndex(i)
@@ -61,74 +53,65 @@ func (s *HAProxySPOA) Handler(req *request.Request) {
6153
continue
6254
}
6355

56+
// Get current request state from cache or if nil it will be created by the request headers message
57+
reqState, _ := getCurrentRequest(msg)
58+
6459
ctx := context.Background()
65-
mpAction, reqState, err := processMessage(mp, ctx, req, msg)
60+
reqState, mpAction, err := processMessage(s.mp, ctx, req, msg, reqState)
6661
if err != nil {
6762
instr.Logger().Error("haproxy_spoa: error processing message %s: %v", msg.Name, err)
6863
return
6964
}
7065

71-
err = s.handleAction(mpAction, req, &reqState)
66+
err = s.handleAction(mpAction, req, msg, reqState)
7267
if err != nil {
7368
instr.Logger().Error("haproxy_spoa: error processing message %s: %v", msg.Name, err)
7469
return
7570
}
7671
}
7772
}
7873

79-
func processMessage(mp message_processor.MessageProcessor, ctx context.Context, req *request.Request, msg *message.Message) (message_processor.Action, message_processor.RequestState, error) {
80-
instr.Logger().Debug("haproxy_spoa: handling message: %s", msg.Name)
74+
func processMessage(mp message_processor.MessageProcessor, ctx context.Context, req *request.Request, msg *message.Message, currentRequest *message_processor.RequestState) (*message_processor.RequestState, message_processor.Action, error) {
75+
instr.Logger().Debug("f: handling message: %s", msg.Name)
8176

8277
switch msg.Name {
8378
case "http-request-headers-msg":
84-
var (
85-
mpAction message_processor.Action
86-
err error
87-
currentRequest message_processor.RequestState
88-
)
89-
currentRequest, mpAction, err = mp.OnRequestHeaders(ctx, &requestHeadersHAProxy{req: req, msg: msg})
90-
return mpAction, currentRequest, err
79+
return mp.OnRequestHeaders(ctx, &requestHeadersHAProxy{req: req, msg: msg})
9180
case "http-request-body-msg":
92-
currentRequest, err := getCurrentRequest(msg)
93-
if err != nil {
94-
return message_processor.Action{}, message_processor.RequestState{}, err
81+
if currentRequest == nil || !currentRequest.Ongoing {
82+
return nil, message_processor.Action{}, fmt.Errorf("received request body without request headers")
9583
}
9684

97-
var action message_processor.Action
98-
action, err = mp.OnRequestBody(&requestBodyHAProxy{msg: msg}, currentRequest)
99-
return action, currentRequest, err
85+
action, err := mp.OnRequestBody(&requestBodyHAProxy{msg: msg}, currentRequest)
86+
return currentRequest, action, err
10087
case "http-response-headers-msg":
101-
currentRequest, err := getCurrentRequest(msg)
102-
if err != nil {
103-
return message_processor.Action{}, message_processor.RequestState{}, err
88+
if currentRequest == nil || !currentRequest.Ongoing {
89+
return nil, message_processor.Action{}, fmt.Errorf("received response headers without request context")
10490
}
10591

106-
var action message_processor.Action
107-
action, err = mp.OnResponseHeaders(&responseHeadersHAProxy{msg: msg}, currentRequest)
108-
return action, currentRequest, err
92+
action, err := mp.OnResponseHeaders(&responseHeadersHAProxy{msg: msg}, currentRequest)
93+
return currentRequest, action, err
10994
case "http-response-body-msg":
110-
currentRequest, err := getCurrentRequest(msg)
111-
if err != nil {
112-
return message_processor.Action{}, message_processor.RequestState{}, err
95+
if currentRequest == nil || !currentRequest.Ongoing {
96+
return nil, message_processor.Action{}, fmt.Errorf("received response body without request context")
11397
}
11498

115-
var action message_processor.Action
116-
action, err = mp.OnResponseBody(&responseBodyHAProxy{msg: msg}, currentRequest)
117-
return action, currentRequest, err
99+
action, err := mp.OnResponseBody(&responseBodyHAProxy{msg: msg}, currentRequest)
100+
return currentRequest, action, err
118101
default:
119-
return message_processor.Action{}, message_processor.RequestState{}, fmt.Errorf("unknown message type: %s", msg.Name)
102+
return nil, message_processor.Action{}, fmt.Errorf("unknown message type: %s", msg.Name)
120103
}
121104
}
122105

123-
func (s *HAProxySPOA) handleAction(action message_processor.Action, req *request.Request, reqState *message_processor.RequestState) error {
106+
func (s *HAProxySPOA) handleAction(action message_processor.Action, req *request.Request, msg *message.Message, reqState *message_processor.RequestState) error {
124107
switch action.Type {
125108
case message_processor.ActionTypeContinue:
126109
if action.Response == nil {
127110
return nil
128111
}
129112

130113
if data := action.Response.(*message_processor.HeadersResponseData); data != nil {
131-
return setHeadersResponseData(data, req, reqState)
114+
return setHeadersResponseData(data, req, msg, reqState)
132115
}
133116

134117
// Could happen if a new response type with data is implemented, and we forget to handle it here.

contrib/haproxy/stream-processing-offload/haproxy_mp.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@ import (
1111
"fmt"
1212
"github.com/DataDog/dd-trace-go/contrib/envoyproxy/go-control-plane/v2/message_processor"
1313
"net/http"
14+
"strconv"
1415

1516
"github.com/negasus/haproxy-spoe-go/message"
1617
"github.com/negasus/haproxy-spoe-go/request"
1718
)
1819

1920
type requestHeadersHAProxy struct {
20-
req *request.Request
21-
msg *message.Message
21+
req *request.Request
22+
msg *message.Message
23+
hasBody bool
2224
}
2325

2426
func (a *requestHeadersHAProxy) NewRequest(ctx context.Context) (*http.Request, error) {
@@ -43,18 +45,28 @@ func (a *requestHeadersHAProxy) NewRequest(ctx context.Context) (*http.Request,
4345
return nil, fmt.Errorf("no Host header")
4446
}
4547

48+
// Define if a body is present based on Content-Length header
49+
contentLength := headers.Get("Content-Length")
50+
if contentLength != "" {
51+
length, err := strconv.Atoi(contentLength)
52+
if err != nil {
53+
return nil, fmt.Errorf("invalid Content-Length header: %v", err)
54+
}
55+
a.hasBody = length > 0
56+
}
57+
4658
return message_processor.NewRequest(ctx,
4759
scheme,
4860
authority,
4961
path,
5062
method,
5163
headers,
52-
"18.8.8.8",
64+
"123.123.456.111",
5365
tlsState)
5466
}
5567

5668
func (a *requestHeadersHAProxy) EndOfStream() bool {
57-
return true
69+
return !a.hasBody
5870
}
5971

6072
func (a *requestHeadersHAProxy) Component(_ context.Context) string {
@@ -78,7 +90,8 @@ func (a *requestBodyHAProxy) EndOfStream() bool {
7890
}
7991

8092
type responseHeadersHAProxy struct {
81-
msg *message.Message
93+
msg *message.Message
94+
hasBody bool
8295
}
8396

8497
func (a *responseHeadersHAProxy) InitResponseWriter(w http.ResponseWriter) error {
@@ -93,12 +106,22 @@ func (a *responseHeadersHAProxy) InitResponseWriter(w http.ResponseWriter) error
93106
w.Header()[k] = v
94107
}
95108

109+
// Set has body based on Content-Length header
110+
contentLength := headers.Get("Content-Length")
111+
if contentLength != "" {
112+
length, err := strconv.Atoi(contentLength)
113+
if err != nil {
114+
return fmt.Errorf("invalid Content-Length header: %v", err)
115+
}
116+
a.hasBody = length > 0
117+
}
118+
96119
w.WriteHeader(status)
97120
return nil
98121
}
99122

100123
func (a *responseHeadersHAProxy) EndOfStream() bool {
101-
return true
124+
return !a.hasBody
102125
}
103126

104127
type responseBodyHAProxy struct {

0 commit comments

Comments
 (0)