일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- cd
- API
- Spring Data JPA
- JPA
- bean
- producer
- entity graph
- AWS
- ECS
- mirror maker2
- Kubernetes
- Kotlin
- spring kafka
- topic생성
- CodePipeline
- git
- PAGING
- kafka
- Spring JPA
- consumer
- offsetdatetime
- transactionaleventlistener
- CI
- centos7
- K8s
- mysql
- spring
- QueryDSL
- Streams
- Entity
- Today
- Total
Yebali
Spring Kafka (스프링 카프카) 본문
스프링 카프카
스프링 카프카는 카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만들어진 라이브러리다.
카프카 클라이언트에서 사용하는 여러 가지 패턴을 미리 제공한다.
ex) 컨슈머를 멀티 스레드로 운영하고 싶다면 concurrency 옵션 하나만 추가하면 쉽게 구현할 수 있다.
스프링 카프카 라이브러리는 어드민, 컨슈머, 프로듀서, 스트림즈 기능을 제공한다.
스프링 카프카의 의존성
implementation("org.springframework.kafka:spring-kafka")
스프링 카프카 프로듀서
스프링 카프카 프로듀서는 ‘카프카 템플릿’이라고 불리는 클래스를 사용하여 데이터를 전송할 수 있다.
카프카 템블릿은 ‘프로듀서 팩토리’ 클래스를 통해 생성하며, 사용하는 방법은 2가지가 있다.
- 스프링 카프카에서 제공하는 기본 카프카 템플릿을 사용
- 직접 사용자가 카프카 템플릿을 프로듀서 팩토리로 생성하는 방법
기본 카프카 템플릿
기본 프로듀서 팩토리를 통해 생성된 카프카 템플릿을 사용한다.
기본 카프카 템플릿을 사용할 때는 application.yaml에 프로듀서 옵션을 넣고 사용할 수 있다.
// application.yaml에서 설정 할 수 있는 프로듀서 옵션 값들
spring.kafka.producer.acks
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
spring.kafka.producer.buffer-memory
spring.kafka.producer.client-id
spring.kafka.producer.compression-type
spring.kafka.producer.key-serialzer
spring.kafka.producer.properties.*
spring.kafka.producer.retries
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer
카프카 클라이언트 라이브러리를 사용할 때 bootstrap-servers, key-serializer, value-serializer를 선언하지 않으면 ConfigException이 발생하지만 스프링 카프카에서는 발생하지 않으며 아래 값으로 자동으로 설정된다.
- bootstrap-servers: localhost:9092
- key-serializer : StringSerialiczer
- value-serializer : StringSerialiczer
커스텀 카프카 템플릿
커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것이다.
프로서듀에서 필요한 각종 옵션을 선언하여 사용할 수 있기 때문에, 다양한 종류의 카프카 프로듀서 인스턴스를 생성하고 싶다면 이 방식을 사용하면 된다.
@Configuration
class KafkaTemplateConfiguration {
@Bean
public KafkaTemplate<String, Any> customKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(roducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
return new KafkaTemplate<>(pf);
}
}
스프링 카프카 컨슈머
스프링 카프카 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누었다.
스프링 카프카 컨슈머의 타입
- 레코드 리스너 (MessageListener) : 1개의 호출, 1개의 레코드 처리
- 배치 리스너 (BatchMessagerListener) : 1개의 호출, N개의 레코드 처리
스프링 카프카 컨슈머의 기본 리스너 타입은 레코드 리스너이다.
스프링 카프카 컨슈머의 커밋
기존 카프카 클라이언트에서 컨슈머를 구현할 때 가장 어려운 부분이 커밋을 구현하는 것이다.
스프링 카프카는 사용자가 구현 할만한 커밋 7가지를 미리 구현해놓았다.
스프링 카프카에서는 커밋을 ‘커밋’이라고 부르지 않고 ‘AckMode’라고 부른다.
- RECORD : 레코드 단위로 프로세싱 후 커밋
- BATCH : poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋 (Default 설정)
- TIME : 특정 시간 이후에 커밋
- COUNT : 특정 개수만큼 레코드가 처리된 이후에 커밋
- COUNT_TIME : TIME, COUNT, 옵션 중 맞는 조건이 하나라도 나올 경우 커밋.
- MANUAL : Acknowledgement.acknowledge() 매서드가 호출되면 다음번 poll() 때 커밋한다.
- MANUAL_IMMEDIATE : Acknowledgement.acknowledge() 매서드가 호출되면 즉시 커밋한다.
Acknowledgement.acknowledge() : 레코드나 배치가 처리되었을 때 호출하는 메서드.
Acknowledgment (Spring for Apache Kafka 2.8.2 API)
리스너를 생성하고 사용하는 방식은 두가지가 있다.
- 기본 리스너 컨테이너를 사용하는 방법
- 컨테이너 팩토리를 사용하여 직접 리스너를 만드는 방법
기본 리스너 컨테이너
기본 리스너 컨테이너는 기본 리스너 컨테이너 팩토리를 통해 생성된 리스너 컨테이너를 사용한다.
기본 리스너 컨테이너를 사용할 때는 application.yaml에 컨슈머와 리스너 옵션을 넣고 사용 할 수 있다.
설정할 수 있는 값은 아래와 같다.
spring.kafka.consumer.auto-commit-interval
spring.kafka.consumer.auto-offset-reset
spring.kafka.consumer.bootstrap-servers
spring.kafka.consumer.client-id
spring.kafka.consumer.enable-auto-commit
spring.kafka.consumer.fetch-max-wait
spring.kafka.consumer.fetch-min-size
spring.kafka.consumer.group-id
spring.kafka.consumer.heartbeat-interval
spring.kafka.consumer.key-deserializer
spring.kafka.consumer.max-poll-records
spring.kafka.consumer.properties.*
spring.kafka.consumer.value-deserializer
spring.kafka.listener.ack-count
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type
주의할 점은 스프링 카프카에서 컨슈머를 래핑 한 리스너 컨테이너가 다양한 옵션을 포함하여 동작한다는 것이다.
기존 카프카 클라이언트에서 사용되지 않는 새로운 옵션들이기 때문에 각 옵션의 종류와 사용법들을 적절히 정의하고 사용해야 한다.
컨테이너 리스너에 설정한 값들이 코드로 컨슈머를 구현할 때 영양을 미치기 때문이다.
커스텀 리스너 컨에티너
서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너(리밸런스 이벤트가 발생했을 때 동작하는 메서드를 가진 클래스)를 구현하기 위해서는 커스텀 리스너 컨테이너를 사용해야 한다.
카프카 리스너 컨테이너 팩토리(KafkaListenerContainerFactory) 인스턴스를 만들어 빈으로 등록하고 KafkaListener어노테이션에서 커스텀 리스터 컨테이너 팩토리를 등록하면 커스텀 리스너 컨테이너를 사용할 수 있다.
@Configuration
public class ListenerContainerConfiguration {
@Bean
public kafkaListenerContainerFactory<ConcurrentMessageListenrContainer<String, String>> customContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIGm StringDeserializer.class);
DefaultKafkaConsumerFacotry cf = new DefaultKafkaConsumerFacotry<>(props);
..
..
..
}
}
@SpringBootApplication
public class SpringConsumerApplication {
@KafkaListener(
topics = "test",
groupId = "test-group",
containerFactory = "customContainerFactory"
)
public void customListenser(String data) {
}
}
'Kafka' 카테고리의 다른 글
Kafka 토픽 생성하기 (0) | 2022.02.01 |
---|---|
Kafka 설치하기 (feat. EC2) (0) | 2022.01.31 |
Kafka Consumer (카프카 컨슈머) (0) | 2022.01.30 |
Kafka Producer (카프카 프로듀서) (0) | 2022.01.30 |
Kafka MirrorMaker2 (카프카 미러메이커2) (0) | 2022.01.30 |