본문 바로가기
서버 개발/Spring Kafka 2.3.3 레퍼런스 번역

4.1.3. Receiving Messages - (1) MessageListenerContainer

by 그린코드 2019. 11. 18.

카프카 메시지를 수신하는 방법 두 가지

1. MessageListenerContainer Configuring

2. @KafkaListener 어노테이션을 사용하여, 메시지 리스너를 구현

 

Message Listeners

message listener를 위해 제공되는 8가지 인터페이스는 다음과 같다.

 

public interface MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

 

 

Message Listener Containers의 두 가지 구현체

1. KafkaMessageListenerContainer

2. ConcurrentMessageListenerContainer

 

KafkaMessageListenerContainer는 모든 토픽과 파티션으로부터 메시지를 single thread로 받는다. ConcurrentMessageListenerContainer는 multi-threaded consumption을 위해, 하나 혹은 더 많은 KafkaMessageListenerContainer 인스턴스들에게 기능을 위임한다. (concurrency 속성 = KMLC 인스턴스의 수)

 

2.2.7 버전부터, listener container에 RecordInterceptor를 추가할 수 있는데, interceptor는 listener가 호출되기 이전에, record를 검사 혹은 수정할 수 있게 한다. 만약 interceptor가 null을 리턴하면, listener는 호출되지 않는다. (마치 Spring batch의 processor 역할) Interceptor는 batch listener를 사용하는 경우에는 호출되지 않는다.

 

2.3 버전부터 CompositeRecordInterceptor로 여러 개의 interceptor가 호출되게 할 수도 있다.

 

Using KafkaMessageListenerContainer

KafkaMessageListenrContainer는 두 가지 생성자를 사용하여 만들어질 수 있다.

 

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionOffset... topicPartitions)

 

둘 다 ConsumerFactory, ContainerProperties (토픽 혹은 파티션 정보를 가짐)를 생성자 파라미터로 가지는데, 두 번째 생성자는 ConcurrentMessageListenerContainer에 의해 컨슈머 인스턴스 간에 TopicPartitionOffset을 분배하기 위해 사용된다.

 

ContainerProperties의 생성자는 다음과 같다.

 

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

 

첫 번째 생성자는, 어떤 파티션을 사용할 것인지(consumer의 assign 메소드), 최초 오프셋 값을 어떤 값으로 가질지 등을 명시적으로 지시하기 위해 TopicPartitionOffset의 배열을 인자로 가진다. initial offset값이 양수면 절대적인 오프셋 값이다. 음수는 파티션의 현재 마지막 오프셋에 의해 결정되는 상대적인 값이다. (consumer.seekToEnd() + initialOffset) 두 번째 생성자는 토픽의 배열을 인자로 받는데, 카프카는 기본적으로 group.id 속성에 기반하여 파티션을 할당한다. 세 번째 생성자는 패턴을 인자로 받고, 토픽을 선택하기 위해 정규식을 사용한다.

 

public TopicPartitionOffset(String topic, int partition, Long offset, boolean relativeToCurrent) {
	this.topicPartition = new TopicPartition(topic, partition);
	this.offset = offset;
	this.relativeToCurrent = relativeToCurrent;
	this.position = null;
}

 

참고로 TopicPartitionOffset 생성자 중, relativeToCurrent 불린 값을 인자로 받는 생성자가 있다. 만약 이 값이 'true'이면 initial offset 값은 컨슈머의 현재 위치 기준 상대적으로 계산된다. 오프셋은 컨테이너가 시작될 때 적용된다. 

 

MessageListener를 컨테이너에 할당하기 위해서, 컨테이너를 만들 때, ContainerProps.setMessageListener 메서드를 사용할 수 있다. 예시는 아래와 같다.

 

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

 

DefaultKafkaConsumerFactory를 만들 때, 위와 같이 설정에 의해 key/value deserializer가 주어지는 properties를 인자로 받는 생성자를 이용해야 한다. 혹은 생성자에서 deserializer 인스턴스를 보내도록 해야, 모든 컨슈머가 동일한 인스턴스를 공유할 수 있다. 또 다른 방법으로  Supplier<Deserializer>를 이용하는 방법(v2.3 후로 가능)이 있는데, 이 방법을 활용하면 각 컨슈머마다 분리된 deserializer를 가질 수 있다.

 

Using ConcurrentMessageListenerContainer

ConcurrentMessageListenerContainer의 생성자는 아래와 같다.

 

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)
                            
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionOffset... topicPartitions)

 

ConcurrentKafkaListenerContainerFactory.class
- private Integer concurrency;
ConcurrentMessageListenerContainer.class
- private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

 

ConcurrentMessageListenerContainer는 concurrency 속성을 가진다. 예를 들어 concurrecy값이 3이면 3개의 KafkaMessageListenerContainer인스턴스가 만들어진다. 두 번째 생성자가 사용되면, ConcurrentMessageListenerContainer는 TopicPartition인스턴스를 KafkaMessageListenerContainer에게 위임한다.

 

만약 6개의 TopicPartition 인스턴스가 있고, concurrency가 3이면, 각 컨테이너는 2개의 파티션을 갖는다. 그리고 동일한 concurrency에서 5개의 TopicPartition 인스턴스가 있다면, 2개의 컨테이너는 2개의 파티션을 갖고, 나머지 1개의 컨테이너는 1개의 파티션을 갖는다. 만약 concurrency가 TopicPartition의 수보다 크면, 각 컨테이너가 1개의 파티션을 갖도록 concurrency는 하향 조정된다.

 

버전 1.3부터 MessageListenerContainer는 metrics 메서드를 제공한다. 버전 2.3부터는 카프카 컨슈머가 poll() 메서드를 호출하는 동안, 메인 루프를 sleep할 수 있도록, ContainerProperties에서 idleBetweenPools 옵션을 제공한다. 실제적인 sleep interval은 컨슈머의 'max.poll.interval.ms' 설정과 '현재 레코드의 배치 프로세스 시간' 중 작은 값으로 결정된다.

 

Committing Offsets

오프셋을 커밋하는 방법으로 여러 가지 옵션이 제공된다. 만약 컨슈머의 속성 중 'enable.auto.commit=true'이면, 카프카는 설정에 따라 자동으로 오프셋을 커밋한다. 만약 설정이 'false'이면 컨테이너는 여러 'AckMode'를 세팅할 수 있는데, 디폴트 AckMode는 'BATCH'이다. 버전 2.3부터 'enable.auto.commit=false'로 변경되었다. 컨슈머의 poll() 메서드는 하나 혹은 그 이상의 ConsumerRecords를 반환한다. MessageListener는 각 레코드를 위해 호출된다. 컨테이너의 AckMode종류는 아래와 같다.

 

  • RECORD: 레코드가 처리된 후, 리스너가 리턴할 때 오프셋 커밋
  • BATCH: poll() 메서드에 의해 처리된 모든 레코드가 리턴될 때 오프셋 커밋
  • TIME: poll() 메서드에 의해 처리된 모든 레코드가 리턴되고, 마지막 커밋 이후의 시간이 ackTime을 넘으면 오프셋 커밋
  • COUNT: poll() 메소드에 의해 처리된 모든 레코드가 리턴되고, 마지막 커밋 이후 ackCount만큼의 레코드를 받으면 오프셋 커밋
  • COUNT_TIME: TIME과 COUNT 둘 중의 하나의 조건이 만족되면 오프셋 커밋
  • MANUAL: 메시지 리스너가 acknowledge() 함수를 호출하고 나면, BATCH 모드처럼 처리
  • MANUAL_IMMEDIATE: 메시지 리스너가 acknowledge() 함수가 호출하면, 즉시 오프셋 커밋

 

버전 2.3부터 Acknowledgment 인터페이스에 2가지 optional method가 (nack) 추가되었다.

 

  • nack(long sleep)
  • nack(int index, long sleep)

 

첫 번째 함수는 record listner에서 사용되고, 두 번째 함수는 batch listener에 의해 사용된다. 리스너에서 잘못된 메서드를 호출하면 IllegalStateException이 발생한다.

 

record listener에서 nack() 함수가 호출되면 펜딩되었던 오프셋이 커밋되고, 마지막 poll 이후로 남아있던 레코드가 버려진다. 해당 파티션에서 실패 혹은 처리되지 못한 레코드를 next poll()에서 재전송 하기 위해 seek이 수행된다. 컨슈머 스레드는 재전송 전에 sleep 인자로 세팅한 만큼 잠시 멈출 수 있다. 이것은 컨테이너가 SeekToCurrentErrorHandler로 구성되었을 때 예외를 던지는 기능과 비슷하다.

 

batch listener를 사용하면, 배치 안에서 실패가 발생되는 index를 지정할 수 있다. nack() 함수가 호출될 때, 지정한 index 이전의 레코드의 오프셋만 커밋 되고, 해당 파티션에서 실패 혹은 처리되지 못한 레코드를 next poll()에서 재전송 하기 위해 seek이 수행된다. 이것은 SeekToCurrentBatchErrorHandler보다 개선된 것으로, 재전송을 위해 전체 배치를 seek할 수 있다.

 

 

 

* Spring Kafka 2.3.3 Reference를 참고하였고, 이해를 높이기 위해서 일부 문장을 정리 + 의역하였습니다. 번역이 처음이라 잘못 번역된 부분이 있을 수 있습니다! 댓글로 남겨주시면 바로 수정하겠습니다^^

댓글