semtax의 개발 일지

3. node.js와 rabbitMQ를 이용한 pub-sub 패턴 구현 본문

개발/node.js

3. node.js와 rabbitMQ를 이용한 pub-sub 패턴 구현

semtax 2020. 1. 18. 23:40
반응형

개요

이번 포스팅에서는, node.js 와 rabbitMQ를 이용해서 pub-sub패턴을 구현하는 내용을 진행하려고 합니다.

Pub-Sub 패턴?

Pub-Sub 패턴은 : Publisher Subscriber 패턴의 약자로, 발행자와 구독자 이 2가지로 나누어서

구독자(Subscriber)가 발행자(Publisher)에게 자기가 발행자의 정보를 받겠다고 구독 신청을 하면, 발행자가 자신에게 구독 신청을 한 구독자에게 메시지를 보내는 패턴을 말합니다.

쉽게 생각해서 TV 방송국과 시청자와의 관계를 생각하시면 됩니다.

RabbitMQ Exchage

이전 포스팅에서 다루었던 개념을 다시 한번 복습해봅시다.

  • Producer : 메시지를 보내는 사람
  • Queue : 메시지를 저장하는 큐(메시지 큐)
  • Consumer : 메시지를 받는 사람

위에서 언급했던 TV 방송국 예제를 다시 한번 생각을 해봅시다.

TV 방송국 에서 자기 채널을 보고 있는 시청자에게 방송을 송출한다고 가정합시다.

이때, 방송국은 자기 채널을 보고있는 사람한테만 방송을 보내야 합니다. 다시 말하면, 송신자가 어떤 수신자에게 메시지를 보내줄지 결정을 해주는 중계기 또는 길잡이가 필요하다는 의미입니다.

바로 Exchange가 RabbitMQ에서 이 역할을 해줍니다.

Exchange는 아래 그림 같이 메시지를 받아서 어느 큐에 넣어 줄지를 정하는 역할을 합니다.

img

node.js 에서는 아래와 같이 사용합니다.

ch.assertExchange('logs', 'fanout', {durable: false})

임시 큐

그러나, 문제가 하나 있습니다. 지난 포스팅에서 RabbitMQ의 큐를 배웠을때 각 큐는 유일한 이름을 가져야 한다는 내용을 보았을겁니다.

하지만 구독자가 여러명인 경우 같은 메시지를 여러 큐에 동시에 보내야하는 문제가 발생합니다.

물론 여러개의 큐를 사용자가 직접 적절하게 이름이 안겹치게 만들 수는 있겠지만, 그렇게 하면 엄청나게 번거롭습니다.

다행히도, RabbitMQ 에서는 이러한 번거러움을 피하게 해주는 임시 큐라는 것을 제공합니다. 아래 코드와 같이, 큐를 선언해주면 rabbitMQ에서 여러개의 임시큐를 생성해서 한 개의 큐인것 처럼 사용을 할 수 있게 해줍니다.

channel.assertQueue('', {
  exclusive: true
});

또한 위에서 언급한 Exchange와 Exchange가 어느 큐에 메시지를 보낼지 관계를 정하는것을 바인딩(Binding) 이라고 합니다.

node.js 에서는 아래와 같이 바인딩을 수행합니다.

channel.bindQueue(queue_name, 'logs', '');

예제

아래 예제는 Pub-Sub 패턴을 이용해서 로그를 수집하는 예제입니다.

아래 코드는 로그 정보를 송신하는 코드입니다.

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'logs';
    var msg = process.argv.slice(2).join(' ') || 'Hello World!';

    channel.assertExchange(exchange, 'fanout', {
      durable: false
    });
    channel.publish(exchange, '', Buffer.from(msg));
    console.log(" [x] Sent %s", msg);
  });

  setTimeout(function() { 
    connection.close(); 
    process.exit(0); 
  }, 500);
});

아래 코드는 로그 정보를 수신해서 출력하는 예제입니다.

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'logs';

    channel.assertExchange(exchange, 'fanout', {
      durable: false
    });

    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
      channel.bindQueue(q.queue, exchange, '');

      channel.consume(q.queue, function(msg) {
        if(msg.content) {
            console.log(" [x] %s", msg.content.toString());
          }
      }, {
        noAck: true
      });
    });
  });
});

아래와 같이 실행을 해주면 됩니다.

./emit_log.js
./receive_logs.js
반응형
Comments