Skip to content

Commit 1b145a7

Browse files
committed
chore: working on chapter 13
1 parent 96a8840 commit 1b145a7

File tree

11 files changed

+244
-3
lines changed

11 files changed

+244
-3
lines changed

13-messaging-and-integration-patterns/04-pubsub-amqp/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ If you have docker installed, you can run an ephemeral instance of RabbitMQ with
1212
the following command:
1313

1414
```bash
15-
docker run -it -p 5672:5672 --hostname my-rabbit rabbitmq:3
15+
docker run -it -p 5672:5672 --hostname my-rabbit rabbitmq:4
1616
```
1717

1818
This example requires you to install some third-party dependencies from npm.

13-messaging-and-integration-patterns/06-task-distribution-zmq/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# 05-pubsub-redis-streams
1+
# 06-task-distribution-zmq
22

33
This sample demonstrates how to run parallel tasks using a messaging pipeline
44
and ZeroMQ

13-messaging-and-integration-patterns/06-task-distribution-zmq/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"name": "05-pubsub-redis-streams",
2+
"name": "06-task-distribution-zmq",
33
"version": "1.0.0",
44
"description": "This sample demonstrates how to run parallel tasks using a messaging pipeline and ZeroMQ",
55
"type": "module",
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# 07-task-distribution-amqp
2+
3+
This sample demonstrates how to distribute tasks to a set of remote workers
4+
using RabbitMQ and AMQP.
5+
6+
## Dependencies
7+
8+
As pre-requisite to this sample, you first need to
9+
[install RabbitMQ](http://www.rabbitmq.com/download.html)
10+
11+
If you have docker installed, you can run an ephemeral instance of RabbitMQ with
12+
the following command:
13+
14+
```bash
15+
docker run -it -p 5672:5672 --hostname my-rabbit rabbitmq:4
16+
```
17+
18+
This example requires you to install some third-party dependencies from npm.
19+
20+
If you have `pnpm` installed, you can do that with:
21+
22+
```bash
23+
pnpm install
24+
```
25+
26+
Alternatively, if you prefer to use another package manager, make sure to delete
27+
the `pnpm-lock.yaml` file before using it.
28+
29+
If you want to use `npm`, you can run:
30+
31+
```bash
32+
npm install
33+
```
34+
35+
If you want to use `yarn`, you can run:
36+
37+
```bash
38+
yarn install
39+
```
40+
41+
## Run
42+
43+
To run the various components, run in different terminals the following
44+
commands:
45+
46+
```bash
47+
node worker.js # runs a worker that will process tasks
48+
node worker.js # runs a second worker that will process tasks (you can run as many as you want)
49+
node collector.js # runs a collector that will collect results from the workers
50+
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b # runs a producer that will send tasks to the workers
51+
```
52+
53+
> [!TIP] If you want to test other hashes, you can generate them with the
54+
> following code:
55+
>
56+
> ```js
57+
> import { createHash } from "node:crypto";
58+
> console.log(createHash("sha1").update("your-string-here").digest("hex"));
59+
> ```
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import amqp from 'amqplib' // v0.10.8
2+
3+
const connection = await amqp.connect('amqp://localhost')
4+
const channel = await connection.createChannel()
5+
const { queue } = await channel.assertQueue('results_queue')
6+
7+
channel.consume(queue, async msg => {
8+
console.log(`Message from worker: ${msg.content.toString()}`)
9+
await channel.ack(msg)
10+
})
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
export function* generateTasks(searchHash, alphabet, maxWordLength, batchSize) {
2+
const alphabetLength = BigInt(alphabet.length)
3+
const maxWordLengthBigInt = BigInt(maxWordLength)
4+
let nVariations = 0n
5+
for (let n = 1n; n <= maxWordLengthBigInt; n++) {
6+
nVariations += alphabetLength ** n
7+
}
8+
console.log(
9+
`Finding the hashsum source string over ${nVariations} possible variations`
10+
)
11+
12+
let batchStart = 1n
13+
while (batchStart <= nVariations) {
14+
const expectedBatchSize = batchStart + BigInt(batchSize) - 1n
15+
const batchEnd =
16+
expectedBatchSize > nVariations ? nVariations : expectedBatchSize
17+
yield JSON.stringify({
18+
searchHash,
19+
alphabet: alphabet,
20+
// convert BigInt to string for JSON serialization
21+
batchStart: batchStart.toString(),
22+
batchEnd: batchEnd.toString(),
23+
})
24+
25+
batchStart = batchEnd + 1n
26+
}
27+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "07-task-distribution-amqp",
3+
"version": "1.0.0",
4+
"description": "This sample demonstrates how to distribute tasks to a set of remote workers using RabbitMQ and AMQP",
5+
"type": "module",
6+
"scripts": {},
7+
"engines": {
8+
"node": ">=24"
9+
},
10+
"engineStrict": true,
11+
"keywords": [],
12+
"author": "Luciano Mammino and Mario Casciaro",
13+
"license": "MIT",
14+
"dependencies": {
15+
"indexed-string-variation": "^2.1.0",
16+
"amqplib": "^0.10.8"
17+
}
18+
}

13-messaging-and-integration-patterns/07-task-distribution-amqp/pnpm-lock.yaml

Lines changed: 58 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { createHash } from 'node:crypto'
2+
import isv from 'indexed-string-variation' // v2.0.1
3+
4+
export function processTask(task) {
5+
const strings = isv({
6+
alphabet: task.alphabet,
7+
from: BigInt(task.batchStart),
8+
to: BigInt(task.batchEnd),
9+
})
10+
11+
let first
12+
let last
13+
for (const string of strings) {
14+
if (!first) {
15+
first = string
16+
}
17+
18+
const digest = createHash('sha1').update(string).digest('hex')
19+
20+
if (digest === task.searchHash) {
21+
console.log(`>> Found: ${string} => ${digest}`)
22+
return string
23+
}
24+
last = string
25+
}
26+
console.log(
27+
`Processed ${first}..${last} (${task.batchStart}..${task.batchEnd})`
28+
)
29+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import amqp from 'amqplib' // v0.10.8
2+
import { generateTasks } from './generateTasks.js'
3+
4+
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
5+
const BATCH_SIZE = 10000
6+
7+
const [, , maxLength, searchHash] = process.argv
8+
9+
const connection = await amqp.connect('amqp://localhost')
10+
const channel = await connection.createConfirmChannel()
11+
await channel.assertQueue('tasks_queue')
12+
13+
const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
14+
for (const task of generatorObj) {
15+
console.log(`Sending task: ${task}`)
16+
await channel.sendToQueue('tasks_queue', Buffer.from(task))
17+
}
18+
19+
await channel.waitForConfirms()
20+
channel.close()
21+
connection.close()

0 commit comments

Comments
 (0)