Kafka
Kafka Streams 사용하기 (feat. Kotlin)
예발이
2022. 2. 2. 00:13
카프카 스트림즈는 토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리이다.
'정확히 한번 전달', '장애 허용 시스템'등 데이터 처리 안정성이 매우 뛰어나 카프카 클러스터를 운영하면서 실시간 스트림 처리를 해야 한다면 카프카 스트림즈를 1순위로 고려하는 것을 권장하고 있다.
예제에서 만들어볼 카프카 스트림즈는
0~9 사이의 임의의 숫자들이 적재된 'random.number' 토픽으로부터 메시지를 읽어
5보다 큰 숫자들만 'over5.number' 토픽에 적재하는 기능을 하는 스트림즈 애플리케이션이다.
토픽 생성하기
bin/kafka-topics.sh \
--create \
--bootstrap-server 13.124.252.159:9092 \
--partitions 3 \
--topic random.number
bin/kafka-topics.sh \
--create \
--bootstrap-server 13.124.252.159:9092 \
--partitions 3 \
--topic over5.number
스트림즈 애플리케이션 만들기
아래의 'kafka-streams' 의존성을 추가한다.
dependencies {
implementation("org.apache.kafka:kafka-streams:2.8.1")
implementation("org.slf4j:slf4j-simple:1.7.35")
}
스트림즈 DSL기능 중 'filter()'를 사용하여 5보다 큰 메시지 값만 'over5.number' 토픽에 적재한다.
class SimpleStreams {
companion object {
const val APPLICATION_NAME = "streams-application" // 스트림즈 애플리이션은 아이디 값을 기준으로 병렬처리 한다.
const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
const val SOURCE_TOPIC = "random.number"
const val SINK_TOPIC = "over5.number"
}
}
fun main(args: Array<String>) {
val props = Properties()
val logger = LoggerFactory.getLogger(SimpleStreams.javaClass)
props[StreamsConfig.APPLICATION_ID_CONFIG] = SimpleStreams.APPLICATION_NAME
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleStreams.BOOTSTRAP_SERVERS
// 스트림 처리를 위한 메시지 키 직렬화/역직렬화 방식
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String()::class.java
// 스트림 처리를 위한 메시지 값 직렬화/역직렬화 방식
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String()::class.java
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(SimpleStreams.SOURCE_TOPIC)
// 메시지 값이 5 이상이면 SINK_TOPIC에 저장
val filteredStream = stream.filter { _, value ->
logger.info("{}", value)
Integer.parseInt(value) > 5
}
filteredStream.to(SimpleStreams.SINK_TOPIC)
val streams = KafkaStreams(builder.build(), props)
streams.start()
}
프로듀서
0~9 사이의 숫자를 임의로 만들어 'random.number'토픽에 저장한다.
class SimpleProducer {
companion object {
const val TOPIC_NAME = "random.number"
const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
}
}
fun main(args: Array<String>) {
val configs = Properties()
val logger = LoggerFactory.getLogger(SimpleProducer.javaClass)
configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleProducer.BOOTSTRAP_SERVERS
configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
val producer = KafkaProducer<String, String>(configs)
while (true) {
val messageValue = (0..10).random().toString()
val record = ProducerRecord<String, String>(SimpleProducer.TOPIC_NAME, messageValue)
producer.send(record) // 즉각적인 전송이 아닌 내부에서 배치로 묶어서 브로커에 전송함.
logger.info("{}", record)
producer.flush() // 프로듀서 내부 버퍼에 가지고 있는 레코드 배치를 브로커로 전송한다.
}
producer.close()
}
컨슈머
5보다 큰 메시지 값들이 저장된 'over5.number'토픽의 데이터를 받아온다.
class SimpleConsumer {
companion object {
const val TOPIC_NAME = "over5.number"
const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
const val GROUP_ID = "test-group"
}
}
fun main(args: Array<String>) {
val configs = Properties()
val logger = LoggerFactory.getLogger(SimpleConsumer.javaClass)
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleConsumer.BOOTSTRAP_SERVERS
configs[ConsumerConfig.GROUP_ID_CONFIG] = SimpleConsumer.GROUP_ID
configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
val consumer = KafkaConsumer<String, String>(configs)
consumer.subscribe(listOf(SimpleConsumer.TOPIC_NAME))
while (true) {
val records = consumer.poll(Duration.ofSeconds(1)) // 카프카 브로커에 있는 데이터를 가져온다.
records.map {
logger.info("{}", it)
}
}
}
결과
프로듀서는 0~9 사이의 임의의 숫자를 열심히 만들어서 'random.number'토픽에 적재한다.
스트림즈 어플리케이션은 'random.number'토픽에 적재된 메시지를 가져와 처리한다.
컨슈머는 스트림즈 어플리케이션이 'random.number' 토픽에 적재된 메시지를 처리하여 'over5.number'토픽에 적재한 메시지를 가져온다. 컨슈머가 가져오는 값들은 메시지 값(value)이 5보다 큰 것을 볼 수 있다.