일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- API
- kafka
- mirror maker2
- entity graph
- Spring Data JPA
- AWS
- topic생성
- consumer
- CI
- Entity
- Spring JPA
- offsetdatetime
- bean
- Streams
- git
- centos7
- transactionaleventlistener
- K8s
- CodePipeline
- Kubernetes
- cd
- Kotlin
- spring kafka
- JPA
- PAGING
- QueryDSL
- producer
- ECS
- mysql
- spring
- Today
- Total
Yebali
Kafka Producer/Consumer 만들기 (feat. Kotlin) 본문
카프카를 실습하기 위한 간단한 프로듀서와 컨슈머를 만들어서 실험해보자.
토픽 만들기
3개의 파티션을 가진 'test'라는 이름의 토픽을 생성한다.
bin/kafka-topics.sh \
--create \
--bootstrap-server 13.124.252.159:9092 \
--partitions 3 \
--topic test
프로듀서
Kotlin프로젝트를 새로 만들어 아래 의존성을 추가해준다.
dependencies {
implementation("org.apache.kafka:kafka-clients:2.8.1")
implementation("org.slf4j:slf4j-simple:1.7.35")
testImplementation(kotlin("test"))
}
"testMessage"라는 메시지를 브로커로 보내는 프로듀서 코드이다.
class SimpleProducer {
companion object {
const val TOPIC_NAME = "test"
const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
}
}
fun main(args: Array<String>) {
val logger = LoggerFactory.getLogger(SimpleProducer.javaClass)
// 프로듀서 설정
val configs = Properties()
configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleProducer.BOOTSTRAP_SERVERS
configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
val producer = KafkaProducer<String, String>(configs)
val messageValue = "testMessage"
val record = ProducerRecord<String, String>(SimpleProducer.TOPIC_NAME, messageValue)
producer.send(record) // 즉각적인 전송이 아닌 내부에서 배치로 묶어서 브로커에 전송함.
logger.info("{}", record)
producer.flush() // 프로듀서 내부 버퍼에 가지고 있는 레코드 배치를 브로커로 전송한다.
producer.close()
}
해당 코드를 실행시키면 아래 사진처럼 프로듀서의 설정들을 볼 수 있다. (잘렸지만 밑에 더 많은 정보가 출력된다.)
코드 실행 후, CLI 명령어를 통해 "testMessage"가 카프카 브로커에 적재되었음을 확인할 수 있다.
bin/kafka-console-consumer.sh \
--bootstrap-server=13.124.252.159:9092 \
--topic test \
--from-beginning
컨슈머
프로듀서와 마찬가지로 Kotlin프로젝트를 새로 만들어 아래의 의존성을 추가해준다.
dependencies {
implementation("org.apache.kafka:kafka-clients:2.8.1")
implementation("org.slf4j:slf4j-simple:1.7.35")
testImplementation(kotlin("test"))
}
class SimpleConsumer {
companion object {
const val TOPIC_NAME = "test"
const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
const val GROUP_ID = "test-group"
}
}
fun main(args: Array<String>) {
val configs = Properties()
val logger = LoggerFactory.getLogger(SimpleConsumer.javaClass)
// 카프카 컨슈머 설정
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleConsumer.BOOTSTRAP_SERVERS
configs[ConsumerConfig.GROUP_ID_CONFIG] = SimpleConsumer.GROUP_ID
configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
val consumer = KafkaConsumer<String, String>(configs)
consumer.subscribe(listOf(SimpleConsumer.TOPIC_NAME))
while (true) {
val records = consumer.poll(Duration.ofSeconds(1)) // 카프카 브로커에 있는 데이터를 가져온다.
records.map {
logger.info("{}", it)
}
}
}
해당 코드를 실행시키고, 프로듀서의 코드를 실행하면 아래 사진처럼 프로듀서에서 보낸 메시지를 컨슈머가 수신한 것을 알 수 있다.
브로커는 괜찮을까?
위의 실습에서 사용한 브로커의 사양은 EC2 t3a.nano (vCPU 2개, RAM 0.5G, vRAM 4G, SSD 20G) 사양이라 처리량이 많아지면 병목이 생길지 안 생길지 궁금했다.
프로듀서가 메시지를 계속 보내게끔 코드를 수정하고 테스트를 해봤다.
class SimpleProducer {
companion object {
const val TOPIC_NAME = "test"
const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
}
}
fun main(args: Array<String>) {
val logger = LoggerFactory.getLogger(SimpleProducer.javaClass)
val configs = Properties()
configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleProducer.BOOTSTRAP_SERVERS
configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
val producer = KafkaProducer<String, String>(configs)
while (true) {
val messageValue = UUID.randomUUID().toString()
val record = ProducerRecord<String, String>(SimpleProducer.TOPIC_NAME, messageValue)
producer.send(record) // 즉각적인 전송이 아닌 내부에서 배치로 묶어서 브로커에 전송함.
producer.flush() // 프로듀서 내부 버퍼에 가지고 있는 레코드 배치를 브로커로 전송한다.
}
producer.close()
}
코드 수정 후 컨슈머 콘솔 창에서는 수많은 메시지가 찍혔다.
그 와중에 메시지에 별도의 키를 설정하지 않았기 때문에 메시지들은 각 partition에 Round-Robin방식으로 적재되고, 컨슈머는 차근차근 poll() 받고 있다.
인스턴스는 평균 사용률 5~10% 내외로 아주 평화로웠다.
iostat를 통해 확인한 파일 입출력도 가끔 튀긴 해도 크게 리소스를 많이 먹진 않는 것 같았다.
메시지의 크기가 커지고 토픽의 개수, 프로듀서/컨슈머의 개수가 커진다면 ec2를 좀 더 괴롭힐 수 있을 것 같다.
소스코드
GitHub - yebali/simple-kafka-producer
Contribute to yebali/simple-kafka-producer development by creating an account on GitHub.
github.com
GitHub - yebali/simple-kafka-consumer
Contribute to yebali/simple-kafka-consumer development by creating an account on GitHub.
github.com
'Kafka' 카테고리의 다른 글
Kafka Streams 사용하기 (feat. Kotlin) (0) | 2022.02.02 |
---|---|
Kafka 토픽 삭제하기 (0) | 2022.02.01 |
Kafka 토픽 생성하기 (0) | 2022.02.01 |
Kafka 설치하기 (feat. EC2) (0) | 2022.01.31 |
Spring Kafka (스프링 카프카) (0) | 2022.01.30 |