-
Notifications
You must be signed in to change notification settings - Fork 55
feat: Implement queue-based async DB export pipeline #188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b2e5ee7
b77cdd9
dc574fc
337809f
513a445
fff9e5d
1e0c4f0
18d56d9
8720a97
8365bc2
f239af0
7bbd7e7
e6bbf40
a11ebe3
9af3d7a
5fcc31e
8d27faf
e02ac5f
349d3c8
6308ad3
a9fc502
bc58f42
8652211
6c1d483
762dd1d
5503a95
87dc102
30e0182
91c0983
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| FROM node:22-alpine | ||
|
|
||
| WORKDIR /app | ||
|
|
||
| # Copy root package files | ||
| COPY package.json package-lock.json ./ | ||
|
|
||
| # Copy workspace package.json files | ||
| COPY packages/common/package.json ./packages/common/ | ||
| COPY apps/consumer/package.json ./apps/consumer/ | ||
|
|
||
| RUN npm ci | ||
|
|
||
| # Copy actual source code | ||
| COPY packages/common ./packages/common | ||
| COPY apps/consumer ./apps/consumer | ||
|
|
||
| WORKDIR /app/apps/consumer | ||
|
|
||
| CMD ["npm", "run", "start"] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| { | ||
| "name": "consumer", | ||
| "version": "1.0.0", | ||
| "description": "", | ||
| "main": "src/app.js", | ||
| "scripts": { | ||
| "dev": "node src/index.js", | ||
| "start": "node src/index.js", | ||
| "test": "jest --testPathPatterns=src/" | ||
| }, | ||
| "keywords": [], | ||
| "author": "", | ||
| "license": "ISC", | ||
| "type": "commonjs" | ||
| } | ||
|
Comment on lines
+1
to
+15
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. π§© Analysis chainπ Script executed: #!/bin/bash
set -euo pipefail
# Show consumer manifest
echo "---- apps/consumer/package.json ----"
cat -n apps/consumer/package.json
# Identify import/require usage in consumer src
echo "---- imports in apps/consumer/src ----"
rg -n --no-heading "(require\\(|from\\s+['\"]|import\\s+['\"])\\s*('dotenv'|\"dotenv\"|dotenv)|\\bbullmq\\b|`@urbackend/common`" apps/consumer/src || true
# Show consumer src key files referenced
echo "---- list apps/consumer/src ----"
find apps/consumer/src -maxdepth 2 -type f -print
# Inspect monorepo root package.json for workspace/hoisting rules
echo "---- root package.json ----"
ls -1
if [ -f package.json ]; then cat -n package.json; fi
# If a package manager config exists (pnpm/yarn/npm), show workspace relevant sections
for f in pnpm-workspace.yaml yarn.lock package-lock.json pnpm-lock.yaml .npmrc .yarnrc.yml; do
if [ -e "$f" ]; then
echo "---- $f ----"
head -n 80 "$f" || true
fi
doneRepository: geturbackend/urBackend Length of output: 4313 π Script executed: #!/bin/bash
set -euo pipefail
# Show consumer manifest
echo "---- apps/consumer/package.json ----"
cat -n apps/consumer/package.json
# Identify import/require usage in consumer src
echo "---- imports in apps/consumer/src ----"
rg -n --no-heading "(require\\(|from\\s+['\"]|import\\s+['\"])\\s*('dotenv'|\"dotenv\"|dotenv)|\\bbullmq\\b|`@urbackend/common`" apps/consumer/src || true
# Show consumer src key files referenced
echo "---- list apps/consumer/src ----"
find apps/consumer/src -maxdepth 2 -type f -print
# Inspect monorepo root package.json for workspace/hoisting rules
echo "---- root package.json ----"
ls -1
if [ -f package.json ]; then cat -n package.json; fi
# If a package manager config exists (pnpm/yarn/npm), show workspace relevant sections
for f in pnpm-workspace.yaml yarn.lock package-lock.json pnpm-lock.yaml .npmrc .yarnrc.yml; do
if [ -e "$f" ]; then
echo "---- $f ----"
head -n 80 "$f" || true
fi
doneRepository: geturbackend/urBackend Length of output: 4313 Declare
π€ Prompt for AI Agents |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| const dotenv = require('dotenv'); | ||
| dotenv.config({ path: require('path').join(__dirname, '../../../.env') }); | ||
|
|
||
| const { validateEnv } = require('@urbackend/common'); | ||
|
|
||
| if (process.env.NODE_ENV !== 'test') { | ||
| validateEnv(); | ||
| } | ||
|
|
||
| const { initExportWorker } = require('./workers/export.worker'); | ||
|
|
||
| const { connectDB } = require('@urbackend/common'); | ||
|
|
||
| (async () => { | ||
| try { | ||
| await connectDB(); | ||
|
|
||
| const worker = initExportWorker(); | ||
|
|
||
| console.log('[CONSUMER] Export worker started and listening for jobs...'); | ||
|
|
||
| const shutdown = async () => { | ||
| console.log('Shutting down worker...'); | ||
| await worker.close(); | ||
| process.exit(0); | ||
| }; | ||
|
|
||
| process.on('SIGINT', shutdown); | ||
| process.on('SIGTERM', shutdown); | ||
|
|
||
| } catch (err) { | ||
| console.error('Failed to start worker:', err); | ||
| process.exit(1); | ||
| } | ||
| })(); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,161 @@ | ||
| const { Worker } = require('bullmq'); | ||
| const { PassThrough } = require('stream'); | ||
| const fs = require('fs'); | ||
| const os = require('os'); | ||
| const path = require('path'); | ||
| const { GetObjectCommand } = require('@aws-sdk/client-s3'); | ||
| const { getSignedUrl } = require('@aws-sdk/s3-request-presigner'); | ||
|
|
||
| const { | ||
| redis, | ||
| exportQueue, | ||
| emailQueue, | ||
| Project, | ||
| getConnection, | ||
| getCompiledModel, | ||
| getS3CompatibleStorage, | ||
| getStorage, | ||
| decrypt, | ||
| getBucket | ||
| } = require('@urbackend/common'); | ||
|
|
||
| const initExportWorker = () => { | ||
| const worker = new Worker(exportQueue.name, async (job) => { | ||
| const { projectId, collectionName, userId, email } = job.data; | ||
| console.log(`[ExportWorker] Starting export for collection ${collectionName} in project ${projectId} requested by ${email}`); | ||
|
|
||
| const project = await Project.findById(projectId); | ||
| if (!project) throw new Error('Project not found'); | ||
|
|
||
| const col = project.collections.find(c => c.name === collectionName); | ||
| if (!col) throw new Error(`Collection ${collectionName} not found`); | ||
|
|
||
| const connection = await getConnection(projectId); | ||
| const bucket = getBucket(project); | ||
| const storagePath = `${projectId}/exports/${collectionName}_export_${Date.now()}.json`; | ||
|
|
||
| let provider = 'supabase'; | ||
| if (project.resources?.storage?.isExternal) { | ||
| try { | ||
| const decrypted = decrypt(project.resources.storage.config); | ||
| const config = JSON.parse(decrypted); | ||
| provider = config.storageProvider || 'supabase'; | ||
| } catch (err) { | ||
| console.error("[ExportWorker] Error decrypting storage config:", err); | ||
| } | ||
| } | ||
|
|
||
| const client = await getStorage(project); | ||
|
|
||
| console.log(`[ExportWorker] Preparing upload to storage (Provider: ${provider})...`); | ||
|
|
||
| if (provider === 'supabase') { | ||
| const tempFilePath = path.join(os.tmpdir(), `export_${projectId}_${collectionName}_${Date.now()}.json`); | ||
| const writeStream = fs.createWriteStream(tempFilePath); | ||
|
|
||
| try { | ||
| writeStream.write('{\n'); | ||
| const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); | ||
|
|
||
| writeStream.write(` "${col.name}": [\n`); | ||
|
|
||
| const cursor = Model.find().lean().cursor(); | ||
| let first = true; | ||
|
|
||
| for await (const doc of cursor) { | ||
| if (!first) writeStream.write(',\n'); | ||
| writeStream.write(` ${JSON.stringify(doc)}`); | ||
| first = false; | ||
| } | ||
|
|
||
| writeStream.write('\n ]\n'); | ||
| writeStream.write('}\n'); | ||
| writeStream.end(); | ||
|
|
||
| await new Promise((resolve, reject) => { | ||
| writeStream.on('finish', resolve); | ||
| writeStream.on('error', reject); | ||
| }); | ||
|
|
||
| console.log(`[ExportWorker] Temp file created, uploading...`); | ||
| const fileBuffer = fs.readFileSync(tempFilePath); | ||
|
|
||
| const { error } = await client.storage.from(bucket).upload(storagePath, fileBuffer, { | ||
| contentType: 'application/json' | ||
| }); | ||
|
|
||
| if (error) throw error; | ||
| } finally { | ||
| if (fs.existsSync(tempFilePath)) { | ||
| fs.unlinkSync(tempFilePath); | ||
| } | ||
| } | ||
|
|
||
| } else if (provider === 's3' || provider === 'cloudflare_r2') { | ||
| const passThrough = new PassThrough(); | ||
|
|
||
| // Start the upload promise in parallel using the getStorage client | ||
| const uploadPromise = client.storage.from(bucket).upload(storagePath, passThrough, { | ||
| contentType: 'application/json' | ||
| }); | ||
|
|
||
| try { | ||
| passThrough.write('{\n'); | ||
|
|
||
| const Model = getCompiledModel(connection, col, projectId, project.resources.db.isExternal); | ||
|
|
||
| passThrough.write(` "${col.name}": [\n`); | ||
|
|
||
| const cursor = Model.find().lean().cursor(); | ||
| let first = true; | ||
|
|
||
| for await (const doc of cursor) { | ||
| if (!first) passThrough.write(',\n'); | ||
| passThrough.write(` ${JSON.stringify(doc)}`); | ||
| first = false; | ||
| } | ||
|
|
||
| passThrough.write('\n ]\n'); | ||
|
|
||
| passThrough.write('}\n'); | ||
| passThrough.end(); | ||
|
|
||
| console.log(`[ExportWorker] Database stream ended. Awaiting final storage upload...`); | ||
| const { error } = await uploadPromise; | ||
| if (error) throw error; | ||
| } catch (error) { | ||
| passThrough.destroy(error); | ||
| throw error; | ||
| } | ||
| } else { | ||
| throw new Error(`Unknown storage provider: ${provider}`); | ||
| } | ||
|
|
||
| let downloadUrl; | ||
| if (provider === 'supabase') { | ||
| const { data, error } = await client.storage.from(bucket).createSignedUrl(storagePath, 86400); | ||
| if (error) throw error; | ||
| downloadUrl = data?.signedUrl; | ||
| } else { | ||
| const { s3Client } = await getS3CompatibleStorage(project); | ||
| const command = new GetObjectCommand({ Bucket: bucket, Key: storagePath }); | ||
| downloadUrl = await getSignedUrl(s3Client, command, { expiresIn: 86400 }); | ||
| } | ||
|
|
||
| // queue the email to be sent to the user | ||
| await emailQueue.add('send-export-email', { email, downloadUrl, projectName: project.name }); | ||
| console.log(`[ExportWorker] Export completed! Email queued for ${email}`); | ||
| }, { connection: redis, concurrency: 2 }); | ||
|
|
||
| worker.on('completed', (job) => { | ||
| console.log(`[ExportWorker] Job ${job.id} for project ${job.data.projectId} completed.`); | ||
| }); | ||
|
|
||
| worker.on('failed', (job, err) => { | ||
| console.error(`[ExportWorker] Job ${job?.id} for project ${job?.data?.projectId} failed:`, err.message); | ||
| }); | ||
|
|
||
| return worker; | ||
| }; | ||
|
|
||
| module.exports = { initExportWorker }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| const { AppError } = require('@urbackend/common'); | ||
| const { Developer } = require('@urbackend/common'); | ||
| const { Project } = require('@urbackend/common'); | ||
| const { exportQueue } = require('@urbackend/common'); | ||
| const { redis } = require('@urbackend/common'); | ||
| const { getProjectById, setProjectById } = require('@urbackend/common'); | ||
|
|
||
| module.exports.dbExportHandler = async (req, res, next) => { | ||
| try { | ||
| const { projectId, collectionName } = req.params; | ||
| const { _id: userId } = req.user; | ||
|
|
||
| let project = await getProjectById(projectId); | ||
| if (!project) { | ||
| project = await Project.findById(projectId).lean(); | ||
| if (!project) { | ||
| return next(new AppError(404, "Project not found.")); | ||
| } | ||
| await setProjectById(projectId, project); | ||
| } | ||
|
|
||
| if (project.owner.toString() !== userId.toString()) { | ||
| return next(new AppError(403, "Access denied. You are not the owner of this project.")); | ||
| } | ||
|
|
||
| if (!project.collections.some(c => c.name === collectionName)) { | ||
| return next(new AppError(404, "Collection not found in project.")); | ||
| } | ||
|
|
||
|
|
||
| const developer = await Developer.findById(userId).select('email plan').lean(); | ||
| if (!developer) { | ||
| return next(new AppError(404, "Authenticated developer not found.")); | ||
| } | ||
| const { email, plan = 'free' } = developer; | ||
|
|
||
| console.log(`[Dashboard API] Received export request for collection ${collectionName} in project ${projectId} from user ${userId} (${email})`); | ||
|
|
||
|
|
||
| const maxExports = plan === 'pro' ? 5 : 1; | ||
| const today = new Date().toISOString().split('T')[0]; | ||
| const key = `project:${projectId}:export_limit:${today}`; | ||
|
|
||
| const currentCount = await redis.get(key); | ||
| if (currentCount && Number(currentCount) >= maxExports) { | ||
| return next(new AppError(429, `Daily export limit reached (${maxExports}/${maxExports}). Please try again tomorrow.`)); | ||
| } | ||
|
|
||
| const newCount = await redis.incr(key); | ||
| if (newCount === 1) { | ||
| await redis.expire(key, 86400); // Set expiry to 24 hours | ||
| } | ||
|
yash-pouranik marked this conversation as resolved.
|
||
|
|
||
| await exportQueue.add('export-database', { projectId, collectionName, userId, email }); | ||
|
|
||
| return res.status(202).json({ | ||
| success: true, | ||
| data: {}, | ||
| message: `Collection export request received. You will receive an email with a download link shortly. Usage today: ${newCount}/${maxExports}.`, | ||
| }); | ||
|
yash-pouranik marked this conversation as resolved.
yash-pouranik marked this conversation as resolved.
yash-pouranik marked this conversation as resolved.
yash-pouranik marked this conversation as resolved.
|
||
|
|
||
| } catch (err) { | ||
| console.error("[Dashboard API] Error handling export request for project - ", req.params.projectId, ": ", err); | ||
| return next(new AppError(500, err.message || "Failed to initiate database export.")); | ||
| } | ||
| }; | ||
Uh oh!
There was an error while loading. Please reload this page.