semtax의 개발 일지

5. rabbitMQ 토픽 본문

개발/node.js

5. rabbitMQ 토픽

semtax 2020. 2. 26. 23:16
반응형

 

개요

 

 

이번 시간에는 RabbitMQ의 topic 기능 에 대해 알아보고, node.js를 이용해서 예제를 작성해보도록 하겠습니다.

 

 

 

 

토픽?

 

 

한 가지 가정을 해봅시다. 여러분이 신문이나 잡지를 구독하고 있다는 가정 말이죠.

잡지를 구독하면, 보통 구독자는 구독자가 원하는 주제(토픽)를 골라서 구독하고,

 

 

구독 서비스를 제공하는 회사는 구독자가 원하는 주제(토픽)에 맞춰서 그에 알맞은 잡지를 제공하게 됩니다.

이와 같이, 구독 서비스를 제공하는 회사는 구독자에게, 주제(토픽)에 맞춰서 원하는것을 제공하는 기능을 가지고 있습니다.

 

 

RabbitMQ도 마찬가지 입니다. RabbitMQ도 구독자에 해당하는 각 수신자에게,

 

 

각 수신자가 원하는 정보만을 골라서 가져갈 수 있게 토픽(TOPIC) 이라는 기능을 제공합니다.

 

 

 

 

그림으로 나타내면 아래와 같습니다.

 

img

 

 

위와 같이, rabbitMQ의 exchange(옛날 전화기의 교환원 같은 존재라고 상상하시면 됩니다.)가 어떤 토픽이냐에 따라,

해당 토픽에 관심있어하는 구독자(여기서는 큐 + 수신자 에 해당합니다)에게 선별적으로 데이터를 전달해 줍니다.

 

 

 

또한 RabbitMQ는 이러한 토픽을 전달하는데, 와일드 카드 기능도 제공을 합니다.

 

 

제공되는 기능은 아래와 같습니다.

  1. * (star) : 단 한 단어 만을 가르킴
  2. # (hash) : 한 단어 이상을 가르킴

 

 

예시를 들어서 설명하면, lazy.# 같은 경우, lazy.aa.bb, lazy.aa.cc, lazy.asdasdas.qweqwe 모두와 매칭되지만,

 

*.orange.* 와 같은 경우, a.orange.b 는 매칭이 되지만 a.b.orange와 같은건 매칭이 되지 않습니다.

 

 

예시

 

 

아래 예제는, 구독자에게 알맞은 데이터를 전달하는 구독 서비스에 해당하는 프로그램 코드 입니다.

 

 

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'topic_logs';
    var args = process.argv.slice(2);
    var key = (args.length > 0) ? args[0] : 'anonymous.info';
    var msg = args.slice(1).join(' ') || 'Hello World!';

    channel.assertExchange(exchange, 'topic', {
      durable: false
    });
    channel.publish(exchange, key, Buffer.from(msg));
    console.log(" [x] Sent %s:'%s'", key, msg);
  });

  setTimeout(function() { 
    connection.close(); 
    process.exit(0) 
  }, 500);
});

 

 

아래 예제는 원하는 데이터를 받으려고 하는 구독자에 해당하는 프로그램 코드 입니다.

 

 

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

var args = process.argv.slice(2);

if (args.length == 0) {
  console.log("Usage: receive_logs_topic.js <facility>.<severity>");
  process.exit(1);
}

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'topic_logs';

    channel.assertExchange(exchange, 'topic', {
      durable: false
    });

    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      console.log(' [*] Waiting for logs. To exit press CTRL+C');

      args.forEach(function(key) {
        channel.bindQueue(q.queue, exchange, key);
      });

      channel.consume(q.queue, function(msg) {
        console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
      }, {
        noAck: true
      });
    });
  });
});

 

 

실행 결과

 

 

먼저 구독자에 해당하는 프로그램을 먼저 실행 시켜보자.

 

 

아래 예제는, 모든 메시지를 구독하겠다는 의미이다.

 

 

./receive_logs_topic.js "#"

 

 

 

구독자를 하나 더 실행 시켜보자.

 

 

아래 예제는, 커널 로그만 받겠다는 의미이다.

 

./receive_logs_topic.js "kern.*"

 

 

 

이번에는, 메시지를 전달하는 구독서비스에 해당하는 프로그램을 실행시켜보자.

 

 

아래 예제는, 커널의 크리티컬한 정보(Crash 에러 로그 같은)를 전송하는 예제이다.

 

 

./emit_log_topic.js "kern.critical" "A critical kernel error"

 

 

아래 예제를 실행시켜보면 2번째로 실행한 구독자에게 메시지가 갈 것이다.

 

 

 

출처

1. https://www.rabbitmq.com/tutorials/tutorial-five-python.html

 

RabbitMQ tutorial - Topics — RabbitMQ

Topics (using the Pika Python client) Prerequisites This tutorial assumes RabbitMQ is installed and running on localhost on standard port (5672). In case you use a different host, port or credentials, connections settings would require adjusting. Where to

www.rabbitmq.com

반응형
Comments