Skip to content

Commit f8ad049

Browse files
committed
more progression
1 parent d2ca22b commit f8ad049

File tree

10 files changed

+679
-127
lines changed

10 files changed

+679
-127
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package streamprocessingoffload
2+
3+
import (
4+
"fmt"
5+
"github.com/DataDog/dd-trace-go/contrib/envoyproxy/go-control-plane/v2/message_processor"
6+
"github.com/jellydator/ttlcache/v3"
7+
"github.com/negasus/haproxy-spoe-go/message"
8+
"time"
9+
)
10+
11+
const (
12+
requestStateTTL = 60 * time.Second
13+
)
14+
15+
var requestStateCache *ttlcache.Cache[uint64, *message_processor.RequestState]
16+
17+
func initRequestStateCache() {
18+
requestStateCache = ttlcache.New[uint64, *message_processor.RequestState](
19+
ttlcache.WithTTL[uint64, *message_processor.RequestState](requestStateTTL),
20+
)
21+
go requestStateCache.Start()
22+
}
23+
24+
func getCurrentRequest(msg *message.Message) (message_processor.RequestState, error) {
25+
if requestStateCache == nil {
26+
return message_processor.RequestState{}, fmt.Errorf("requestStateCache is not initialized")
27+
}
28+
key := spanIDFromMessage(msg)
29+
if key == 0 {
30+
return message_processor.RequestState{}, fmt.Errorf("span_id not found in message")
31+
}
32+
if item := requestStateCache.Get(key); item != nil {
33+
if v := item.Value(); v != nil {
34+
return *v, nil
35+
}
36+
}
37+
return message_processor.RequestState{}, fmt.Errorf("no current request found for span_id %d", key)
38+
}
39+
40+
func storeCurrentRequest(spanId uint64, rs message_processor.RequestState) error {
41+
if requestStateCache == nil {
42+
return fmt.Errorf("requestStateCache is not initialized")
43+
}
44+
local := rs
45+
requestStateCache.Set(spanId, &local, ttlcache.DefaultTTL)
46+
return nil
47+
}
48+
49+
// deleteCurrentRequest removes a RequestState from the cache; call this at end of request lifecycle
50+
func deleteCurrentRequest(spanId uint64) error {
51+
if requestStateCache == nil {
52+
return fmt.Errorf("requestStateCache is not initialized")
53+
}
54+
requestStateCache.Delete(spanId)
55+
return nil
56+
}

contrib/haproxy/stream-processing-offload/cmd/spoa/main.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package main
77

88
import (
9+
"context"
10+
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
911
"net"
1012
"os"
1113

@@ -27,7 +29,16 @@ func main() {
2729
}
2830
defer listener.Close()
2931

30-
a := agent.New(haproxy.Handler, logger.NewDefaultLog())
32+
_ = tracer.Start(tracer.WithAppSecEnabled(true))
33+
defer tracer.Stop()
34+
35+
appsecHAProxy := streamprocessingoffload.NewHAProxySPOA(streamprocessingoffload.AppsecHAProxyConfig{
36+
BlockingUnavailable: false,
37+
BodyParsingSizeLimit: 1000000, // 1MB
38+
Context: context.Background(),
39+
})
40+
41+
a := agent.New(appsecHAProxy.Handler, logger.NewDefaultLog())
3142

3243
if err := a.Serve(listener); err != nil {
3344
log.Printf("error agent serve: %+v\n", err)

contrib/haproxy/stream-processing-offload/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.23.1
55
require (
66
github.com/DataDog/dd-trace-go/v2 v2.3.0-dev.1
77
github.com/negasus/haproxy-spoe-go v1.0.6
8+
github.com/jellydator/ttlcache/v3 v3.4.0
89
)
910

1011
require (
Lines changed: 116 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,61 @@
1-
package main
1+
package streamprocessingoffload
22

33
import (
4+
"context"
5+
"fmt"
46
"log"
5-
"strings"
67

7-
"github.com/negasus/haproxy-spoe-go/action"
8+
"github.com/DataDog/dd-trace-go/contrib/envoyproxy/go-control-plane/v2/message_processor"
9+
"github.com/DataDog/dd-trace-go/v2/instrumentation"
10+
811
"github.com/negasus/haproxy-spoe-go/message"
912
"github.com/negasus/haproxy-spoe-go/request"
1013
)
1114

15+
var instr *instrumentation.Instrumentation
16+
17+
func init() {
18+
instr = instrumentation.Load(instrumentation.PackageHAProxyStreamProcessingOffload)
19+
}
20+
21+
type HAProxySPOA struct {
22+
mp message_processor.MessageProcessor
23+
}
24+
25+
type AppsecHAProxyConfig struct {
26+
Context context.Context
27+
BlockingUnavailable bool
28+
BodyParsingSizeLimit int
29+
}
30+
31+
func NewHAProxySPOA(config AppsecHAProxyConfig) *HAProxySPOA {
32+
mp := message_processor.NewMessageProcessor(message_processor.MessageProcessorConfig{
33+
BlockingUnavailable: config.BlockingUnavailable,
34+
BodyParsingSizeLimit: config.BodyParsingSizeLimit,
35+
}, instr)
36+
37+
handler := &HAProxySPOA{
38+
mp: mp,
39+
}
40+
41+
initRequestStateCache()
42+
43+
return handler
44+
}
45+
1246
// Handler processes SPOE requests from HAProxy
13-
func Handler(req *request.Request) {
47+
func (s *HAProxySPOA) Handler(req *request.Request) {
1448
log.Printf("handle request EngineID: '%s', StreamID: '%d', FrameID: '%d' with %d messages",
1549
req.EngineID, req.StreamID, req.FrameID, req.Messages.Len())
1650

51+
mp := message_processor.NewMessageProcessor(
52+
message_processor.MessageProcessorConfig{
53+
BlockingUnavailable: false,
54+
BodyParsingSizeLimit: 1024,
55+
},
56+
instr,
57+
)
58+
1759
// Process each message
1860
for i := 0; i < req.Messages.Len(); i++ {
1961
msg, err := req.Messages.GetByIndex(i)
@@ -22,123 +64,90 @@ func Handler(req *request.Request) {
2264
continue
2365
}
2466

25-
//log.Printf("Processing message: '%s'", msg.Name)
26-
27-
switch msg.Name {
28-
case "http-request-headers-msg":
29-
handleRequestHeadersMessage(req, msg)
30-
case "http-request-body-msg":
31-
handleRequestBodyMessage(req, msg)
32-
case "http-response-headers-msg":
33-
handleResponseHeadersMessage(req, msg)
34-
case "http-response-body-msg":
35-
handleResponseBodyMessage(req, msg)
36-
default:
37-
log.Printf("Unknown message type: %s", msg.Name)
67+
ctx := context.Background()
68+
mpAction, reqState, err := processMessage(mp, ctx, req, msg)
69+
if err != nil {
70+
log.Printf("Error processing message %s: %v", msg.Name, err)
71+
return
3872
}
39-
}
40-
}
4173

42-
func handleRequestHeadersMessage(req *request.Request, msg *message.Message) {
43-
// Extract headers and analyze them
44-
method := getStringValue(msg, "method")
45-
path := getStringValue(msg, "path")
46-
headers := getStringValue(msg, "headers")
47-
48-
log.Printf("Headers - Method: %s, Path: %s", method, path)
49-
50-
isJSON := isJSONContentType(headers)
51-
52-
log.Printf("Content-Type analysis - Is JSON: %t", isJSON)
53-
54-
// Always mark headers as processed
55-
setVariable(req, "headers_processed", "true")
56-
57-
req.Actions.SetVar(action.ScopeTransaction, "span_id", 1234)
58-
}
59-
60-
func handleRequestBodyMessage(req *request.Request, msg *message.Message) {
61-
// This should only be called for JSON content
62-
body := getBytesArrayValue(msg, "body")
63-
64-
log.Printf("Processing JSON Request body - Size: %d bytes", len(body))
74+
err = s.handleAction(mpAction, req, &reqState)
75+
if err != nil {
76+
log.Printf("Error handling action for message %s: %v", msg.Name, err)
77+
return
78+
}
79+
}
6580
}
6681

67-
func handleResponseHeadersMessage(req *request.Request, msg *message.Message) {
68-
status := getIntValue(msg, "status")
69-
headers := getStringValue(msg, "headers")
70-
71-
log.Printf("Response Headers - Status: %d", status)
72-
log.Printf("Response Headers content: %s", headers)
73-
74-
isJSON := isJSONContentType(headers)
75-
log.Printf("Response Content-Type analysis - Is JSON: %t", isJSON)
76-
}
82+
func processMessage(mp message_processor.MessageProcessor, ctx context.Context, req *request.Request, msg *message.Message) (message_processor.Action, message_processor.RequestState, error) {
83+
log.Printf("Handling message: %s", msg.Name)
84+
85+
switch msg.Name {
86+
case "http-request-headers-msg":
87+
var (
88+
mpAction message_processor.Action
89+
err error
90+
currentRequest message_processor.RequestState
91+
)
92+
currentRequest, mpAction, err = mp.OnRequestHeaders(ctx, &requestHeadersHAProxy{req: req, msg: msg})
93+
return mpAction, currentRequest, err
94+
case "http-request-body-msg":
95+
currentRequest, err := getCurrentRequest(msg)
96+
if err != nil {
97+
return message_processor.Action{}, message_processor.RequestState{}, err
98+
}
7799

78-
func handleResponseBodyMessage(req *request.Request, msg *message.Message) {
79-
body := getBytesArrayValue(msg, "body")
100+
var action message_processor.Action
101+
action, err = mp.OnRequestBody(&requestBodyHAProxy{msg: msg}, currentRequest)
102+
return action, currentRequest, err
103+
case "http-response-headers-msg":
104+
currentRequest, err := getCurrentRequest(msg)
105+
if err != nil {
106+
return message_processor.Action{}, message_processor.RequestState{}, err
107+
}
80108

81-
log.Printf("Processing JSON Response body - Size: %d bytes", len(body))
82-
}
109+
var action message_processor.Action
110+
action, err = mp.OnResponseHeaders(&responseHeadersHAProxy{msg: msg}, currentRequest)
111+
return action, currentRequest, err
112+
case "http-response-body-msg":
113+
currentRequest, err := getCurrentRequest(msg)
114+
if err != nil {
115+
return message_processor.Action{}, message_processor.RequestState{}, err
116+
}
83117

84-
// Helper function to set SPOE variables
85-
func setVariable(req *request.Request, name, value string) {
86-
//log.Printf("Setting variable %s = %s", name, value)
87-
88-
// Use the Actions interface to set variables
89-
if req.Actions != nil {
90-
// Create a set-var action using the library's action interface
91-
// The exact API may vary, but this is the typical pattern
92-
req.Actions.SetVar(action.ScopeTransaction, name, value)
93-
} else {
94-
log.Printf("WARNING: req.Actions is nil, cannot set variable %s", name)
118+
var action message_processor.Action
119+
action, err = mp.OnResponseBody(&responseBodyHAProxy{msg: msg}, currentRequest)
120+
return action, currentRequest, err
121+
default:
122+
return message_processor.Action{}, message_processor.RequestState{}, fmt.Errorf("unknown message type: %s", msg.Name)
95123
}
96124
}
97125

98-
// Helper function to check if Content-Type indicates JSON
99-
func isJSONContentType(headers string) bool {
100-
// Parse headers and look for Content-Type
101-
lines := strings.Split(headers, "\n")
102-
for _, line := range lines {
103-
if strings.HasPrefix(strings.ToLower(line), "content-type:") {
104-
contentType := strings.ToLower(strings.TrimSpace(line[13:]))
105-
return strings.Contains(contentType, "application/json") ||
106-
strings.Contains(contentType, "text/json") ||
107-
strings.HasSuffix(contentType, "+json")
126+
func (s *HAProxySPOA) handleAction(action message_processor.Action, req *request.Request, reqState *message_processor.RequestState) error {
127+
switch action.Type {
128+
case message_processor.ActionTypeContinue:
129+
if action.Response == nil {
130+
return nil
108131
}
109-
}
110-
return false
111-
}
112132

113-
// Helper functions to extract values from SPOE messages
114-
func getStringValue(msg *message.Message, key string) string {
115-
if val, exists := msg.KV.Get(key); exists {
116-
if str, ok := val.(string); ok {
117-
return str
133+
if data := action.Response.(*message_processor.HeadersResponseData); data != nil {
134+
// Set the headers in the request
135+
// setHeadersResponseData(data)
136+
return nil
118137
}
119-
}
120-
return ""
121-
}
122138

123-
func getBytesArrayValue(msg *message.Message, key string) []byte {
124-
if val, exists := msg.KV.Get(key); exists {
125-
if bytes, ok := val.([]byte); ok {
126-
return bytes
127-
}
139+
// Could happen if a new response type with data is implemented, and we forget to handle it here.
140+
// However, at the moment, we only have HeadersResponseData as a response type for ActionTypeContinue
141+
return fmt.Errorf("unknown action data type: %T for ActionTypeContinue", action.Response)
142+
case message_processor.ActionTypeBlock:
143+
data := action.Response.(*message_processor.BlockResponseData)
144+
setBlockResponseData(data, req)
145+
_ = deleteCurrentRequest(reqState.Span.Context().SpanID())
146+
return nil
147+
case message_processor.ActionTypeFinish:
148+
// Remove the current request from the cache
149+
_ = deleteCurrentRequest(reqState.Span.Context().SpanID())
128150
}
129-
return nil
130-
}
131151

132-
func getIntValue(msg *message.Message, key string) int {
133-
if val, exists := msg.KV.Get(key); exists {
134-
switch v := val.(type) {
135-
case int:
136-
return v
137-
case int64:
138-
return int(v)
139-
case uint64:
140-
return int(v)
141-
}
142-
}
143-
return 0
152+
return fmt.Errorf("unknown action type: %T", action.Type)
144153
}

0 commit comments

Comments
 (0)