Yebali

Kafka Producer/Consumer 만들기 (feat. Kotlin) 본문

Kafka

Kafka Producer/Consumer 만들기 (feat. Kotlin)

예발이 2022. 2. 1. 17:33

카프카를 실습하기 위한 간단한 프로듀서와 컨슈머를 만들어서 실험해보자.

토픽 만들기

3개의 파티션을 가진 'test'라는 이름의 토픽을 생성한다.

bin/kafka-topics.sh \
--create \
--bootstrap-server 13.124.252.159:9092 \
--partitions 3 \
--topic test

프로듀서

Kotlin프로젝트를 새로 만들어 아래 의존성을 추가해준다.

dependencies {
    implementation("org.apache.kafka:kafka-clients:2.8.1")
    implementation("org.slf4j:slf4j-simple:1.7.35")
    testImplementation(kotlin("test"))
}

 

"testMessage"라는 메시지를 브로커로 보내는 프로듀서 코드이다.

class SimpleProducer {
    companion object {
        const val TOPIC_NAME = "test"
        const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
    }
}

fun main(args: Array<String>) {
    val logger = LoggerFactory.getLogger(SimpleProducer.javaClass)

    // 프로듀서 설정
    val configs = Properties()
    configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleProducer.BOOTSTRAP_SERVERS
    configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name

    val producer = KafkaProducer<String, String>(configs)

    val messageValue = "testMessage"
    val record = ProducerRecord<String, String>(SimpleProducer.TOPIC_NAME, messageValue)

    producer.send(record) // 즉각적인 전송이 아닌 내부에서 배치로 묶어서 브로커에 전송함.
    logger.info("{}", record)
    producer.flush() // 프로듀서 내부 버퍼에 가지고 있는 레코드 배치를 브로커로 전송한다.

    producer.close()
}

해당 코드를 실행시키면 아래 사진처럼 프로듀서의 설정들을 볼 수 있다. (잘렸지만 밑에 더 많은 정보가 출력된다.)

 

코드 실행 후, CLI 명령어를 통해 "testMessage"가 카프카 브로커에 적재되었음을 확인할 수 있다.

bin/kafka-console-consumer.sh \
--bootstrap-server=13.124.252.159:9092 \
--topic test \
--from-beginning

'testMessage' 메세지가 카프카 브로커에 적재되었다.

컨슈머

프로듀서와 마찬가지로 Kotlin프로젝트를 새로 만들어 아래의 의존성을 추가해준다.

dependencies {
    implementation("org.apache.kafka:kafka-clients:2.8.1")
    implementation("org.slf4j:slf4j-simple:1.7.35")
    testImplementation(kotlin("test"))
}
class SimpleConsumer {
    companion object {
        const val TOPIC_NAME = "test"
        const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
        const val GROUP_ID = "test-group"
    }
}
fun main(args: Array<String>) {
    val configs = Properties()
    val logger = LoggerFactory.getLogger(SimpleConsumer.javaClass)

    // 카프카 컨슈머 설정
    configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleConsumer.BOOTSTRAP_SERVERS
    configs[ConsumerConfig.GROUP_ID_CONFIG] = SimpleConsumer.GROUP_ID
    configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name

    val consumer = KafkaConsumer<String, String>(configs)
    consumer.subscribe(listOf(SimpleConsumer.TOPIC_NAME))

    while (true) {
        val records = consumer.poll(Duration.ofSeconds(1)) // 카프카 브로커에 있는 데이터를 가져온다.
        records.map {
            logger.info("{}", it)
        }
    }
}

해당 코드를 실행시키고, 프로듀서의 코드를 실행하면 아래 사진처럼 프로듀서에서 보낸 메시지를 컨슈머가 수신한 것을 알 수 있다.

 

브로커는 괜찮을까?

위의 실습에서 사용한 브로커의 사양은 EC2 t3a.nano (vCPU 2개, RAM 0.5G, vRAM 4G, SSD 20G) 사양이라 처리량이 많아지면 병목이 생길지 안 생길지 궁금했다.

 

프로듀서가 메시지를 계속 보내게끔 코드를 수정하고 테스트를 해봤다.

 

class SimpleProducer {
    companion object {
        const val TOPIC_NAME = "test"
        const val BOOTSTRAP_SERVERS = "13.124.252.159:9092"
    }
}

fun main(args: Array<String>) {
    val logger = LoggerFactory.getLogger(SimpleProducer.javaClass)

    val configs = Properties()

    configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = SimpleProducer.BOOTSTRAP_SERVERS
    configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name

    val producer = KafkaProducer<String, String>(configs)

    while (true) {
        val messageValue = UUID.randomUUID().toString()
        val record = ProducerRecord<String, String>(SimpleProducer.TOPIC_NAME, messageValue)

        producer.send(record) // 즉각적인 전송이 아닌 내부에서 배치로 묶어서 브로커에 전송함.
        producer.flush() // 프로듀서 내부 버퍼에 가지고 있는 레코드 배치를 브로커로 전송한다.
    }

    producer.close()
}

코드 수정 후 컨슈머 콘솔 창에서는 수많은 메시지가 찍혔다.
그 와중에 메시지에 별도의 키를 설정하지 않았기 때문에 메시지들은 각 partition에 Round-Robin방식으로 적재되고, 컨슈머는 차근차근 poll() 받고 있다.

 인스턴스는 평균 사용률 5~10% 내외로 아주 평화로웠다.

iostat를 통해 확인한 파일 입출력도 가끔 튀긴 해도 크게 리소스를 많이 먹진 않는 것 같았다.

메시지의 크기가 커지고 토픽의 개수, 프로듀서/컨슈머의 개수가 커진다면 ec2를 좀 더 괴롭힐 수 있을 것 같다.

 

소스코드

Producer

 

GitHub - yebali/simple-kafka-producer

Contribute to yebali/simple-kafka-producer development by creating an account on GitHub.

github.com

Consumer

 

GitHub - yebali/simple-kafka-consumer

Contribute to yebali/simple-kafka-consumer development by creating an account on GitHub.

github.com

 

'Kafka' 카테고리의 다른 글

Kafka Streams 사용하기 (feat. Kotlin)  (0) 2022.02.02
Kafka 토픽 삭제하기  (0) 2022.02.01
Kafka 토픽 생성하기  (0) 2022.02.01
Kafka 설치하기 (feat. EC2)  (0) 2022.01.31
Spring Kafka (스프링 카프카)  (0) 2022.01.30