Skip to content

Commit f0e24cd

Browse files
authored
fix: Dont await on table resovler (#283)
~~Might fix~~ Fixes cloudquery/cloudquery#20910
1 parent 3db7f73 commit f0e24cd

File tree

3 files changed

+61
-30
lines changed

3 files changed

+61
-30
lines changed

package-lock.json

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
"modern-errors": "^7.0.0",
102102
"modern-errors-bugs": "^5.0.0",
103103
"p-map": "^7.0.0",
104+
"p-queue": "^8.1.0",
104105
"p-timeout": "^6.1.2",
105106
"path-exists": "^5.0.0",
106107
"path-type": "^6.0.0",

src/scheduler/scheduler.ts

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Duplex } from 'node:stream';
22

33
import pMap from 'p-map';
4+
import pQueue from 'p-queue';
45
import pTimeout from 'p-timeout';
56
import type { Logger } from 'winston';
67

@@ -39,26 +40,22 @@ export enum Strategy {
3940
}
4041

4142
class TableResolverStream extends Duplex {
42-
queue: unknown[] = [];
43-
4443
constructor() {
4544
super({ objectMode: true });
4645
}
4746

48-
_read() {
49-
while (this.queue.length > 0) {
50-
this.push(this.queue.shift());
51-
}
52-
if (this.writableEnded) {
53-
// end readable stream if writable stream has ended
54-
this.push(null);
55-
}
56-
}
47+
_read() {}
5748

5849
_write(chunk: unknown, _: string, next: (error?: Error | null) => void) {
59-
this.queue.push(chunk);
50+
this.emit('data', chunk);
6051
next();
6152
}
53+
54+
end(callback?: () => void): this {
55+
this.emit('end');
56+
callback?.();
57+
return this;
58+
}
6259
}
6360

6461
const validateResource = (resource: Resource) => {
@@ -92,20 +89,8 @@ const resolveTable = async (
9289
) => {
9390
logger.info(`resolving table ${table.name}`);
9491
const stream = new TableResolverStream();
95-
try {
96-
await table.resolver(client, parent, stream);
97-
} catch (error) {
98-
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
99-
cause: error,
100-
props: { table, client },
101-
});
102-
logger.error(`error resolving table ${table.name}`, tableError);
103-
return;
104-
} finally {
105-
stream.end();
106-
}
10792

108-
for await (const data of stream) {
93+
const processData = async (data: unknown) => {
10994
logger.debug(`resolving resource for table ${table.name}`);
11095
const resolveResourceTimeout = 10 * 60 * 1000;
11196
const resource = new Resource(table, parent, data);
@@ -118,7 +103,7 @@ const resolveTable = async (
118103
props: { resource, table, client },
119104
});
120105
logger.error(preResolverError);
121-
continue;
106+
return;
122107
}
123108

124109
try {
@@ -128,7 +113,7 @@ const resolveTable = async (
128113
await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout });
129114
} catch (error) {
130115
logger.error(`error resolving columns for table ${table.name}`, error);
131-
continue;
116+
return;
132117
}
133118

134119
try {
@@ -139,7 +124,7 @@ const resolveTable = async (
139124
props: { resource, table, client },
140125
});
141126
logger.error(postResolveError);
142-
continue;
127+
return;
143128
}
144129

145130
setCQId(resource, deterministicCQId);
@@ -148,7 +133,7 @@ const resolveTable = async (
148133
validateResource(resource);
149134
} catch (error) {
150135
logger.error(error);
151-
continue;
136+
return;
152137
}
153138

154139
try {
@@ -161,14 +146,36 @@ const resolveTable = async (
161146
},
162147
});
163148
logger.error(encodeError);
164-
continue;
149+
return;
165150
}
166151

167152
logger.debug(`done resolving resource for table ${table.name}`);
168153

169154
await pMap(table.relations, (child) =>
170155
resolveTable(logger, client, child, resource, syncStream, deterministicCQId),
171156
);
157+
};
158+
159+
const queue = new pQueue({ concurrency: 5 });
160+
161+
stream.on('data', async (data) => {
162+
await queue.add(() => processData(data));
163+
});
164+
165+
const resolverPromise = table.resolver(client, parent, stream);
166+
167+
try {
168+
await resolverPromise;
169+
} catch (error) {
170+
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
171+
cause: error,
172+
props: { table, client },
173+
});
174+
logger.error(`error resolving table ${table.name}`, tableError);
175+
return;
176+
} finally {
177+
stream.end();
178+
await queue.onIdle();
172179
}
173180

174181
logger.info(`done resolving table ${table.name}`);

0 commit comments

Comments
 (0)