일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- K8s
- CodePipeline
- Streams
- Kotlin
- centos7
- producer
- AWS
- offsetdatetime
- consumer
- Spring JPA
- Entity
- git
- mirror maker2
- bean
- Kubernetes
- entity graph
- transactionaleventlistener
- ECS
- Spring Data JPA
- spring kafka
- topic생성
- mysql
- API
- PAGING
- QueryDSL
- CI
- cd
- spring
- JPA
- kafka
- Today
- Total
Yebali
Kafka Streams (카프카 스트림즈) 본문
카프카 스트림즈란?
토픽에 있는 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리이다.
카프카 스트림즈의 장점
카프카가 공식적으로 지원하는 라이브러리이다. 그리고 카프카와 카프카 스트림즈는 함께 버전업이 되어 호환성이 서로 좋다.
스트림 처리에 필요한 다양한 기능(토픽 생성, 상태 저장, 데이터 조인 등)을 제공한다.
장애가 발생해도 장애 허용 시스템(Fault Tolerant System)을 통해 데이터가 딱 한 번만 처리되도록 보장한다.
'프로듀서 + 컨슈머' vs '스트림즈'
‘프로듀서 + 컨슈머’조합으로 스트림즈와 유사한 것을 만들 수 있다.
하지만 단 한번의 데이터 처리, 장애 허용 시스템 등의 기능들은 ‘프로듀서 + 컨슈머’의 조합으로는 구현하기 어렵다.
만약 소스 토픽(사용하는 토픽)과 싱크 토픽(저장하는 토픽)이 다른 클러스터에 존재하는 경우
스트림즈가 지원하지 않으므로 ‘프로듀서 + 컨슈머’ 조합을 사용해야 한다.
Streams Application은 내부에 1개 이상의 스레드를 생성할 수 있으며, 스레드는 1개 이상의 Task를 가진다.
Task란 Streams Application을 실행하면 생기는 데이터 처리 최소 단위이다.
ex) 3개의 파티션으로 이루어진 토픽을 처리하는 Streams Application은 3개의 task를 가진다.
카프카의 토폴로지
토폴로지(topology)란 2개 이상의 노드와 선으로 이루어진 집합이다.
토폴로지는 아래 두 가지로 구성된다.
- 프로세서(processor) : 카프카 스트림즈에서 토폴로지를 이루는 노드를 말한다.
- 스트림(stream) : 노드와 노드를 이은 선을 말한다.
프로세서는 역할에 따라 3가지 타입으로 나누어진다.
- 소스 프로세서 : 하나 이상의 토픽에서 데이터를 가져오는 역할.
- 스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할.
- 싱크 프로세서 : 데이터를 특정 카프카 토픽에 저장하는 역할
스트림즈 DSL
스트림즈 DSL에는 레코드의 흐름을 추상화한 3가지 개념이 있다.
KStream
레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성되어 있다.
KStream으로 데이터를 조회하면 토픽에 존재하는 모든 레코드가 출력된다.
→ 컨슈머로 토픽을 구독하는 것과 같다.
KTable
KStream과 다르게 메시지 키를 기준으로 묶어서 사용한다.
메시지 키를 기준으로 가장 최신 레코드를 사용한다.
GlobalKTable
KTable과 동일하게 메시지 키를 기준으로 묶어서 사용한다.
그러나 KTable로 선언된 토픽은 1개의 파티션이 1개의 태스크에 할당되어 사용되고,
GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용된다.
GlobalKTable의 가장 좋은 예는 KStream과 KTable이 데이터 조인(join)하는 것이다.
참고, KStream과 KTable을 조인하려면 반드시 코파티셔닝되어 있어야 한다.
코파티셔닝이란 조인하는 2개의 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업을 말한다.
코파티셔닝하지 않고 실행하면 'TopologyException'이 발생하며 코파티셔닝이 되어있지 않으면 리파티셔닝 하는 과정을 거쳐야 한다.
리파티셔닝이란 기존 토픽으로부터 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정이다.
GlobalKTable을 사용하면 태스크마다 GlobalKTable로 정의된 모든 데이터를 저장하고 사용하기 때문에
로컬 스토리지 사용량이 증가하고 네트워크, 브로커에 부하가 생긴다.
스트림즈 DSL 주요 옵션
필수 옵션
- bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름.
- application.Id : 스트림즈 애플리케이션을 구분하기 위한 고유한(Unique) 아이디.
선택 옵션
- default.key.serde : 레코드 메시지 키를 직렬화/역직렬화 하는 클래스 지정. (defulat: Serdes.ByteArray)
- default.value.serde : 레코드 메시지 값을 직렬화/역직렬화 하는 클래스 지정. (default: Serdes.ByteArray)
- num.stream.threads : 스트림 프로세싱 실행 시 실행될 스레드 개수. (default: 1)
- state.dir : 상태 기반 데이터 처리를 할 때 데이터를 저장할 디렉토리를 지정한다. (defulat : tmp/kafka-streams)
스트림즈 DSL - stream(), to()
특정 토픽의 데이터를 다른 토픽으로 전달하는 예시.
토픽을 KStream 형태로 가져오려면 stream() 메서드를 사용하면 된다.
KStream의 데이터를 특정 토픽에 저장하려면 to() 메서드를 사용하면 된다.
public static void main() {
// 설정
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONfIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTREAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DAFAULT_VALUE_SERDE_CLASSS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// 'stream_log'로 부터 KStream 객체를 만든다. 'stream()'메서드는 소스 프로세서
// KTable => .table() | GlobalKTable => globalTable()
KStream<String, String> streamLog = builder.stream("stream_log");
// 'stream_log'토픽을 담은 KStream를 'stream_log_copy'토픽에 전송한다. 'to()'메서드는 싱크 프로세서
streamLog.to("stream_log_copy");
// KafkaStreams 인스턴스를 생성하고 실행한다.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
스트림즈 DSL - filter()
토픽으로 들어온 문자열 데이터 중 문자열의 길이가 5보다 큰 경우면 필터링하는 예시.
filter() 메서드를 사용하면 된다.
public static void main() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONfIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTREAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DAFAULT_VALUE_SERDE_CLASSS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streamLog = builder.stream("stream_log");
// 레코드의 value의 길이가 5보다 큰 경우로 필터링
KStream<String, String> filteredStream = streamLog.filter(
(key, value) -> value.length() > 5
);
filteredStream.to("stream_log_copy");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
스트림즈 DSL - KTable과 KStream을 join()
KTabler과 KStream은 메시지 키를 기준으로 조인할 수 있다.
public static void main() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONfIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTREAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DAFAULT_VALUE_SERDE_CLASSS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table("address_table");
KStream<String, String> orderStream = builder.stream("order_stream");
// orderStream과 addressTable을 join
orderStream.join(addressTable,
// KStream과 KTable에서 동일한 메시지 키를 찾은 경우 어떤 데이터를 만들지 정의
(order, address) -> order + " send to " + address)
).to("order_join_stream")
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
프로세서 API
스트림즈 DSL보다 투박하지만 토폴로지를 기준으로 데이터를 처리한다는 관점에서는 동일한 역할을 한다.
단, 프로세서 API에서는 KStream, KTable과 같은 개념이 없다.
토픽으로 들어온 문자열 데이터 중 문자열의 길이가 5보다 큰 경우면 필터링하는 예시.
스트림즈 DSL와 다르게 필터링 로직을 담당할 클래스를 선언해주어야 한다.
public class FilterProcessor implements Processor<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
// 실질적인 로직이 들어가는 부분.
@Override
public void process(String key, String value) {
if (value.length() > 5) {
// 데이터를 다음 토폴로지로 넘어가도록 한다.
context.forward(key, value)
}
// 데이터가 처리되었음을 선언
context.commit();
}
@Override
public void close() {
}
}
// 토픽에 있는 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리.
public class FilterProcessor implements Processor<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
// 실질적인 로직이 들어가는 부분.
@Override
public void process(String key, String value) {
if (value.length() > 5) {
// 데이터를 다음 토폴로지로 넘어가도록 한다.
context.forward(key, value)
}
// 데이터가 처리되었음을 선언
context.commit();
}
@Override
public void close() {
}
}
'Kafka' 카테고리의 다른 글
Kafka MirrorMaker2 (카프카 미러메이커2) (0) | 2022.01.30 |
---|---|
Kafka의 Topic과 Partition (작성 중) (0) | 2022.01.30 |
Kafka의 ISR이란? (In-Sync-Replica) (0) | 2022.01.30 |
Kafka 토픽 정리 정책 (0) | 2022.01.30 |
Kafka Connect (카프카 커넥트) (0) | 2022.01.30 |