Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "apify-orchestrator",
"version": "0.6.1",
"version": "0.7.0-rc.2",
"description": "Utilities to orchestrate Apify Actors.",
"engines": {
"node": ">=16.0.0"
Expand Down Expand Up @@ -39,6 +39,7 @@
"@apify/eslint-config": "^1.1.0",
"@apify/tsconfig": "^0.1.1",
"@vitest/coverage-v8": "^4.0.8",
"axios": "^1.13.2",
"eslint": "^9.39.1",
"eslint-config-prettier": "^10.1.8",
"husky": "^9.1.7",
Expand Down
115 changes: 42 additions & 73 deletions src/clients/apify-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { MAIN_LOOP_COOLDOWN_MS, MAIN_LOOP_INTERVAL_MS } from '../constants.js';
import { InsufficientActorJobsError, InsufficientMemoryError } from '../errors.js';
import { isRunOkStatus } from '../tracker.js';
import type { DatasetItem, ExtendedApifyClient, RunRecord } from '../types.js';
import { getUserLimits } from '../utils/apify-api.js';
import { parseStartRunError } from '../utils/apify-client.js';
import type { OrchestratorContext } from '../utils/context.js';
import { Queue } from '../utils/queue.js';
import type { EnqueuedRequest, ExtActorClientOptions, RunResult } from './actor-client.js';
Expand Down Expand Up @@ -80,14 +80,14 @@ export class ExtApifyClient extends ApifyClient implements ExtendedApifyClient {
}
}

if (this.mainLoopId !== undefined) {
if (this.mainLoopId === undefined) {
// Avoid blocking if the orchestrator is not running
for (const callback of runRequest.startCallbacks) {
callback({ run: undefined, error: new Error('Orchestrator is not running') });
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd probably do an early return here, drop the else and have implicit return.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The interface for this function is terrible: the return value is unintelligible.

} else {
this.context.logger.prfxInfo(runRequest.runName, 'Enqueuing Run request');
this.runRequestsQueue.enqueue(runRequest);
} else {
// Avoid blocking if the orchestrator is not running
runRequest.startCallbacks.map((callback) =>
callback({ run: undefined, error: new Error('Orchestrator is not running') }),
);
}

return undefined;
Expand Down Expand Up @@ -178,85 +178,54 @@ export class ExtApifyClient extends ApifyClient implements ExtendedApifyClient {
// Main loop
this.mainLoopId = setInterval(
withMainLoopLock(async () => {
const nextRunRequest = this.runRequestsQueue.peek();
const nextRunRequest = this.runRequestsQueue.dequeue();
if (!nextRunRequest) {
return;
}

const userLimits = await getUserLimits(this.token);

const requiredMemoryMBs =
const getRequiredMemoryMbytes = async () =>
nextRunRequest.options?.memory ??
(await nextRunRequest.defaultMemoryMbytes()) ??
// If no information about memory is available, set the requirement to zero.
0;
const requiredMemoryGBs = requiredMemoryMBs / 1024;
const availableMemoryGBs = userLimits.maxMemoryGBs - userLimits.currentMemoryUsageGBs;
const availableActorJobs = userLimits.maxConcurrentActorJobs - userLimits.activeActorJobCount;

const hasEnoughMemory = availableMemoryGBs - requiredMemoryGBs > 0;
const canRunMoreActors = availableActorJobs > 0;

// Start the next run
if (hasEnoughMemory && canRunMoreActors) {
const runRequest = this.runRequestsQueue.dequeue();
if (runRequest) {
const { runName, input, options } = runRequest;
this.context.logger.prfxInfo(
runName,
'Starting next',
{ queue: this.runRequestsQueue.length, requiredMemoryGBs },
{ availableMemoryGBs },
);
try {
const run = await runRequest.startRun(input, options);
await this.context.runsTracker.updateRun(runName, run);
runRequest.startCallbacks.map((callback) => callback({ run, error: undefined }));
} catch (e) {
this.context.logger.prfxError(runName, 'Failed to start Run', {
message: (e as Error)?.message,
});
runRequest.startCallbacks.map((callback) =>
callback({ run: undefined, error: e as Error }),
);
}
} else {
this.context.logger.error("Something wrong with the Apify orchestrator's queue!");
}
} else if (!this.retryOnInsufficientResources) {
const runRequest = this.runRequestsQueue.dequeue();
if (!runRequest) {
throw new Error('Insufficient resources have been retrieved but no runRequest found!');

const { runName, input, options } = nextRunRequest;

this.context.logger.prfxInfo(runName, 'Starting next', { queue: this.runRequestsQueue.length });

let result: RunResult;

try {
const run = await nextRunRequest.startRun(input, options);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to self: what is startRun doing since we can just fail in case it threw? I'd have thought we had to check for memory/concurrency errors in this case..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the startRun function is being passed to the run request, so you have to check who the hell creates the request to understand what's happening. In the refactor, I would like to pass the whole ActorClient in the request, so whoever is in charge to start the run will call the start method, and you'll know what's going on. 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sorry, I forgot to delete this comment before posting the review - when I have questions I haven't yet quite answered I put them in comments planning to delete them later 😅

result = { run, error: undefined };
} catch (startError) {
this.context.logger.prfxError(runName, 'Failed to start Run', {
message: (startError as Error)?.message,
});
const error = await parseStartRunError(startError, runName, getRequiredMemoryMbytes);
result = { run: undefined, error };
}

const { run, error } = result;

if (run) {
await this.context.runsTracker.updateRun(runName, run);
for (const callback of nextRunRequest.startCallbacks) {
callback({ run, error: undefined });
}
const { runName } = runRequest;

const errorToThrow = (() => {
if (!hasEnoughMemory) {
return new InsufficientMemoryError(runName, requiredMemoryGBs, availableMemoryGBs);
}
if (!canRunMoreActors) {
return new InsufficientActorJobsError(runName);
}
throw new Error(
'Insufficient resources have been retrieved but they did not match any of the checks!',
);
})();

this.context.logger.prfxError(
runName,
'Failed to start Run and retryOnInsufficientResources is set to false',
{
message: errorToThrow.message,
},
);
runRequest.startCallbacks.map((callback) => callback({ run: undefined, error: errorToThrow }));
} else {
// Wait for some time before checking again
} else if (
this.retryOnInsufficientResources &&

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this isn't related to this PR but was there ever a use-case for not retrying? Feels like an option we preemptively added but now might keep with us but perhaps I'm wrong :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default behavior used to be to retry, which became a problem in specific cases when the memory would never be enough and the actor would retry indefinitely until the run's timeout, since we don't have an option to specify the maximum number of retries.
Catching the error is a way to give the caller the power to decide what to do (retry N times, retry for some time, check other constraints...).
In the future, we could also add the max number of retries (maybe setting a default value) as an easy workaround for most cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we needed it in socials team - turns out users don't appreciate when they have a run running for 20 hours without progress 😅

(error instanceof InsufficientMemoryError || error instanceof InsufficientActorJobsError)
) {
this.context.logger.info(
`Not enough resources: waiting ${MAIN_LOOP_COOLDOWN_MS}ms before trying again`,
{ availableMemoryGBs, availableActorJobs },
);
this.runRequestsQueue.enqueue(nextRunRequest);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we put the request back at the bottom of the queue. I'm thinking of a refactor that would make it easier to customize this behavior, but for now this was the simplest implementation.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you put it at the end? The reason that is done in Crawlee was probably the idea that it will improve blocking if there is a delay in the same URL but here it makes more sense to just continue in order

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, and with the refactor I would actually change the behavior. I made it so the retried request is prepended instead of enqueued.

this.mainLoopCooldown = MAIN_LOOP_COOLDOWN_MS;
} else {
for (const callback of nextRunRequest.startCallbacks) {
callback({ run: undefined, error });
}
}
}),
MAIN_LOOP_INTERVAL_MS,
Expand Down
8 changes: 2 additions & 6 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@ export class InsufficientMemoryError extends OrchestratorError {

constructor(
runName: string,
public readonly requiredMemoryGBs: number,
public readonly availableMemoryGBs: number,
public readonly requiredMemoryMBs: number,
) {
super(
`Insufficient memory to start run '${runName}'. Required: ${requiredMemoryGBs}GB, Available: ${availableMemoryGBs}GB`,
runName,
);
super(`Insufficient memory to start run '${runName}'. Required: ${requiredMemoryMBs / 1024}GB.`, runName);
}
}

Expand Down
43 changes: 0 additions & 43 deletions src/utils/apify-api.ts

This file was deleted.

25 changes: 25 additions & 0 deletions src/utils/apify-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { ApifyApiError } from 'apify-client';

import { InsufficientActorJobsError, InsufficientMemoryError } from '../errors.js';

export const MEMORY_LIMIT_EXCEEDED_ERROR_TYPE = 'actor-memory-limit-exceeded';
export const CONCURRENT_RUNS_LIMIT_EXCEEDED_ERROR_TYPE = 'concurrent-runs-limit-exceeded';

/**
 * Maps an error thrown while starting an Actor run to a typed orchestrator error.
 *
 * Apify API errors carrying a known limit-exceeded `type` are converted into the
 * corresponding `InsufficientMemoryError` / `InsufficientActorJobsError`; any other
 * `Error` is passed through unchanged, and non-Error values are wrapped.
 *
 * @param error - The value thrown by the start-run call (narrowed at runtime).
 * @param runName - Name of the run, used to build the resulting error messages.
 * @param getRequiredMemoryMbytes - Lazily resolves the memory requirement; only
 *   awaited when a memory-limit error is actually detected.
 * @returns The normalized `Error` to report to the caller.
 */
export async function parseStartRunError(
    error: unknown,
    runName: string,
    getRequiredMemoryMbytes: () => Promise<number>,
): Promise<Error> {
    if (error instanceof ApifyApiError) {
        switch (error.type) {
            case MEMORY_LIMIT_EXCEEDED_ERROR_TYPE: {
                const requiredMemoryMbytes = await getRequiredMemoryMbytes();
                return new InsufficientMemoryError(runName, requiredMemoryMbytes);
            }
            case CONCURRENT_RUNS_LIMIT_EXCEEDED_ERROR_TYPE:
                return new InsufficientActorJobsError(runName);
            default:
                break;
        }
    }
    return error instanceof Error
        ? error
        : new Error(`Unknown error occurred while starting the run: ${runName}`);
}
27 changes: 0 additions & 27 deletions test/clients/actor-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { ExtRunClient } from 'src/clients/run-client.js';
import { DEFAULT_ORCHESTRATOR_OPTIONS, MAIN_LOOP_COOLDOWN_MS } from 'src/constants.js';
import { RunsTracker } from 'src/tracker.js';
import type { OrchestratorOptions } from 'src/types.js';
import * as apifyApi from 'src/utils/apify-api.js';
import type { OrchestratorContext } from 'src/utils/context.js';
import { CustomLogger } from 'src/utils/logging.js';

Expand All @@ -26,18 +25,6 @@ describe('ExtActorClient', () => {
} as ActorRun;
};

/**
* Mocks the user limits API call.
* Necessary for letting the scheduler to start new runs.
*/
const mockUserLimits = () =>
vi.spyOn(apifyApi, 'getUserLimits').mockImplementation(async () => ({
currentMemoryUsageGBs: 1,
maxMemoryGBs: 8,
activeActorJobCount: 1,
maxConcurrentActorJobs: 8,
}));

beforeEach(async () => {
vi.useFakeTimers();
const logger = new CustomLogger(false, false);
Expand Down Expand Up @@ -84,8 +71,6 @@ describe('ExtActorClient', () => {
const existingRun = getMockRun('existing-run-id', 'FAILED');
await context.runsTracker.updateRun('test-run', existingRun);

mockUserLimits();

const newRun = getMockRun('new-run-id', 'READY');
const startSpy = vi.spyOn(ActorClient.prototype, 'start').mockImplementation(async () => {
return newRun;
Expand All @@ -106,8 +91,6 @@ describe('ExtActorClient', () => {
client.startScheduler();
const actorClient = client.actor('test-actor-id');

mockUserLimits();

const newRun = getMockRun('new-run-id', 'READY');
const startSpy = vi.spyOn(ActorClient.prototype, 'start').mockImplementation(async () => {
return newRun;
Expand All @@ -129,8 +112,6 @@ describe('ExtActorClient', () => {
client.startScheduler();
const actorClient = client.actor('test-actor-id');

mockUserLimits();

const startSpy = vi.spyOn(ActorClient.prototype, 'start').mockImplementation(async () => {
return getMockRun('test-id');
});
Expand Down Expand Up @@ -179,8 +160,6 @@ describe('ExtActorClient', () => {
const existingRun = getMockRun('existing-run-id', 'FAILED');
await context.runsTracker.updateRun('test-run', existingRun);

mockUserLimits();

const newRun = getMockRun('new-run-id', 'READY');
vi.spyOn(ActorClient.prototype, 'start').mockImplementation(async () => {
return newRun;
Expand All @@ -207,8 +186,6 @@ describe('ExtActorClient', () => {
client.startScheduler();
const actorClient = client.actor('test-actor-id');

mockUserLimits();

const newRun = getMockRun('new-run-id', 'READY');
vi.spyOn(ActorClient.prototype, 'start').mockImplementation(async () => {
return newRun;
Expand Down Expand Up @@ -360,8 +337,6 @@ describe('ExtActorClient', () => {
client.startScheduler();
const actorClient = client.actor('test-actor-id');

mockUserLimits();

// Mock ActorClient.start
const mockRun = getMockRun('batch-run-id', 'READY');
const startSpy = vi.spyOn(ActorClient.prototype, 'start').mockImplementation(async () => mockRun);
Expand Down Expand Up @@ -417,8 +392,6 @@ describe('ExtActorClient', () => {
client.startScheduler();
const actorClient = client.actor('test-actor-id');

mockUserLimits();

// Mock ActorClient.start
const mockRun = getMockRun('batch-run-id', 'READY');
const startSpy = vi.spyOn(ActorClient.prototype, 'start').mockImplementation(async () => mockRun);
Expand Down
Loading