Kafka - Kafka Streams API(카프카 스트림즈) - 1

2021. 4. 24. 02:49 Apache Kafka/Apache Kafka

Kafka - Kafka Streams API(카프카 스트림즈)

 

카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해 만들어진 제품이다. 초기 사용 목적과는 다른 뛰어난 성능에 일련의 연속된 메시지인 스트림을 처리하는 데도 사용이 되기 시작했다. 이러한 스트림을 카프카는 Kafka Streams API를 통해 제공한다.

 

설명하기 앞서 우선 스트림 프로세싱과 배치 프로세싱의 차이점이란 무엇일까? 스트림 프로세싱(Stream Processing)은 데이터들이 지속적으로 유입되고 나가는 과정에서 이 데이터에 대한 일련의 처리 혹은 분석을 수행하는 것을 의미한다. 즉, 스트림 프로세싱은 실시간 분석(Real Time Analysis)이라고 불리기도 한다. 스트림 프로세싱과는 대비되는 개념으로 배치(Batch)처리 또는 정적 데이터(Data-at-rest)처리를 들 수 있다. 배치 및 정적 데이터 처리란 위의 스트림 프로세싱과는 다르게 데이터를 한번에 특정 시간에 처리한다라는 특징이 있다. 주로 사용자의 요청이 몰리지 않는 새벽 시간대에 많이 수행하기도 한다.(물론 무조건 그렇다고는 할 수 없다) 그렇지만 사실 뭐가 좋고 나쁨은 이야기 할 수 없다. 스트림과 배치의 서로의 장단점이 있기 때문이다. 하지만 요즘은 역시나 실시간 데이터 처리가 각광받고 있는 사실은 숨길 수 없다.

 

<특정 이벤트 발생시 바로바로 애플리케이션에서 데이터처리를 하고 있다.>

 

그렇다면 스트림 프로세싱의 장점은 무엇이 있을까? 우선은 애플리케이션이 이벤트에 즉각적으로 반응을 하기 때문에 이벤트 발생과 분석,조치에 있어 거의 지연시간이 발생하지 않는다. 또한 항상 최신의 데이터를 반영한다라는 특징이 있다. 그리고 데이터를 어디에 담아두고 처리하지 않기 때문에 대규모 공유 데이터베이스에 대한 요구를 줄일 수 있고, 인프라에 독립적인 수행이 가능하다.

 

 

상태 기반과 무상태 스트림 처리

스트림 프로세싱에는 상태 기반과 무상태 스트림 처리가 있다. 차이점이란 실시간 데이터 처리를 위하여 이전에 분석된 데이터의 결과가 필요한지이다. 이렇게 상태를 유지할 스트림 프로세싱은 이벤트를 처리하고 그 결과를 저장할 상태 저장소가 필요하다. 이와는 반대로 무상태 스트림 처리는 이전 스트림의 처리 결과와 관계없이 현재 데이터로만 처리를 한다.

 

카프카 스트림즈(Kafka Streams API)

 

카프카 스트림즈는 카프카에 저장된 데이터를 처리하고 분석하기 위해 개발된 클라이언트 라이브러리이다. 카프카 스트림즈는 이벤트 시간과 처리 시간을 분리해서 다루고 다양한 시간 간격 옵션을 지원하기에 실시간 분석을 간단하지만 효율적으로 진행할 수 있다. 우선 카프카 스트림즈에 대해 설명하기 전에 용어 정리를 할 필요가 있을 것같다.

 

용어 

설명 

스트림(Stream) 

스트림은 카프카 스트림즈 API를 사용해 생성된 토폴로지로, 끊임없이 전달되는 데이터 세트를 의미한다. 스트림에 기록되는 단위는 key-value 형태이다. 

스트림 처리 애플리케이션(Stream Processing Application) 

카프카 스트림 클라이언트를 사용하는 애플리케이션으로서, 하나 이상의 프로세서 토폴로지에서 처리되는 로직을 의미한다. 프로세서 토폴로지는 스트림 프로세서가 서로 연결된 그래프를 의미한다. 

스트림 프로세서(Stream Processor) 

프로세서 토폴로지를 이루는 하나의 노드를 말하여 여기서 노드들은 프로세서 형상에 의해 연결된 하나의 입력 스트림으로부터 데이터를 받아서 처리한 다음 다시 연결된 프로세서에 보내는 역할을 한다. 

 소스 프로세서(Source Processor)

위쪽으로 연결된 프로세서가 없는 프로세서를 말한다. 이 프로세서는 하나 이상의 카프카 토픽에서 데이터 레코드를 읽어서 아래쪽 프로세서에 전달한다. 

싱크 프로세서(Sink Processor) 

토폴로지 아래쪽에 프로세서 연결이 없는 프로세서를 뜻한다. 상위 프로세서로부터 받은 데이터 레코드를 카프카 특정 토픽에 저장한다. 

 

 

카프카 스트림즈 아키텍쳐

카프카 스트림즈에 들어오는 데이터는 카프카 토픽의 메시지이다. 각 스트림 파티션은 카프카의 토픽 파티션에 저장된 정렬된 메시지이고, 해당 메시지는 key-value형태이다. 또한 해당 메시지의 키를 통해 다음 스트림(카프카 토픽)으로 전달된다. 위의 그림에서 보듯 카프카 스트림즈는 입력 스트림의 파티션 개수만큼 태스크를 생성한다. 각 태스크에는 입력 스트림(카프카 토픽) 파티션들이 할당되고, 이것은 한번 정해지면 입력 토픽의 파티션에 변화가 생기지 않는 한 변하지 않는다. 마지 컨슈머 그룹 안의 컨슈머들이 각각 토픽 파티션을 점유하는 것과 비슷한 개념이다.

 

 

자바를 이용한 간단한 파이프 예제 프로그램

지금부터 진행할 예제는 이전 포스팅에서 구성했던 카프카 클러스터 환경에서 진행한다. 만약 클러스터 구성이 되어 있지않다면 밑의 링크를 참조해 구성하면 될것 같다.

▶︎▶︎▶︎Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법

스프링 부트 애플리케이션 하나를 생성한다. 그리고 필요한 라이브러리 의존성을 추가한다. 로그를 사용하기 위하여 롬복 라이브러리를 추가하였다. 혹시 이클립스 환경에서 롬복을 사용하는 방법을 알고 싶다면 밑의 링크를 참조하자.

▶︎▶︎▶︎Mac OS - Eclipse & Lombok(롬복 사용방법)

 

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.0.1</version>
        </dependency>

 

이번에 만들 프로그램은 간단히 한쪽 토픽에 입력된 값을 다른 쪽 토픽으로 옮기는 역할을 수행한다. 

 

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class Pipe {
    
    public static void main(String[] args) {
        
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        //streams-*input에서 streams-*output으로 데이터 흐름을 정의한다.
        /*
         * KStream<String, String> source = builder.stream("streams-plaintext-input");
           source.to("streams-pipe-output");
         */
        builder.stream("streams-plaintext-input").to("streams-pipe-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        try {
          streams.start();
          System.out.println("topology started");
      
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}

우선 Properties 객체를 이용하여 카프카 스트림즈가 사용할 설정값을 입력한다. 우선 StreamsConfig.APPLICATION_ID_CONFIG는 카프카 클러스터 내에서 스트림즈 애플리케이션을 구분하기 위한 유일한 아이디 값을 설정하기 위한 값이다. 그리고 그 밑에 카프카 클러스터 구성이 된 인스턴스를 리스트로 작성한다. 또한 위에서 설명했듯이 카프카 스트림즈는 키-값 형태로 데이터의 흐름이 이루어지기에 해당 키-값을 어떠한 타입으로 직렬화 할 것인지 선택한다. 현재는 String 타입으로 지정하였다. 그 다음은 스트림 토폴로지를 구성해준다. 토폴로지는 StreamsBuilder 객체를 통하여 구성한다. 위의 소스는 "streams-plaintext-input"이라는 토픽에서 데이터를 읽어 "streams-pipe-output"이라는 토픽으로 데이터를 보내는 토폴로지를 구성하였다. 최종적으로 StreamsBuilder.build()를 호출하여 토폴로지 객체를 만든다. 위의 Topology.describe()를 호출하면 해당 토폴로지가 어떠한 구성으로 이루어졌는지 로그로 상세하게 볼 수 있다.

 

22:07:24.234 [main] INFO com.kafka.exam.streams.Pipe - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-SINK-0000000001

    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)

      <-- KSTREAM-SOURCE-0000000000

 

그 다음 KafkaStreams 객체를 생성한다. 매개변수로 토폴로지와 설정 객체를 전달한다.  마지막으로 KafkaStreams.start();를 호출하여 카프카 스트림을 시작한다. 해당 예제에서는 명시하지 않았지만 스트림즈를 종료하려면 close() 해주는 코드도 삽입해야한다.

 

 

카프카가 제공하는 콘솔 프로듀서,컨슈머를 이용하여 위의 스트림즈 코드를 수행해보았다. input 토픽에서 스트림즈 애플리케이션이 메시지를 꺼내서 output 토픽으로 잘 전달한것을 볼 수 있다.(>"hi hello") 컨슈머 실행시 세팅한 설정은 직관적으로 해석 가능하므로 따로 설명하지는 않는다.

 

방금은 단순히 메시지를 받아 다른 쪽 토픽으로 전달만 하는 예제이지만 다음 해볼 예제는 중간에 데이터를 처리하여 output 토픽으로 보내는 행 분리 예제 프로그램이다.

 

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}

설정 값은 동일하다 하지만 토폴로지에 로직이 추가된 것을 볼 수 있다. 우선 "streams-plaintext-input" 토픽으로 들어온 메시지를 KStream객체로 만들어준다. 그리고 flatMapValues()를 이용하여 데이터를 공백 단위로 쪼개는 것을 볼 수 있다. 여기서 flatMapValues()를 쓴 이유는 여기서 전달되는 value는 하나의 스트링 객체인데 이것을 List<String>으로 변환을 하였다. 이것을 flatMapValues()를 이용해 리스트의 각 요소를 스트링으로 flat하는 메소드이다. 즉, 메시지를 받아서 flatMapValues()를 통해 새로운 데이터 값이 만들어 졌다. 그러면 .to()에 전달되는 데이터 스트림은 공백단위로 짤린 단어 스트링이 전달될 것이다. 이전 예제와는 다르게 단순 데이터 전달만이 아니라 전달 이전에 적절한 처리를 하나 추가 해준 것이다. 만약 키와 값 모두를 새롭게 만들어서 사용하고 싶다면 flatMap() 메소드를 이용하면 된다. 그 밖에 메소드들을 알고 싶다면 공식 홈페이지를 참고하길 바란다.

 

22:49:08.941 [main] INFO com.kafka.exam.streams.LineSplit - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-FLATMAPVALUES-0000000001

    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])

      --> KSTREAM-SINK-0000000002

      <-- KSTREAM-SOURCE-0000000000

    Sink: KSTREAM-SINK-0000000002 (topic: streams-linesplit-output)

      <-- KSTREAM-FLATMAPVALUES-0000000001

 

토폴로지 정보를 보아도 중간에 Processor라는 이름으로 하나의 처리 프로세스가 추가된 것을 볼 수 있다.

 

 

hi hello 라는 메시지를 보냈는데 hi / hello 로 짤려서 출력된 것을 볼 수 있다. 

 

필자는 챗봇을 개발하고 있는데 카프카 스트림즈를 이용하여 간단히 사용자 질문 로그들을 분석하여 형태소 분리된 형태로 데이터를 저장할 수도 있을 것 같아서 간단히 예제로 짜보았다. NoriAnalyzer는 필자가 Nori 형태소 분석기를 이용한 간단히 형태소분석 유틸 클래스를 만든것이다.

 

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        //Nori 형태소 분석기를 이용한 유틸클래스
        NoriAnalyzer analyzer = new NoriAnalyzer();
        
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}

 

 

이렇게 추후에는 형태소분리된 모든 단어가 아니라 특정 명사만 추출하여 사용자들의 질문중 가장 많이 포함된 명사들을 뽑아내 데이터를 분석할 수도 있을 것같다.

 

지금까지의 예제는 바로 무상태 스트림 프로세싱이다. 다음 예제는 이전 데이터 처리에 대한 상태를 참조하여 데이터를 처리하는 단어 빈도수 세기 프로그램 예제이다. 

 

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다.
                .groupBy(new KeyValueMapper<String, String, String>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고
		  //변경이 있는 KTable에 대해서만 결과값을 리턴한다.

                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}

설정값은 동일하다. 조금 달라진 것은 데이터처리부이다. 우선은 입력받은 데이터를 형태소분리하여 토큰 단위로 스트림을 만든다. 그리고 groupBy를 이용하여 토큰값을 키로 하여 스트림을 그룹핑한다. 그 다음 count()를 통해 같은 키값으로 몇개의 요소가 있는 지를 KeyValueStore에 담고, 만약 키값스토어의 값이 변경이 있을 때만 다운스트림으로 내려준다.(key == null은 무시한다) 처음에는 해당 단어:빈도수가 출력이 되지만 이후에는 이 단어가 또 들어오지 않는다면 출력되지 않고 해당 단어가 들어오면 키값 저장소에 빈도수가 변경이 되므로 다운스트림으로 내려준다. 이말은 무엇이냐면 즉, 키값스토어가 이전 데이터 처리의 상태를 담고 있는 저장소가 되어 이후 데이터에서 참조되어 사용되는 것이다.(사실 더 자세한 것은 API문서를 봐야할 듯하다.)

 

 

 

일부 출력이 안된 것들은 토픽의 버퍼가 원하는 용량에 도달하지 못해 아직 출력하지 못한 데이터들이다. 이렇게 상태를 유지하여 스트림 프로세스를 구성할 수도 있다. 

 

지금까지 아주 간단하게 카프카 스트림즈 API에 대해 다루었다. 사실 실무에서 이용할 수 있을 만큼의 예제는 아니지만 카프카가 이렇게 실시간 데이터 스트림에도 이용될 수 있다는 것을 알았다면 추후에 API문서를 참조하여 카프카를 응용할 수 있을 것같다.

 

출처: https://coding-start.tistory.com/138?category=790331 [코딩스타트]