Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions migrations/0004_repo_report_canonical_full_names.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
WITH ranked_repo_reports AS (
SELECT
ctid,
row_number() OVER (
PARTITION BY lower(full_name)
ORDER BY
CASE
WHEN status = 'ready' AND report_json IS NOT NULL THEN 3
WHEN status = 'processing' THEN 2
WHEN status = 'queued' THEN 1
ELSE 0
END DESC,
updated_at DESC,
created_at DESC,
full_name ASC
) AS duplicate_rank
FROM repo_reports
)
DELETE FROM repo_reports
WHERE ctid IN (
SELECT ctid
FROM ranked_repo_reports
WHERE duplicate_rank > 1
);

UPDATE repo_reports
SET
full_name = lower(full_name),
owner = lower(owner),
repo = lower(repo),
github_url = 'https://github.com/' || lower(full_name)
WHERE
full_name <> lower(full_name)
OR owner <> lower(owner)
OR repo <> lower(repo)
OR github_url <> 'https://github.com/' || lower(full_name);

CREATE UNIQUE INDEX IF NOT EXISTS repo_reports_full_name_lower_idx
ON repo_reports ((lower(full_name)));
8 changes: 5 additions & 3 deletions src/server/live-status.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Logger } from "../core/logger.ts"
import { REPO_PROGRESS_PREFIX, REPO_PROGRESS_TTL_SECONDS } from "./constants.ts"
import { canonicalizeRepoFullName } from "./repo-key.ts"
import { getRedisClient } from "./queue.ts"

export type RepoLiveStatusPayload = {
Expand All @@ -12,7 +13,7 @@ export type RepoLiveStatusPayload = {
}

function progressKey(fullName: string): string {
return `${REPO_PROGRESS_PREFIX}${fullName}`
return `${REPO_PROGRESS_PREFIX}${canonicalizeRepoFullName(fullName)}`
}

export async function writeRepoLiveStatus(
Expand All @@ -21,16 +22,17 @@ export async function writeRepoLiveStatus(
logger?: Logger,
): Promise<void> {
const redis = await getRedisClient()
const canonicalFullName = canonicalizeRepoFullName(fullName)
const value: RepoLiveStatusPayload = {
...payload,
updatedAt: new Date().toISOString(),
}

await redis.set(progressKey(fullName), JSON.stringify(value), {
await redis.set(progressKey(canonicalFullName), JSON.stringify(value), {
EX: REPO_PROGRESS_TTL_SECONDS,
})
await logger?.debug("repo_live_status:write", {
repository: fullName,
repository: canonicalFullName,
...value,
})
}
71 changes: 63 additions & 8 deletions src/server/queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createClient, type RedisClientType } from "redis"

import { REPO_PROCESSING_QUEUE_KEY, REPO_QUEUE_DEDUPE_PREFIX, REPO_QUEUE_DEDUPE_TTL_SECONDS, REPO_QUEUE_KEY } from "./constants.ts"
import { canonicalizeRepoFullName, repoIdentifierAliases } from "./repo-key.ts"

let client: RedisClientType | null = null

Expand Down Expand Up @@ -31,12 +32,41 @@ export async function getRedisClient(): Promise<RedisClientType> {
}

export function queueDedupeKey(fullName: string): string {
return `${REPO_QUEUE_DEDUPE_PREFIX}${fullName}`
return `${REPO_QUEUE_DEDUPE_PREFIX}${canonicalizeRepoFullName(fullName)}`
}

async function findStoredRepoEntries(redis: RedisClientType, fullName: string): Promise<string[]> {
const canonicalFullName = canonicalizeRepoFullName(fullName)
const [queued, processing] = await Promise.all([
redis.lRange(REPO_QUEUE_KEY, 0, -1),
redis.lRange(REPO_PROCESSING_QUEUE_KEY, 0, -1),
])

return Array.from(
new Set([
...queued.filter((entry) => canonicalizeRepoFullName(entry) === canonicalFullName),
...processing.filter((entry) => canonicalizeRepoFullName(entry) === canonicalFullName),
]),
)
}

async function loadStoredRepoAliases(redis: RedisClientType, fullName: string): Promise<string[]> {
return Array.from(new Set([...repoIdentifierAliases(fullName), ...(await findStoredRepoEntries(redis, fullName))]))
}

export async function enqueueRepoJob(fullName: string): Promise<boolean> {
const redis = await getRedisClient()
const key = queueDedupeKey(fullName)
const canonicalFullName = canonicalizeRepoFullName(fullName)
const existingEntries = await findStoredRepoEntries(redis, canonicalFullName)

if (existingEntries.length > 0) {
await redis.set(queueDedupeKey(canonicalFullName), "1", {
EX: REPO_QUEUE_DEDUPE_TTL_SECONDS,
})
return false
}

const key = queueDedupeKey(canonicalFullName)
const wasQueued = await redis.set(key, "1", {
NX: true,
EX: REPO_QUEUE_DEDUPE_TTL_SECONDS,
Expand All @@ -46,7 +76,7 @@ export async function enqueueRepoJob(fullName: string): Promise<boolean> {
return false
}

await redis.lPush(REPO_QUEUE_KEY, fullName)
await redis.lPush(REPO_QUEUE_KEY, canonicalFullName)
return true
}

Expand All @@ -59,14 +89,30 @@ export async function dequeueRepoJob(timeoutSeconds: number): Promise<string | n

export async function acknowledgeRepoJob(fullName: string): Promise<void> {
const redis = await getRedisClient()
await redis.multi().lRem(REPO_PROCESSING_QUEUE_KEY, 1, fullName).del(queueDedupeKey(fullName)).exec()
const aliases = await loadStoredRepoAliases(redis, fullName)
const tx = redis.multi()

for (const alias of aliases) {
tx.lRem(REPO_PROCESSING_QUEUE_KEY, 1, alias)
tx.del(queueDedupeKey(alias))
}

await tx.exec()
}

export async function requeueProcessingJob(fullName: string): Promise<void> {
const redis = await getRedisClient()
await redis.multi().lRem(REPO_PROCESSING_QUEUE_KEY, 1, fullName).lPush(REPO_QUEUE_KEY, fullName).exec()
}
const canonicalFullName = canonicalizeRepoFullName(fullName)
const aliases = await loadStoredRepoAliases(redis, fullName)
const tx = redis.multi()

for (const alias of aliases) {
tx.lRem(REPO_PROCESSING_QUEUE_KEY, 1, alias)
}

tx.lPush(REPO_QUEUE_KEY, canonicalFullName)
await tx.exec()
}

export async function listQueuedRepoJobs(): Promise<string[]> {
const redis = await getRedisClient()
Expand All @@ -75,10 +121,19 @@ export async function listQueuedRepoJobs(): Promise<string[]> {
redis.lRange(REPO_PROCESSING_QUEUE_KEY, 0, -1),
])

return Array.from(new Set([...queued, ...processing]))
return Array.from(new Set([...queued, ...processing].map((fullName) => canonicalizeRepoFullName(fullName))))
}

export async function dropRepoJob(fullName: string): Promise<void> {
const redis = await getRedisClient()
await redis.multi().lRem(REPO_QUEUE_KEY, 0, fullName).lRem(REPO_PROCESSING_QUEUE_KEY, 0, fullName).del(queueDedupeKey(fullName)).exec()
const aliases = await loadStoredRepoAliases(redis, fullName)
const tx = redis.multi()

for (const alias of aliases) {
tx.lRem(REPO_QUEUE_KEY, 0, alias)
tx.lRem(REPO_PROCESSING_QUEUE_KEY, 0, alias)
tx.del(queueDedupeKey(alias))
}

await tx.exec()
}
39 changes: 39 additions & 0 deletions src/server/repo-key.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
export type CanonicalRepoIdentity = {
owner: string
repo: string
fullName: string
githubUrl: string
}

function normalizeRepoSegment(value: string): string {
return value.trim().toLowerCase()
}

export function canonicalizeRepoIdentity(owner: string, repo: string): CanonicalRepoIdentity {
const canonicalOwner = normalizeRepoSegment(owner)
const canonicalRepo = normalizeRepoSegment(repo)
const fullName = `${canonicalOwner}/${canonicalRepo}`

return {
owner: canonicalOwner,
repo: canonicalRepo,
fullName,
githubUrl: `https://github.com/${fullName}`,
}
}

export function canonicalizeRepoFullName(fullName: string): string {
const [owner = "", repo = ""] = fullName.split("/", 2)

if (!owner || !repo) {
return fullName.trim().toLowerCase()
}

return canonicalizeRepoIdentity(owner, repo).fullName
}

export function repoIdentifierAliases(fullName: string): string[] {
const trimmed = fullName.trim()
const canonical = canonicalizeRepoFullName(trimmed)
return Array.from(new Set([trimmed, canonical].filter((value) => value.length > 0)))
}
42 changes: 27 additions & 15 deletions src/server/reports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AppError } from "../core/errors.ts"
import { serializeJsonSafely } from "../core/json.ts"
import type { FinalReport } from "../core/types.ts"
import { query } from "./database.ts"
import { canonicalizeRepoFullName, canonicalizeRepoIdentity } from "./repo-key.ts"

export type RepoRetryState = "none" | "retrying" | "terminal"

Expand Down Expand Up @@ -38,6 +39,7 @@ function appendFailureHistorySql(state: RepoRetryState, messagePlaceholder: stri
}

export async function getRepoRecord(fullName: string): Promise<RepoRecord | null> {
const canonicalFullName = canonicalizeRepoFullName(fullName)
const rows = await query<RepoRecord>(
`select
full_name,
Expand All @@ -60,16 +62,17 @@ export async function getRepoRecord(fullName: string): Promise<RepoRecord | null
created_at,
updated_at
from repo_reports
where full_name = $1`,
[fullName],
where lower(full_name) = lower($1)
order by case when full_name = $1 then 0 else 1 end, updated_at desc
limit 1`,
[canonicalFullName],
)

return rows[0] ?? null
}

export async function touchQueuedRepo(owner: string, repo: string, queuedNow: boolean): Promise<void> {
const fullName = `${owner}/${repo}`
const githubUrl = `https://github.com/${fullName}`
const canonical = canonicalizeRepoIdentity(owner, repo)

await query(
`insert into repo_reports (
Expand Down Expand Up @@ -103,23 +106,27 @@ export async function touchQueuedRepo(owner: string, repo: string, queuedNow: bo
last_error_message = case when $5 = true then null else repo_reports.last_error_message end,
failure_history = case when $5 = true then '[]'::jsonb else repo_reports.failure_history end,
updated_at = now()`,
[fullName, owner, repo, githubUrl, queuedNow],
[canonical.fullName, canonical.owner, canonical.repo, canonical.githubUrl, queuedNow],
)
}

export async function markRepoProcessing(fullName: string): Promise<void> {
const canonicalFullName = canonicalizeRepoFullName(fullName)

await query(
`update repo_reports
set status = 'processing',
processing_started_at = coalesce(processing_started_at, now()),
error_message = null,
updated_at = now()
where full_name = $1`,
[fullName],
where lower(full_name) = lower($1)`,
[canonicalFullName],
)
}

export async function markRepoRetrying(fullName: string, retryCount: number, nextRetryAt: string, errorMessage: string): Promise<void> {
const canonicalFullName = canonicalizeRepoFullName(fullName)

await query(
`update repo_reports
set status = 'processing',
Expand All @@ -131,12 +138,13 @@ export async function markRepoRetrying(fullName: string, retryCount: number, nex
error_message = $4,
failure_history = ${appendFailureHistorySql("retrying", "$4")},
updated_at = now()
where full_name = $1`,
[fullName, retryCount, nextRetryAt, errorMessage],
where lower(full_name) = lower($1)`,
[canonicalFullName, retryCount, nextRetryAt, errorMessage],
)
}

export async function markRepoReady(report: FinalReport): Promise<void> {
const canonicalFullName = canonicalizeRepoFullName(report.repository.fullName)
const serialized = serializeJsonSafely(report)

if (serialized.sanitizedPaths.length > 0) {
Expand All @@ -156,8 +164,8 @@ export async function markRepoReady(report: FinalReport): Promise<void> {
next_retry_at = null,
last_error_message = null,
updated_at = now()
where full_name = $1`,
[report.repository.fullName, serialized.json],
where lower(full_name) = lower($1)`,
[canonicalFullName, serialized.json],
)
} catch (error) {
if (error instanceof Error && /invalid input syntax for type json/i.test(error.message)) {
Expand All @@ -176,6 +184,8 @@ export async function markRepoReady(report: FinalReport): Promise<void> {
}

export async function markRepoFailedTerminal(fullName: string, retryCount: number, errorMessage: string): Promise<void> {
const canonicalFullName = canonicalizeRepoFullName(fullName)

await query(
`update repo_reports
set status = 'failed',
Expand All @@ -187,12 +197,14 @@ export async function markRepoFailedTerminal(fullName: string, retryCount: numbe
error_message = $3,
failure_history = ${appendFailureHistorySql("terminal", "$3")},
updated_at = now()
where full_name = $1`,
[fullName, retryCount, errorMessage],
where lower(full_name) = lower($1)`,
[canonicalFullName, retryCount, errorMessage],
)
}

export async function markRepoQueued(fullName: string): Promise<void> {
const canonicalFullName = canonicalizeRepoFullName(fullName)

await query(
`update repo_reports
set status = 'queued',
Expand All @@ -206,7 +218,7 @@ export async function markRepoQueued(fullName: string): Promise<void> {
last_error_message = null,
failure_history = '[]'::jsonb,
updated_at = now()
where full_name = $1`,
[fullName],
where lower(full_name) = lower($1)`,
[canonicalFullName],
)
}
Loading