Replies: 10 comments
-
Thanks for this submission. I think this is a good idea - a final, last failure is a special event indeed. The idea to make the options object available in handler via context object is another good one. For immediate solution, is it possible for you to just share the scope between options object and your handler? const RETRY_LIMIT = 10;
function handler(input, { rawMessage }) {
try {
// stuff
} catch (e) {
if (rawMessage.read_ct > RETRY_LIMIT) {
// do some additional stuff
}
throw e;
}
}
EdgeWorker.start(handler, {
retry: {
strategy: 'fixed',
limit: RETRY_LIMIT,
baseDelay: 10
}
}); With this new handler, it would look kinda like this, right: EdgeWorker.start(handler, {
retry: {
strategy: 'fixed',
limit: RETRY_LIMIT,
baseDelay: 10
},
async onLastFailure(input, context, error) {
// handle final failure here
}
}); I'm wondering how we would handle the failures in the onLastFailure handler tho - probably just logging the error appropriately and doing nothing, right? |
Beta Was this translation helpful? Give feedback.
-
edit: moved |
Beta Was this translation helpful? Give feedback.
-
Both of your naming options are much better than my initial name. I'm leaning slightly towards That brings me to another thing I implemented yesterday. I implemented a permanent failure by using |
Beta Was this translation helpful? Give feedback.
-
@dmndtrl you are right about the permanent failure, its confusing! onFinalFailure sounds good enough imo, so lets stick to that. I'm working currently on the worker config in the context, should be released in 1-2h. How are you using this permanent failure - its like a failure on demand, in case you still have retries but discover that you do not want to continue and want to fail without exhausting retries? |
Beta Was this translation helpful? Give feedback.
-
btw, we have a discord, if you are into that just follow https://pgflow.dev/discord/ |
Beta Was this translation helpful? Give feedback.
-
@jumski Exactly, I need to read a domain out of a database to continue trying to validate some TXT records. So the first step is to load the domain (that is also input to the queue) from the database because it could be that a user removed the custom domain while the Here's the code for reference (uses neverthrow). And most errors are const handler: MessageHandlerFn<VerifyDomainContent> = async (input, ctx) => {
let foundDomain: OkType<typeof adminFindDomainInternalByName>[number]
await okAsync()
// check if job domain is even relevant
.andThen(_i => adminFindDomainInternalByName(drizzle, input.domain)
.mapErr(e => ({ "type": "database_error", retryable: true, error: e } as VerifyDomainError))
)
.andTee(([domain]) => foundDomain = domain)
.andThen(d => d.length > 0
? okAsync(d[0])
: errAsync({ type: "domain_missing" } as VerifyDomainError))
.andThen(i => checkDnsRecords(i.domain, i.dnsVerificationTxtName)
.mapErr(e => ({ "type": "dns_fetch_error", retryable: true, error: e } as VerifyDomainError))
)
// check if TXT records match, if not retry
.andThen(records => okAsync(records
.filter(d => d.type === "TXT" && d.name === foundDomain.dnsVerificationTxtName && d.content === foundDomain.dnsVerificationTxtContent))
.andThen(records => records.length > 0 ? okAsync(records[0]) : err({ type: "records_missing", retryable: true } as VerifyDomainError))
)
// mark domain status verified or retry if failed
.andThen(i => adminUpdateDomainInternalStatus(drizzle, foundDomain.domain, "verified")
.mapErr(e => ({ "type": "database_error", retryable: true, error: e } as VerifyDomainError))
)
.match(fromOk, async (err) => await customFromErr(err, ctx))
return
} |
Beta Was this translation helpful? Give feedback.
-
@smndtrl your Two ideas that are worse and one that I think is more explicit and future proof:
Alternative: Flow with Explicit OutcomesIf you want more explicit tracking, you could leverage the fact that return value of handlers are persisted for flow steps and model domain deletion as a valid Flow outcome rather than something exceptional: new Flow<{ domain: string }>({ slug: 'verifyDomain' }).step(
{ slug: 'verify' },
async (input) => {
const domain = await adminFindDomainInternalByName(
drizzle,
input.run.domain
);
if (!domain.length) {
return { status: 'domain_deleted', domain: input.run.domain };
}
// Continue with DNS verification...
// Return { status: "verified" } on success, throw on retryable errors
}
); Note minimal flow contains just a single step, its perfectly fine! This way you can model multiple different outcomes explicitly (domain_deleted, verified, etc.) and query them directly using JSON operators: -- Find all domain deletions during verification
SELECT run_id, flow_slug, output->'verify'->>'domain' as domain
FROM pgflow.runs
WHERE flow_slug = 'verifyDomain'
AND status = 'completed'
AND output @> '{"verify": {"status": "domain_deleted"}}';
-- Find all successful verifications
SELECT run_id, flow_slug, output->'verify'->>'domain' as domain
FROM pgflow.runs
WHERE flow_slug = 'verifyDomain'
AND status = 'completed'
AND output @> '{"verify": {"status": "verified"}}'; TIL about neverthrow - I love functional approach, this is great! |
Beta Was this translation helpful? Give feedback.
-
I think the reason I decided against using flow was that I couldn't represent the polling/retry for 24h*60min times every 60s in the current released version. And because I wanted to get going :) Might revisit the decision later but currently just trying the simple and quick things. |
Beta Was this translation helpful? Give feedback.
-
I'm not sure i get the "polling/retry for 24h*60min every 60s" - would love to understand what you want to accomplish without focusing on particular solution. Thanks for the feedback - there is only exponential backoff retry for flows for now, but it may change! edit: the workerConfig is building but took way longer than anticipated due to type shenanigans i encountered :) should land soon! |
Beta Was this translation helpful? Give feedback.
-
hey @smndtrl i just released 0.6.1 which contains:
Check #203 and check the docs for the workerConfig https://www.pgflow.dev/concepts/context/#workerconfig Would love feedback when you happen to use it in your system. For the final failure handler i need to think about it to nail down edge cases etc, but it is something that pgflow should have. Cheers! |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
Hi,
I came across a use case today where I need to trigger a final action when the last retry fails. E.g. calling a webhook or marking a field in the db as "failed".
My immediate instinct was to keep changes simple and add
QueueWorkerConfig
toMessageExecution
so I can compare rawMessage.read_ct to retry.limit and do it fully inside the handler using the context.What do you think?
Beta Was this translation helpful? Give feedback.
All reactions