개발

[RabbitMQ] - 3. Spring boot로 Work Queue에 다수의 consumer를 등록하여 task 처리하기

primayy 2024. 1. 14. 17:52
728x90

개요

오늘은 Work Queue를 등록해서 resource-intensive task(리소스를 많이 사용하는 작업)를 다수의 consumer에게 분배하는 방법을 spring boot로 작성해보려고 합니다.

작업의 핵심은 Work Queue를 등록하여 리소스를 많이 사용하는 작업을 바로 수행하지 않고, 나중에 수행하도록 스케쥴링하며 요청자는 작업이 완료될 때까지 기다리지 않도록 하는 것입니다.

Producer가 전송한 메시지는 큐에 담기고 연결된 Consumer에게 분배된다

 

준비

준비해야 할 것은 다음과 같습니다.

1) 프로젝트 생성

2) Queue, Consumer, Producer 설정

 

1) 프로젝트 생성

https://start.spring.io/

프로젝트를 생성하겠습니다.

spring boot를 사용하고 의존성은 간단하게 lombok과 RabbitMq만 추가하도록 하겠습니다.

 

2) Queue, Consumer, Producer 설정

queue 등록

메시지를 전송하고 consume하는데 사용할 queue부터 설정하겠습니다.

queue는 gui 페이지에서 등록했고 이름을 worker로 설정한 것 외에는 별도의 추가적인 설정은 없습니다.

@Configuration
public class QueueConfig {
    @Bean
    public Queue worker() {
        return new Queue("worker");
    }
}

그리고 추가한 queue를 사용할 수 있도록 bean으로 등록합니다.

 

다음으로는 worker queue에 메시지를 전송할 producer와 consumer를 설정해야합니다.

우선은 consumer부터 설정하겠습니다.

@RabbitListener(queues = "worker") // worker queue에서 메시지를 consume
@AllArgsConstructor
public class WorkerQueueConsumer {
    private int num; // consumer 구분자

    // consume시 메시지를 처리할 메소드 
    @RabbitHandler 
    public void consume(WorkerQueueTask workerQueueTask) throws InterruptedException {
        System.out.println(num +" received uuid: " + workerQueueTask.getId());
        doWork(workerQueueTask);
    }

    public void doWork(WorkerQueueTask workerQueueTask) throws InterruptedException {
        Thread.sleep(1000); // 처리시 1초가 걸림을 가정
        System.out.println(num+ " done. uuid: " + workerQueueTask.getId());
    }
}

consumer는 메시지를 받아서 처리할 예정인데, resource-intensive task를 가정하기 위해 Thread.sleep을 이용하여 각 처리마다 1초가 걸리도록 했습니다.

@Profile("consumer")
@Configuration
public class ConsumerConfig {
    @Bean
    public WorkerQueueConsumer workerQueueConsumer1() {
        return new WorkerQueueConsumer(1);
    }

    @Bean
    public WorkerQueueConsumer workerQueueConsumer2() {
        return new WorkerQueueConsumer(2);
    }
}

이렇게 작성한 consumer는 bean으로 등록합니다. 2개의 consumer를 등록했고 메시지를 처리하는 순서를 확인하기 위해 각각 1,2라는 구분자를 설정해주었습니다.

 

다음으로는 메시지를 전송할 producer를 설정하겠습니다.

@Profile("sender")
@Component
public class WorkerQueueSender {
    private final RabbitTemplate rabbitTemplate;
    private final Queue workerQueue;

    public WorkerQueueSender(RabbitTemplate rabbitTemplate, @Qualifier("worker") Queue workerQueue) {
        this.rabbitTemplate = rabbitTemplate;
        this.workerQueue = workerQueue;
    }

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        WorkerQueueTask message = new WorkerQueueTask(UUID.randomUUID(), "message");
        rabbitTemplate.convertAndSend(workerQueue.getName(), message);
        System.out.println("Sent message. uuid: " + message.getId());
    }
}

메시지를 전송하는 메소드는 send메소드이며 1초에 한번씩 메시지를 전송하기 위해 스케줄러를 사용했습니다. (@Scheduled)

스케줄러를 동작시키기 위해 @EnableScheduling을 선언해주는 것을 잊지맙시다.

@SpringBootApplication
@EnableScheduling
public class WorkerqueueApplication {

	public static void main(String[] args) {
		SpringApplication.run(WorkerqueueApplication.class, args);
	}

}

 

producer가 전송하는 메시지의 스펙은 다음과 같습니다

@NoArgsConstructor
@AllArgsConstructor
@Getter
public class WorkerQueueTask implements Serializable {
    private UUID id; // 메시지 구분자
    private String message; // 메시지
}

전송할 메시지의 구분자와 담긴 메시지로 간단하게 구성했습니다.

 

실행

앞서 producer와 consumer를 나눠서 실행할 수 있도록 프로필을 분리했습니다.

producer는 sender, consumer는 consumer로 프로필을 적용해서 실행해보겠습니다.

java -jar workerqueue-0.0.1-SNAPSHOT.jar --spring.profiles.active=consumer
java -jar workerqueue-0.0.1-SNAPSHOT.jar --spring.profiles.active=sender

 

producer(sender) 실행
consumer 실행

producer가 전송한 메시지는 consumer에게 순차적으로 전달되어 처리되는 것처럼 보입니다.

실제로는 어떻게 전달되고 있을까요?

 

기본적으로 RabbitMQ는 각 메시지를 다음 consumer에게 순차적으로 전달합니다.

이러한 분배방법을 round-robin이라고도 부르는데, 이 방법의 단점으로는 각 consumer의 상황과는 상관없이 항상 순차적으로 메시지를 전달한다는 것입니다.

예를 들어서 2개의 consumer가 존재하고, 짝수번째의 메시지가 heavy한 처리를 유발한다면 항상 짝수번째 메시지를 받는 consumer는 바쁘게 될 것입니다.

 

각 consumer에게 조금 더 균등하게 메시지를 분배하는 방법은 없을까요?

spring AMQP를 사용한다면 fair dispatch를 기본으로 사용합니다.

사전에 정의한 prefetch count만큼 consumer가 메시지를 메모리에 적재해서 순차적으로 처리하는 방법입니다.

 

이번 글에서 작성한 예제 또한 spring AMQP를 사용했기 때문에 fair dispatch를 기본 설정으로 사용하고 있겠습니다.

rabbitMQ gui에서 worker queue에 연결된 consumer를 확인해보겠습니다.

consumer 기본 설정

따로 설정한 적 없는 prefetch count가 250으로 설정되어있습니다.

즉, 각 consumer는 250개의 메시지를 한번에 메모리로 가져갈 것이고 rabbitMQ는 컨슈머에게 unacked된 메시지가 250개라면 더이상 새로운 메시지를 보내지 않고 다른 consumer에게 분배할 것입니다.

 

fair dispath의 동작을 확인하기위해 worker queue에 250(prefetch count) x 2(consumer count) 이상의 메시지를 담아보았습니다.

worker queue에 prefetch될 메시지 이상의 메시지를 담는다

 

consumer가 연결된 순간 unacked가 500개가 된다

잠깐 정신을 판 순간 메시지는 3700개가 담겼습니다.

2개의 consumer가 연결되자 prefetch count 250 x 2가 되어 unacked된 메시지가 500개가 되는 것을 볼 수 있습니다.

그리고 각 consumer의 메모리에 적재된 메시지는 순차적으로 1개씩 처리가 됩니다.

 

그렇다면 fair dispatch의 단점은 무엇일까요?

예를 들어서 서비스 관리자가 queue의 메시지를 조금 더 빠르게 처리하기 위해서 consumer를 늘리기로 결정했다고 가정하겠습니다.

그러나 이미 메시지들은 기존에 존재하던 consumer의 메모리로 적재되어 있는 상황입니다.

새롭게 생성된 consumer는 queue에서 가져갈 ready 상태인 메시지가 없다면 idle한 상태가 될 것이고 메시지 처리 속도에는 큰 변화가 없게 됩니다.

 

prefetch count를 기본 설정값으로 사용하고 싶지 않다면?

prefetch count는 AbstractMessageListenerContainer에 기본값 250으로 설정되어 있습니다.

AbstractMessageListenerContainer의 구현체 중 하나인 SimpleMessageListenerContainer를 factory에 등록해서 사용하면 되겠습니다.

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(simpleRabbitListenerContainerFactory, connectionFactory);
        simpleRabbitListenerContainerFactory.setPrefetchCount(1);
        return simpleRabbitListenerContainerFactory;
    }

prefetch count를 1로 설정한 containerFactory를 bean을 등록했습니다.

해당 bean을 사용하는 consumer는 메시지를 round-robin처럼 분배받게 되겠네요.

@RabbitListener(queues = "worker", containerFactory = "prefetchOneContainerFactory")
@AllArgsConstructor
public class WorkerQueueConsumer

생성한 container를 consumer의 RabbitListener에 설정해줍니다.

 

정상적으로 설정되었는지 보기 위해 확인해봅시다.

변경된 prefetch count

consumer의 prefetch count가 1로 설정되었습니다.

또한 prefetch count에 따라서 unacked 상태인 메시지도 500 -> 2로 변경된 걸  볼 수 있겠습니다.

 

prefetch count 값에 따라 어플리케이션 성능에 많은 영향이 있을 것이므로 적절한 값을 찾아 설정하는게 중요할 것으로 보입니다.

 

결론

오늘은 요청한 작업을 즉시 수행하지 않고 작업이 완료될 때까지 대기하지 않기 위해 work queue를 구성한 뒤 다수의 consumer에게 메시지를 스케줄링하여 처리하도록 설정해보았습니다.

또한 prefetch count 설정에 따라 consumer에게 메시지를 분배하는 방법이 달라지고 어플리케이션 성능에 영향을 줄 수 있다는 점도 확인해보았습니다.

 

다음에는 exchange를 사용하여 다수의 consumer에게 메시지를 라우팅하는 publish/subscribe 패턴에 대해서 글을 작성해보려고 합니다.

그럼 안녕~

728x90
반응형