Skip to content

Commit af7f745

Browse files
timursevimlitshemsedinov
authored andcommitted
Add concurrent queue example on readable streams
1 parent 25da948 commit af7f745

File tree

1 file changed

+60
-0
lines changed

1 file changed

+60
-0
lines changed

JavaScript/6-stream.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
'use strict';
2+
3+
const { Readable } = require('node:stream');
4+
5+
class QueueStream extends Readable {
6+
constructor(concurrent) {
7+
super({ objectMode: true });
8+
this.concurrent = concurrent;
9+
this.count = 0;
10+
this.queue = [];
11+
}
12+
13+
static channels(concurrent) {
14+
return new QueueStream(concurrent);
15+
}
16+
17+
add(task) {
18+
this.queue.push(task);
19+
}
20+
21+
_read() {
22+
while (this.count < this.concurrent && this.queue.length > 0) {
23+
const task = this.queue.shift();
24+
this.count++;
25+
this.onProcess(task, (err, res) => {
26+
if (err) this.emit('error', err);
27+
this.push({ err, res });
28+
this.count--;
29+
});
30+
}
31+
if (this.queue.length === 0 && this.count === 0) {
32+
this.push(null);
33+
}
34+
}
35+
36+
process(listener) {
37+
this.onProcess = listener;
38+
return this;
39+
}
40+
}
41+
42+
// Usage
43+
44+
const job = (task, next) => {
45+
setTimeout(next, task.interval, null, task);
46+
};
47+
48+
const queue = QueueStream.channels(3).process(job);
49+
50+
for (let i = 0; i < 20; i++) {
51+
if (i < 10) queue.add({ name: `Task${i}`, interval: 1000 });
52+
else queue.add({ name: `Task${i}`, interval: i * 100 });
53+
}
54+
55+
queue.on('data', (data) => void console.log(data));
56+
queue.on('end', () => void console.log('Tasks end'));
57+
queue.on('close', () => void console.log('Stream closed'));
58+
queue.on('error', (err) => {
59+
throw err;
60+
});

0 commit comments

Comments
 (0)