Skip to content

Commit 0d8bc16

Browse files
timursevimlitshemsedinov
authored andcommitted
Add pipeline usage example for queue on streams
1 parent af7f745 commit 0d8bc16

File tree

1 file changed

+88
-0
lines changed

1 file changed

+88
-0
lines changed

JavaScript/7-pipeline.js

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
'use strict';
2+
3+
const { Readable, Writable, Transform, pipeline } = require('node:stream');
4+
5+
class QueueStream extends Readable {
6+
constructor(concurrent) {
7+
super({ objectMode: true });
8+
this.concurrent = concurrent;
9+
this.count = 0;
10+
this.queue = [];
11+
}
12+
13+
static channels(concurrent) {
14+
return new QueueStream(concurrent);
15+
}
16+
17+
add(task) {
18+
this.queue.push(task);
19+
}
20+
21+
_read() {
22+
while (this.count < this.concurrent && this.queue.length > 0) {
23+
const task = this.queue.shift();
24+
this.count++;
25+
this.onProcess(task, (err, res) => {
26+
if (err) this.emit('error', err);
27+
this.push({ err, res });
28+
this.count--;
29+
});
30+
}
31+
if (this.queue.length === 0 && this.count === 0) {
32+
this.push(null);
33+
}
34+
}
35+
36+
process(listener) {
37+
this.onProcess = listener;
38+
return this;
39+
}
40+
}
41+
42+
// Usage
43+
44+
const fs = require('node:fs');
45+
46+
const fileWStream = fs.createWriteStream('./tasks.txt');
47+
48+
const stringifyStream = new Transform({
49+
readableObjectMode: true,
50+
writableObjectMode: true,
51+
write(data, encoding, next) {
52+
const result = JSON.stringify(data);
53+
this.push(result + '\n');
54+
next();
55+
},
56+
});
57+
58+
const writable = new Writable({
59+
objectMode: true,
60+
write(data, encoding, next) {
61+
console.log({ data });
62+
next();
63+
}
64+
});
65+
66+
const job = (task, next) => {
67+
setTimeout(next, task.interval, null, task);
68+
};
69+
70+
const queue = QueueStream.channels(3).process(job);
71+
72+
pipeline(queue, stringifyStream, fileWStream, (err) => {
73+
if (err) throw err;
74+
console.log('pipeline done');
75+
});
76+
77+
queue.pipe(writable);
78+
// queue.on('data', (data) => void console.log(data));
79+
80+
queue.on('end', () => void console.log('tasks end'));
81+
queue.on('close', () => void console.log('stream closed'));
82+
83+
writable.on('finish', () => void console.log('writable finished'));
84+
85+
for (let i = 0; i < 20; i++) {
86+
if (i < 10) queue.add({ name: `Task${i}`, interval: 1000 });
87+
else queue.add({ name: `Task${i}`, interval: i * 100 });
88+
}

0 commit comments

Comments
 (0)