Skip to content

Commit 8a818be

Browse files
committed
upload buffer when limit is reached
Signed-off-by: Sebastian Spaink <[email protected]>
1 parent cb97641 commit 8a818be

File tree

14 files changed

+466
-170
lines changed

14 files changed

+466
-170
lines changed

docs/docs/configuration.md

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -828,24 +828,24 @@ included in the actual bundle gzipped tarball.
828828

829829
## Decision Logs
830830

831-
| Field | Type | Required | Description |
832-
|----------------------------------------------------|-----------|-----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
833-
| `decision_logs.service` | `string` | No | Name of the service to use to contact remote server. If no `plugin` is specified, and `console` logging is disabled, this will default to the first `service` name defined in the Services configuration. |
834-
| `decision_logs.partition_name` | `string` | No | Deprecated: Use `resource` instead. Path segment to include in status updates. |
835-
| `decision_logs.resource` | `string` | No (default: `/logs`) | Full path to use for sending decision logs to a remote server. |
836-
| `decision_logs.reporting.buffer_type` | `string` | No (default: `size`) | Toggles the type of buffer to use. The two available options are "size" or "event". Refer to the [Decision Log Plugin README](https://github.com/open-policy-agent/opa/tree/main/v1/plugins/logs/README.md) for for a detailed comparison. |
837-
| `decision_logs.reporting.buffer_size_limit_events` | `int64` | No (default: `10000`) | Decision log buffer size limit by events. OPA will drop old events from the log if this limit is exceeded. By default, 100 events are held. This number has to be greater than zero. Only works with "event" buffer type. |
838-
| `decision_logs.reporting.buffer_size_limit_bytes` | `int64` | No (default: `unlimited`) | Decision log buffer size limit in bytes. OPA will drop old events from the log if this limit is exceeded. By default, no limit is set. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. Only works with "size" buffer type. |
839-
| `decision_logs.reporting.max_decisions_per_second` | `float64` | No | Maximum number of decision log events to buffer per second. OPA will drop events if the rate limit is exceeded. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. |
840-
| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. This limit enforces the maximum size of a gzip compressed payload of events within the message body. |
841-
| `decision_logs.reporting.min_delay_seconds` | `int64` | No (default: `300`) | Minimum amount of time to wait between uploads. |
842-
| `decision_logs.reporting.max_delay_seconds` | `int64` | No (default: `600`) | Maximum amount of time to wait between uploads. |
843-
| `decision_logs.reporting.trigger` | `string` | No (default: `periodic`) | Controls how decision logs are reported to the remote server. Allowed values are `periodic` and `manual` (`manual` triggers are only possible when using OPA as a Go package). |
844-
| `decision_logs.mask_decision` | `string` | No (default: `/system/log/mask`) | Set path of masking decision. |
845-
| `decision_logs.drop_decision` | `string` | No (default: `/system/log/drop`) | Set path of drop decision. |
846-
| `decision_logs.plugin` | `string` | No | Use the named plugin for decision logging. If this field exists, the other configuration fields are not required. |
847-
| `decision_logs.console` | `boolean` | No (default: `false`) | Log the decisions locally to the console. When enabled alongside a remote decision logging API the `service` must be configured, the default `service` selection will be disabled. |
848-
| `decision_logs.request_context.http.headers` | `array` | No | List of HTTP headers to include in the decision log. OPA will include the values for these headers in the decision log if they exist in the incoming HTTP request. |
831+
| Field | Type | Required | Description |
832+
|----------------------------------------------------|-----------|----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
833+
| `decision_logs.service` | `string` | No | Name of the service to use to contact remote server. If no `plugin` is specified, and `console` logging is disabled, this will default to the first `service` name defined in the Services configuration. |
834+
| `decision_logs.partition_name` | `string` | No | Deprecated: Use `resource` instead. Path segment to include in status updates. |
835+
| `decision_logs.resource` | `string` | No (default: `/logs`) | Full path to use for sending decision logs to a remote server. |
836+
| `decision_logs.reporting.buffer_type` | `string` | No (default: `size`) | Toggles the type of buffer to use. The two available options are "size" or "event". Refer to the [Decision Log Plugin README](https://github.com/open-policy-agent/opa/tree/main/v1/plugins/logs/README.md) for for a detailed comparison. |
837+
| `decision_logs.reporting.buffer_size_limit_events` | `int64` | No (default: `10000`) | Decision log buffer size limit by events. OPA will drop old events from the log if this limit is exceeded. By default, 100 events are held. This number has to be greater than zero. Only works with "event" buffer type. |
838+
| `decision_logs.reporting.buffer_size_limit_bytes` | `int64` | No (default: `unlimited`) | Decision log buffer size limit in bytes. OPA will drop old events from the log if this limit is exceeded. By default, no limit is set. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. Only works with "size" buffer type. |
839+
| `decision_logs.reporting.max_decisions_per_second` | `float64` | No | Maximum number of decision log events to buffer per second. OPA will drop events if the rate limit is exceeded. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. |
840+
| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. This limit enforces the maximum size of a gzip compressed payload of events within the message body. |
841+
| `decision_logs.reporting.min_delay_seconds` | `int64` | No (default: `300`) | Minimum amount of time to wait between uploads. |
842+
| `decision_logs.reporting.max_delay_seconds` | `int64` | No (default: `600`) | Maximum amount of time to wait between uploads. |
843+
| `decision_logs.reporting.trigger` | `string` | No (default: `periodic`) | Controls how decision logs are reported to the remote server. Allowed values are `periodic`, `manual` and `immediate` (`manual` triggers are only possible when using OPA as a Go package). |
844+
| `decision_logs.mask_decision` | `string` | No (default: `/system/log/mask`) | Set path of masking decision. |
845+
| `decision_logs.drop_decision` | `string` | No (default: `/system/log/drop`) | Set path of drop decision. |
846+
| `decision_logs.plugin` | `string` | No | Use the named plugin for decision logging. If this field exists, the other configuration fields are not required. |
847+
| `decision_logs.console` | `boolean` | No (default: `false`) | Log the decisions locally to the console. When enabled alongside a remote decision logging API the `service` must be configured, the default `service` selection will be disabled. |
848+
| `decision_logs.request_context.http.headers` | `array` | No | List of HTTP headers to include in the decision log. OPA will include the values for these headers in the decision log if they exist in the incoming HTTP request. |
849849

850850
## Discovery
851851

v1/plugins/bundle/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ func TestParseConfigTriggerMode(t *testing.T) {
471471
conf: `{"b1":{"service": "s1", "trigger": "foo"}}`,
472472
services: []string{"s1"},
473473
wantError: true,
474-
err: errors.New("invalid configuration for bundle \"b1\": invalid trigger mode \"foo\" (want \"periodic\" or \"manual\")"),
474+
err: errors.New("invalid configuration for bundle \"b1\": invalid trigger mode \"foo\" (want \"periodic\", \"manual\" or \"immediate\")"),
475475
triggerMode: nil,
476476
},
477477
}

v1/plugins/logs/README.md

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ the user to decide when to upload, drop or proxy a logged event. Each configurat
77
Events are uploaded in gzip compressed JSON array's at a user defined interval. This can either be triggered periodically
88
or manually through the SDK. The size of the gzip compressed JSON array is limited by `upload_size_limit_bytes`.
99

10+
## Buffer Type
11+
1012
There are two buffer implementations that can be selected by setting `decision_logs.reporting.buffer_type`, defaults to `size`
1113

12-
## Event Buffer
14+
### Event Buffer
1315

1416
* `decision_logs.reporting.buffer_type=event`
1517

@@ -45,7 +47,7 @@ flowchart LR
4547
4648
```
4749

48-
## Size Buffer
50+
### Size Buffer
4951

5052
* `decision_logs.reporting.buffer_type=size`
5153

@@ -85,4 +87,33 @@ flowchart LR
8587
Buffer -. POST .-> service
8688
classDef large font-size:20pt;
8789
88-
```
90+
```
91+
92+
## Triggers
93+
94+
There are three trigger options that can be selected by setting `decision_logs.reporting.trigger`, defaults to
95+
`periodic`.
96+
97+
### Periodic
98+
99+
Uploads are purposely delayed by number of seconds randomly selected between a minimum and maximum. The default delay
100+
range is 300-600 seconds, this can be configured by setting `decision_logs.reporting.min_delay_seconds` and
101+
`decision_logs.reporting.max_delay_seconds`.
102+
103+
It is recommended to use this trigger mode to prevent overloading the service with upload requests.
104+
105+
### Immediate
106+
107+
As soon as enough events are received that fill the buffer the plugin will trigger an upload. When using this
108+
trigger mode the `min_delay_seconds` cannot be set as it can be considered to be 0. The `max_delay_seconds` is still
109+
configurable in case not enough events are received to hit the upload limit. Configure `max_delay_seconds` to be larger
110+
than the average amount of time it takes to fill up the buffer.
111+
112+
It is recommended to use this trigger mode if you want a constant stream of incoming data. Minimizes amount of events
113+
dropped during peak traffic hours.
114+
115+
### Manual
116+
117+
This option can only be used when using OPA as a Go package. The OPA Go package exposes as method
118+
called [Plugin.Trigger](https://pkg.go.dev/github.com/open-policy-agent/[email protected]/v1/plugins/logs#Plugin.Trigger)
119+
that can be called to trigger an upload.

v1/plugins/logs/buffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func newLogBuffer(limit int64) *logBuffer {
2929
}
3030
}
3131

32-
func (lb *logBuffer) Push(bs []byte) (dropped int) {
32+
func (lb *logBuffer) Push(bs []byte) (dropped int, full bool) {
3333
size := int64(len(bs))
3434

3535
if lb.limit > 0 {
@@ -45,7 +45,7 @@ func (lb *logBuffer) Push(bs []byte) (dropped int) {
4545

4646
lb.l.PushBack(elem)
4747
lb.usage += size
48-
return dropped
48+
return dropped, lb.usage >= lb.limit
4949
}
5050

5151
func (lb *logBuffer) Pop() []byte {

v1/plugins/logs/buffer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func TestLogBuffer(t *testing.T) {
1313

1414
buffer := newLogBuffer(int64(20)) // 20 byte limit for test purposes
1515

16-
dropped := buffer.Push(make([]byte, 20))
16+
dropped, _ := buffer.Push(make([]byte, 20))
1717
if dropped != 0 {
1818
t.Fatal("Expected dropped to be zero")
1919
}
@@ -28,17 +28,17 @@ func TestLogBuffer(t *testing.T) {
2828
t.Fatal("Expected buffer to be nil")
2929
}
3030

31-
dropped = buffer.Push(bytes.Repeat([]byte(`1`), 10))
31+
dropped, _ = buffer.Push(bytes.Repeat([]byte(`1`), 10))
3232
if dropped != 0 {
3333
t.Fatal("Expected dropped to be zero")
3434
}
3535

36-
dropped = buffer.Push(bytes.Repeat([]byte(`2`), 10))
36+
dropped, _ = buffer.Push(bytes.Repeat([]byte(`2`), 10))
3737
if dropped != 0 {
3838
t.Fatal("Expected dropped to be zero")
3939
}
4040

41-
dropped = buffer.Push(bytes.Repeat([]byte(`3`), 10))
41+
dropped, _ = buffer.Push(bytes.Repeat([]byte(`3`), 10))
4242
if dropped != 1 {
4343
t.Fatal("Expected dropped to be 1")
4444
}

v1/plugins/logs/encoder.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (enc *chunkEncoder) Encode(event EventV1, eventBytes []byte) ([][]byte, err
135135
}
136136
}
137137

138-
if int64(len(eventBytes)+enc.bytesWritten+1) <= enc.uncompressedLimit {
138+
if int64(len(eventBytes)+enc.bytesWritten+1) < enc.uncompressedLimit {
139139
return nil, enc.appendEvent(eventBytes)
140140
}
141141

@@ -167,16 +167,12 @@ func (enc *chunkEncoder) Encode(event EventV1, eventBytes []byte) ([][]byte, err
167167
}
168168

169169
currentSize := len(result)
170-
if currentSize < int(enc.limit) {
171-
// success! the incoming chunk doesn't have to lose the ND cache and can go into a chunk by itself
172-
// scale up the uncompressed limit using the uncompressed event size as a base
173-
err = enc.appendEvent(eventBytes)
174-
if err != nil {
175-
return nil, err
176-
}
170+
if currentSize <= int(enc.limit) {
177171
enc.uncompressedLimit = int64(len(eventBytes))
178172
enc.scaleUp()
179-
return nil, nil
173+
// success! the incoming chunk doesn't have to lose the ND cache and can go into a chunk by itself
174+
// scale up the uncompressed limit using the uncompressed event size as a base
175+
return [][]byte{result}, nil
180176
}
181177

182178
// The ND cache has to be dropped, record this size as a known maximum event size
@@ -266,7 +262,7 @@ func (enc *chunkEncoder) Encode(event EventV1, eventBytes []byte) ([][]byte, err
266262
}
267263

268264
// 3) Equilibrium: If the chunk size is between 90% and 100% of the user-configured limit, maintain uncompressed limit value.
269-
if int(enc.limit) > len(result) && len(result) >= enc.threshold {
265+
if int(enc.limit) >= len(result) && len(result) >= enc.threshold {
270266
enc.incrMetric(encUncompressedLimitStableCounterName)
271267
enc.incrMetric(encSoftLimitStableCounterName)
272268

v1/plugins/logs/encoder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func TestChunkEncoderAdaptive(t *testing.T) {
348348
expectedMaxEventsInChunk: 1,
349349
expectedScaleUpEvents: 1,
350350
expectedScaleDownEvents: 0,
351-
expectedEquiEvents: 999,
351+
expectedEquiEvents: 998,
352352
},
353353
{
354354
// 61 events can fit, but takes some guessing before it gets to the uncompressed limit 7200

0 commit comments

Comments
 (0)