Skip to content

Commit d3d3812

Browse files
committed
fix(ingest) write fs error to events log
fix(operator) forbid fetch local from local IPs fix(functions-server) get fetch timeout from rotor
1 parent 5d36b26 commit d3d3812

File tree

5 files changed

+33
-17
lines changed

5 files changed

+33
-17
lines changed

bulker/ingest/router.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func NewRouter(appContext *Context, partitionSelector kafkabase.PartitionSelecto
117117
})
118118

119119
httpClient := &http.Client{
120-
Timeout: time.Duration(appContext.config.DeviceFunctionsTimeoutMs) * time.Millisecond,
120+
Timeout: time.Duration(int64(float64(appContext.config.DeviceFunctionsTimeoutMs)*1.1)) * time.Millisecond,
121121
}
122122

123123
var dataHosts []string
@@ -494,7 +494,7 @@ func (r *Router) processSyncDestination(message *IngestMessage, stream *StreamWi
494494
// Use functions server (new format with execLog)
495495
fsURL := getFunctionsServerURL(r.config.FunctionsServerURLTemplate, stream.Stream.WorkspaceId, classes)
496496
endpointURL := fsURL + "/multi"
497-
r.callFunctionsEndpoint(functionDestinations, endpointURL, messageBytes, functionsResults)
497+
r.callFunctionsEndpoint(stream, functionDestinations, endpointURL, messageBytes, functionsResults)
498498
} else {
499499
// Use rotor (legacy format)
500500
endpointURL := r.config.RotorURL + "/func/multi"
@@ -606,19 +606,23 @@ type ConnectionChainResult struct {
606606

607607
// callFunctionsEndpoint sends a request to functions endpoint and expects new format with execLog
608608
// Response format: map[connectionId]{ events: [], execLog: [] }
609-
func (r *Router) callFunctionsEndpoint(destinations []*ShortDestinationConfig, baseURL string, messageBytes []byte, functionsResults map[string]any) {
609+
func (r *Router) callFunctionsEndpoint(stream *StreamWithDestinations, destinations []*ShortDestinationConfig, baseURL string, messageBytes []byte, functionsResults map[string]any) {
610610
if len(destinations) == 0 {
611611
return
612612
}
613613

614614
ids := utils.ArrayMap(destinations, func(d *ShortDestinationConfig) string { return d.ConnectionId })
615615
var err error
616616
defer func() {
617-
for _, id := range ids {
618-
if err != nil {
617+
if err != nil {
618+
obj := map[string]any{"body": string(messageBytes), "error": "Functions server ", "status": "FS_ERROR"}
619+
r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncoming, Level: eventslog.LevelError, ActorId: stream.Stream.Id, Event: obj})
620+
for _, id := range ids {
619621
DeviceFunctions(id, "error").Inc()
620622
DeviceFunctions("total", "error").Inc()
621-
} else {
623+
}
624+
} else {
625+
for _, id := range ids {
622626
DeviceFunctions(id, "success").Inc()
623627
DeviceFunctions("total", "success").Inc()
624628
}
@@ -641,19 +645,22 @@ func (r *Router) callFunctionsEndpoint(destinations []*ShortDestinationConfig, b
641645
res, err := r.httpClient.Do(req)
642646
if err != nil {
643647
r.Errorf("failed to send functions request for connections: %s: %v", ids, err)
648+
err = fmt.Errorf("functions request error: %v", err)
644649
return
645650
}
646651
defer res.Body.Close()
647652

648653
body, err := io.ReadAll(res.Body)
649654
if res.StatusCode != 200 || err != nil {
650655
r.Errorf("Failed to send functions request for connections: %s: status: %v body: %s", ids, res.StatusCode, string(body))
656+
err = fmt.Errorf("functions response error: status %d err: %v", res.StatusCode, err)
651657
return
652658
}
653659
var result map[string]ConnectionChainResult
654660
err = jsoniter.Unmarshal(body, &result)
655661
if err != nil {
656662
r.Errorf("Failed to unmarshal functions response for connections: %s: %v", ids, err)
663+
err = fmt.Errorf("functions response error: %v", err)
657664
return
658665
}
659666

bulker/operator/operator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,6 +1226,10 @@ func (o *Operator) buildDeploymentFromData(data *DeploymentData) *appsv1.Deploym
12261226
Name: "FUNCTIONS_CLASS",
12271227
Value: data.FunctionsClass,
12281228
},
1229+
{
1230+
Name: "FETCH_FORBID_LOCAL",
1231+
Value: "true",
1232+
},
12291233
}
12301234

12311235
// Build containers list

services/rotor/src/functions-server.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ async function main() {
988988
const eventContext = createEventContextFromMessage(message, connection, 0);
989989
const functionsFetchTimeout = req.headers["x-request-timeout-ms"]
990990
? parseNumber(req.headers["x-request-timeout-ms"] as string, 2000)
991-
: 2000;
991+
: parseNumber(env.FETCH_TIMEOUT_MS, 2000);
992992
try {
993993
const result = await runChain(chain, event, eventContext, functionsFetchTimeout);
994994

@@ -1071,7 +1071,11 @@ async function main() {
10711071
...customContext,
10721072
} as EventContext & { retries?: number };
10731073

1074-
const result = await runChain(chain, event, eventContext, parseNumber(env.FETCH_TIMEOUT_MS, 2000));
1074+
const functionsFetchTimeout = req.headers["x-request-timeout-ms"]
1075+
? parseNumber(req.headers["x-request-timeout-ms"] as string, 2000)
1076+
: parseNumber(env.FETCH_TIMEOUT_MS, 2000);
1077+
1078+
const result = await runChain(chain, event, eventContext, functionsFetchTimeout);
10751079

10761080
const totalMs = result.execLog.reduce((sum, e) => sum + (e.ms || 0), 0);
10771081
log.atInfo().log(`← ${connectionId} (${chain.functions.length} functions) completed in ${totalMs}ms`);

services/rotor/src/lib/functions-chain.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,8 @@ export function buildFunctionChain(
300300
functionsClass,
301301
chainCtx,
302302
funcCtx,
303-
eventsLogger
303+
eventsLogger,
304+
fetchTimeoutMs
304305
);
305306
return async (event: AnyEvent, ctx: EventContext) => {
306307
return wrapper(event, ctx);

services/rotor/src/lib/functions-server-client.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,19 +130,19 @@ export async function callFunctionsServer(
130130
eventContext: EventContext,
131131
chainCtx: FunctionChainContext,
132132
funcCtx: FunctionContext,
133-
eventsLogger: EventsStore
133+
eventsLogger: EventsStore,
134+
fetchTimeoutMs?: number
134135
): Promise<FunctionsServerResult> {
135136
const serverEnv = getServerEnv();
136137
const url = getFunctionsServerUrl(workspaceId, connectionId, functionsClass);
137138
const timeoutMs = parseInt(serverEnv.FUNCTIONS_SERVER_TIMEOUT_MS);
138139

139-
const controller = new AbortController();
140-
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
141140
try {
142141
const response = await fetch(url, {
143142
method: "POST",
144143
headers: {
145144
"Content-Type": "application/json",
145+
...(fetchTimeoutMs ? { "x-request-timeout-ms": String(fetchTimeoutMs) } : {}),
146146
},
147147
body: JSON.stringify({
148148
event,
@@ -156,7 +156,7 @@ export async function callFunctionsServer(
156156
retries: eventContext.retries ?? 0,
157157
},
158158
}),
159-
signal: controller.signal,
159+
signal: AbortSignal.timeout(timeoutMs),
160160
});
161161

162162
if (!response.ok) {
@@ -202,8 +202,6 @@ export async function callFunctionsServer(
202202
}
203203
chainCtx.metrics?.status("functions_server", "error", "other").inc(1);
204204
throw new RetryError(`Functions processing failed: ${e.message}`);
205-
} finally {
206-
clearTimeout(timeoutId);
207205
}
208206
}
209207

@@ -217,7 +215,8 @@ export function createFunctionsServerWrapper(
217215
functionsClass: Omit<FunctionsClass, "legacy">,
218216
chainCtx: FunctionChainContext,
219217
funcCtx: FunctionContext,
220-
eventsLogger: EventsStore
218+
eventsLogger: EventsStore,
219+
fetchTimeoutMs?: number
221220
): (event: AnyEvent, ctx: EventContext) => Promise<AnyEvent | AnyEvent[] | "drop" | undefined> {
222221
return async (event: AnyEvent, ctx: EventContext) => {
223222
try {
@@ -229,7 +228,8 @@ export function createFunctionsServerWrapper(
229228
ctx,
230229
chainCtx,
231230
funcCtx,
232-
eventsLogger
231+
eventsLogger,
232+
fetchTimeoutMs
233233
);
234234

235235
// Check for errors in execLog - similar to checkError in udf-wrapper-code.txtjs

0 commit comments

Comments
 (0)