RabbitMQ tutorial Hello World

This tutorial is based on the official RabbitMQ documentation.

Prerequisites: RabbitMQ is installed and running on localhost:5672.

Work Queues

Targets:

  1. In the first tutorial we wrote programs to send and receive messages from a named queue.
  2. In this one we will create a work queue that will be used to distribute time-consuming tasks among multiple workers.

When should I use it?

Use a work queue when you want to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later.

Preparation

  1. We’ll fake the work: we take the number of dots in the string as its complexity, and every dot accounts for one second of “work”. For example, a fake task described by Hello... will take three seconds.
  2. So we create a new_task.js to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue.
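
As a minimal sketch of the convention above, the dot counting and the command-line fallback can be tried on their own without a broker. The helper names `secondsOfWork` and `taskFromArgs` are made up for this illustration; the expressions inside them are the ones used in new_task.js and worker.js.

```javascript
// Hypothetical helper: count the dots in a task description.
// Each dot stands for one second of fake work.
function secondsOfWork(task) {
  return task.split('.').length - 1;
}

// Hypothetical helper: build the task text the way new_task.js does,
// joining the command-line arguments or falling back to a default.
function taskFromArgs(args) {
  return args.join(' ') || 'Hello World!';
}

console.log(secondsOfWork('Hello...'));            // 3
console.log(secondsOfWork(taskFromArgs([])));      // 0
console.log(taskFromArgs(['First', 'message.']));  // First message.
```

In the real scripts the argument list comes from `process.argv.slice(2)`.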

Steps:

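  1. Rename the sender to new_task.js and change it as follows:

    // Use the message given on the command line, otherwise fall back to "Hello World!"
    var msg = process.argv.slice(2).join(' ') || "Hello World!";

    ch.assertQueue(q, {durable: true});
    // Buffer.from replaces the deprecated new Buffer(...) constructor
    ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
    console.log(" [x] Sent '%s'", msg);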
  2. Rename receive.js to worker.js. It needs to fake one second of work for every dot in the message body.

    // durable: true makes the queue itself survive a RabbitMQ restart
    ch.assertQueue(q, {durable: true});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    // Old receive.js consumer, kept for comparison:
    // ch.consume(q, function(msg) {
    //   console.log(" [x] Received %s", msg.content.toString());
    // }, {noAck: true});
    ch.consume(q, function(msg) {
      // One second of fake work per dot in the message body
      var secs = msg.content.toString().split('.').length - 1;
      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
      }, secs * 1000);
    }, {noAck: true});
  3. Run the scripts in three terminals.
    Two or more terminals run a worker:

    shell$ nodejs worker.js

    One terminal runs new_task:

    shell$ nodejs new_task.js
  4. By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
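
The round-robin behaviour described in step 4 can be imitated with a few lines of plain JavaScript. This is only a toy model of the dispatch order, not RabbitMQ's implementation; the function name `roundRobin` and the sample messages are assumptions made for the illustration.

```javascript
// Toy model of round-robin dispatch: message i goes to consumer i mod n.
// It only imitates the ordering described above and does not talk to RabbitMQ.
function roundRobin(messages, consumerCount) {
  const assigned = Array.from({ length: consumerCount }, () => []);
  messages.forEach((msg, i) => assigned[i % consumerCount].push(msg));
  return assigned;
}

const tasks = ['First.', 'Second..', 'Third...', 'Fourth....', 'Fifth.....'];
const perWorker = roundRobin(tasks, 2);
console.log(perWorker[0]); // [ 'First.', 'Third...', 'Fifth.....' ]
console.log(perWorker[1]); // [ 'Second..', 'Fourth....' ]
```

With two workers running, sending those five tasks one after another with new_task.js should show the same split in the worker terminals.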

Message acknowledgment

  • With the code above (noAck: true), if you kill a worker we lose the message it was just processing. We also lose all the messages that were dispatched to this particular worker but not yet handled. We don’t want to lose any tasks: if a worker dies, we’d like the task to be delivered to another worker.
  • In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

    ch.consume(q, function(msg) {
      var secs = msg.content.toString().split('.').length - 1;

      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
        // Acknowledge only after the work is finished
        ch.ack(msg);
      }, secs * 1000);
    }, {noAck: false});
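
The tutorial only acknowledges successful work. One possible extension, sketched here as an assumption and not as part of the original code, is to reject a message explicitly when the work throws, so that RabbitMQ can requeue it. The sketch relies on the amqplib channel methods `ack(msg)` and `nack(msg, allUpTo, requeue)`; the helper names `handleTask` and `makeStubChannel` are hypothetical, and the stub channel stands in for a real one so the logic can be run without a broker.

```javascript
// Hypothetical helper: ack when the work succeeds, nack with requeue when it throws.
// `ch` only needs ack(msg) and nack(msg, allUpTo, requeue), as an amqplib channel provides.
function handleTask(ch, msg, work) {
  try {
    work(msg.content.toString());
    ch.ack(msg);
  } catch (err) {
    // allUpTo = false: reject only this message; requeue = true: put it back on the queue
    ch.nack(msg, false, true);
  }
}

// Stub channel that records calls, for a dry run without RabbitMQ
function makeStubChannel() {
  const log = [];
  return {
    log,
    ack: (msg) => log.push('ack ' + msg.content.toString()),
    nack: (msg, allUpTo, requeue) =>
      log.push('nack ' + msg.content.toString() + ' requeue=' + requeue),
  };
}

const stub = makeStubChannel();
handleTask(stub, { content: Buffer.from('ok.') }, () => {});
handleTask(stub, { content: Buffer.from('bad.') }, () => { throw new Error('boom'); });
console.log(stub.log); // [ 'ack ok.', 'nack bad. requeue=true' ]
```

Requeueing a message that always fails would loop forever, so a real worker would likely need a retry limit or a dead-letter setup, which is outside the scope of this tutorial.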

Message durability

  • When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable.

    ch.assertQueue('task_queue', {durable: true});
  • This durable option change needs to be applied to both the producer and consumer code.

  • At this point we’re sure that the task_queue queue won’t be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by using the persistent option Channel.sendToQueue takes.

    ch.sendToQueue(q, Buffer.from(msg), {persistent: true});

Fair dispatch

  • You might have noticed that the dispatching still doesn’t work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn’t know anything about that and will still dispatch messages evenly.

  • This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn’t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

  • In order to defeat that we can use the prefetch method with the value of 1. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

    ch.prefetch(1);
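
To see why this matters, the two dispatch strategies can be compared with a small simulation. This is a simplified model under stated assumptions: all tasks are already queued, task durations are known in seconds, and with prefetch(1) a task goes to whichever worker becomes idle first. The function names `roundRobinLoad` and `prefetchOneLoad` are made up for the sketch; it does not use RabbitMQ.

```javascript
// Seconds of work per worker when message i simply goes to worker i mod n
function roundRobinLoad(durations, workers) {
  const load = new Array(workers).fill(0);
  durations.forEach((d, i) => { load[i % workers] += d; });
  return load;
}

// Seconds of work per worker when each task goes to the worker that becomes
// idle first, which approximates prefetch(1) combined with manual acks
function prefetchOneLoad(durations, workers) {
  const freeAt = new Array(workers).fill(0);
  for (const d of durations) {
    let next = 0;
    for (let w = 1; w < workers; w++) {
      if (freeAt[w] < freeAt[next]) next = w; // lowest index wins ties
    }
    freeAt[next] += d;
  }
  return freeAt;
}

// Odd messages are heavy (5 s), even messages are light (1 s)
const durations = [5, 1, 5, 1, 5, 1];
console.log(roundRobinLoad(durations, 2));  // [ 15, 3 ]
console.log(prefetchOneLoad(durations, 2)); // [ 11, 7 ]
```

In this model the whole batch finishes after 15 seconds with round-robin but after 11 seconds with one message per worker at a time.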

Putting it all together

Final code of our new_task.js script:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'task_queue';
    // Use the message given on the command line, otherwise fall back to "Hello World!"
    var msg = process.argv.slice(2).join(' ') || "Hello World!";

    // Durable queue plus persistent message, so the task survives a broker restart
    ch.assertQueue(q, {durable: true});
    ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
    console.log(" [x] Sent '%s'", msg);
  });
  // Give the message time to go out before closing the connection
  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

And our worker.js:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'task_queue';

    ch.assertQueue(q, {durable: true});
    // Take at most one unacknowledged message at a time
    ch.prefetch(1);
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      // One second of fake work per dot in the message body
      var secs = msg.content.toString().split('.').length - 1;

      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
        ch.ack(msg);
      }, secs * 1000);
    }, {noAck: false});
  });
});
