semtax의 개발 일지

2. rabbitmq를 이용한 작업 큐 구현 본문

개발/node.js

2. rabbitmq를 이용한 작업 큐 구현

semtax 2020. 1. 15. 23:08
반응형

개요

이번 포스팅에서는, rabbitMQ를 이용해서 작업 큐를 구현해보는 내용을 진행하도록 하겠습니다.

작업 큐(Work Queue) 란?

작업 큐는 쉽게 말해서 은행이나 식당에서 사용하는 번호표 같은것입니다. 사림들이 은행에 왔는데 창구가 꽉 찬 경우 번호표를 받고 대기 한 뒤, 차례가 오면 가서 업무를 처리하는 식이죠. 만약 이러한 번호표가 없다면 손님들이 대기하지 않고 떠나게 되겠죠.

즉, 작업 큐의 핵심은 오래걸리는 작업들의 요청이 허용량 이상으로 쏟아졌을때 를 이용해서 대기열을 둔 뒤에 실제 작업하는 사람이 작업을 하나씩 꺼내가서 작업을 한다는 것입니다. 이렇게 하는 경우 실제 작업을 시키는 사람의 입장에서는 요청을 즉시 즉시 받는것 처럼 느껴지는 것이지요. 즉, I/O를 요청하고나서 결과값을 받으려고 대기하지 않아도 된다는 사실입니다. 사실 이 작업 큐 가 비동기 로직의 핵심입니다.

또한 데이터를 이러한 식으로 받는 경우 보통 OLAP 방식의 시스템이라고도 합니다.(이와 대조적인 예가 OLTP 시스템입니다. 보통 일반적인 RDMBS가 이러한 구조를 가지고 있죠)

예제

저번 포스팅에서는 간단하게 Hello World만을 찍어 보았지만 이번에는 조금더 복잡한 작업을 해보도록 하겠습니다.

아래 코드는 RabbitMQ를 이용해서 실제 작업을 요청하는 예제입니다.

#!/usr/bin/env node

#!/usr/bin/env node

var amqp = require("amqplib/callback_api");

amqp.connect("amqp://localhost", function(error0, connection){

    if(error0){
        throw error0;
    }

    connection.createChannel(function(error1, channel){

        if(error1){
            throw error1;
        }

        var queue = 'task_queue';
        var msg = process.argv.slice(2).join(' ') || "Hello World!";

        channel.assertQueue(queue,{
            durable: true
        });

        channel.sendToQueue(queue, Buffer.from(msg),{
            persistent: true
        });

        console.log(" [x] Sent %s", msg);
    });

    setTimeout(function(){
        connection.close();
        process.exit(0);
    }, 500);
});

먼저 connect 함수를 이용해서 메시지 큐 서비스와 연결을 요청한 뒤, *createChannel *함수를 이용해서 채널을 생성합니다.

채널은 일종의 통로입니다. 사람들끼리 서로 통신을 주고 받는 경우에는, 서로 주고 받는 사람이 누구인지가 명확해야 하겠지요? 이렇게 수신자와 송신자의 관계를 명확하게 나타내는게 바로 채널입니다. (TV채널도 이와 비슷하다고 볼 수 있습니다. 주고 받는사람들이 명확하니깐요)

그런 뒤, assertQueue 함수를 이용해서 사용할 큐를 정합니다. 이때 durable을 true로 설정해서 메시지 큐 서비스가 죽어도 큐가 사라지지 않고 보존되게 하였습니다.

그리고 sendToQueue 함수를 이용해서 RabbitMQ 서비스를 거쳐서 작업을 요청합니다. 이때 persistent 옵션을 줌으로써 메시지 큐 서비스가 죽어도 큐의 내용이 보존되게 하였습니다.

아래는 실제 작업하는 worker 코드 입니다.

#!/usr/bin/env node

var amqp = require("amqplib/callback_api");

amqp.connect("amqp://localhost", function(error0, connection){

    if(error0){
        throw error0;
    }

    connection.createChannel(function(error1, channel){

        if(error1){
            throw error1;
        }

        var queue = 'task_queue';
        var msg = 'Hello world!';

        channel.assertQueue(queue,{
            durable: true
        });

        channel.prefetch(1);

        channel.consume(queue, function(msg){

            var secs = msg.content.toString().split('.').length - 1;

            console.log("[x] Received %s", msg.content.toString());

            setTimeout(function(){
                console.log("[x] Done");
            }, secs * 1000);

        }, {
            noAck: true
        });
    });
});

먼저 connect 함수를 이용해서 메시지 큐 서비스와 연결을 요청한 뒤, *createChannel *함수를 이용해서 채널을 생성합니다.

그런 뒤, assertQueue 함수를 이용해서 사용할 큐를 정합니다. 이때 durable을 true로 설정해서 메시지 큐 서비스가 죽어도 큐가 사라지지 않고 보존되게 하였습니다.

그리고 prefetch 함수를 이용해서 최대 허용 가능한 수를 1개로 제한 함으로써, 너무 과도한 작업을 하지 않도록 하였습니다. 이는, 워커의 급격한 시스템 자원 고갈을 막기 위한것입니다(보통 이러한 기법을 백 프레셔 "Back pressure" 라고 합니다.)

그 다음으로, consume 함수를 이용해서 큐에서 데이터를 꺼낸 뒤 메시지를 찍어주고 1초동안 대기를 하게됩니다. 이때, noAck을 true로 줌으로써, 따로 메시지를 꺼내서 잘 받았다는 응답을 해주지 않겠다고 설정을 하였습니다.(수신자가 굳이 응답을 받을 필요가 없을때 이러한 방식을 많이 채택합니다.)

실행 해보면 재미있는 결과를 얻을 수 있습니다. 먼저 워커를 1대만 띄워놓은 경우에는 1대가 모든 메시지를 받지만 워커를 2대이상 띄워놓는 경우에는 메시지가 각각 (1번큐, 2번큐, ... n번큐) 순서대로 순차적으로 1개씩 전송되게 됩니다. 이러한 방식을 라운드 로빈 방식이라고 합니다. 또한 이러한식으로 처리를 분산시키는것을 로드 밸런싱 이라고 합니다.

반응형
Comments