Kafka - Spring Cloud Stream Kafka Streams API(스프링 클라우드 스트림 카프카 스트림즈 API)

2021. 4. 24. 03:02 Apache Kafka/Apache Kafka

kafka is a distributed streaming platform

이번 포스팅은 Spring Cloud Stream 환경에서의 kafka Streams API입니다. 물론 이전 포스팅들에서 자바코드로 카프카 스트림즈를 다루어봤지만 이번에는 스프링 클라우드 스트림즈 환경에서 진행합니다. 카프카 스트림즈에 대한 설명은 이전 포스팅에서 진행하였기에 개념적인 설명은 하지 않고 코드레벨로 다루어보겠습니다. 혹시나 카프카 스트림즈를 모르시는 분이 있으시다면 아래 링크를 참조하시길 바랍니다.

지금부터 진행될 예제 및 설명은 모두 Spring Cloud Stream Reference를 참조하였습니다. 번역 중 오역이 있을 수 있습니다.

▶︎▶︎▶︎Kafka - Kafka Streams API(카프카 스트림즈) - 1

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-streams-exam</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-streams-exam</name>
    <description>Demo project for Spring Boot</description>
 
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
</project>

 

의존성을 추가해줍니다.

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
 
    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }
 
    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}

해당 코드를 레퍼런스에서 소개하는 카프카 스트림즈의 예제코드이다 토픽에서 메시지를 받아서 적절하게 데이터를 가공하여 특정 토픽으로 데이터를 내보내고 있다 내부 로직은 마치 Java1.8의 Stream와 비슷한 코드같이 보인다 메소드를 보면 매개변수와 반환타입이 모두 KStream객체다 나가고 들어노는 객체의 타입은 하나로 고정되어있으므로 비지니스로직에 조금이라도 더 집중할 수 있을 것 같다.

 

Configuration Option

Kafka Streams Properties는 모두 spring.cloud.stream.kafka.streams.binder 접두어가 붙는다. 

 

  • brokers - broker URL
  • zkNodes - zookeeper URL
  • serdeError - 역 직렬화 오류 처리 유형. logAndContinue,logAndFail,sendToDlq 
  • applicationId - 애플리케이션에서 사용하는 카프카 스트림 아이디값. 모든 카프카 스트림의 아이디값은 유일해야한다.

Producer Properties

  • keySerde - key serde, 키의 직렬화 옵션(default value = none)
  • valueSerde -  value serde, 값의 직렬화 옵션 (default value = none)
  • userNativeEncoding - native 인코딩 활성화 플래그(default value = false)

Consumer Properties

  • keySerde - key serde, 키의 역직렬화 옵션(default value = none)
  • valueSerde - value serde, 값의 역직렬화 옵션(default value = none)
  • materializedAs - KTable 타입을 사용할 때 상태저장소를 구체화한다.(default value = none)
  • useNativeDecoding - native 디코드 활성화 플래그(default value = false)
  • dlqName - DLQ 토픽 이름(default value = none)

기타 다른 설정들은 레퍼런스 참고하시길 바랍니다.

/**
 * in,out channel defined
 * 즉, 카프카 토픽에 메시지를 쓰는 발신 채널과
 * 카프카 토픽에서 메시지를 읽어오는 수신 채널을 정의해주는 것이다.
 * 꼭 @Input & @Output 어노테이션을 하나씩 넣을 필요는 없다.
 * 필요한 채널수만큼 정의가능하다.
 * 
 * 런타임동안에는 스프링이 구현체를 제공해준다.
 * @author yun-yeoseong
 *
 */
public interface ExamProcessor {
    String INPUT = "exam-input";
    String OUTPUT = "exam-output";
    String OUTPUT2 = "exam-output2";
    
    /**
     * @Input 어노테이션 설명
     * 메시지 시스템에서 메시지를 읽어오는 입력채널 
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Input(INPUT)
    SubscribableChannel inboundChannel();
    
    @Input("streams-input")
    KStream<?, String> inboundChannel2();
    
    @Input("streams-input2")
    KStream<?, String> inboundChannel3();
    /**
     * @Output 어노테이션 설명
     * 메시지 시스템으로 메시지를 보내는 출력채널
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Output(OUTPUT)
    MessageChannel outboundChannel();
    
    @Output(OUTPUT2)
    MessageChannel outboundChannel2();
    
    @Output("streams-output")
    KStream<?, String> outboundChannel3();
    
}

카프카 스트림즈를 스프링 클라우드 스트림에서 사용하려면 input&output을 KStream을 반환타입으로 채널을 정의한다. 

@Slf4j
@EnableBinding(ExamProcessor.class)
@SpringBootApplication
public class KafkaStreamsExamApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsExamApplication.class, args);
    }
    
    @StreamListener(target= ExamProcessor.INPUT,condition="headers['producerName']=='yeoseong'")
    public void receive(@Payload Exam exam) {
        log.info("Only Header value yeoseong = {}",exam.toString());
    }
    
    @StreamListener("streams-input")
    @SendTo("streams-output")
    public KStream<?, String> streams(KStream<?, String> input){
        return input.flatMapValues(new ValueMapper() {
            @Override
            public Object apply(Object value) {
                // TODO Auto-generated method stub
                String valueStr = (String)value;
                System.out.println(valueStr);
                return Arrays.asList(valueStr.split(" "));
            }
        });
    }
    
    @StreamListener("streams-input2")
    public void streams2(KStream<?, String> input) {
        System.out.println("streams2");
        input.foreach( (key,value) -> System.out.println(value));
    }
}

위에서 보이듯이 sink processor라면 그 밑으로 더 이상 processor가 붙지 않으므로 void 반환타입으로 데이터를 입력받아 적절한 처리를 하면된다. 하지만 밑으로 스트림이 더 붙는 스트림들은 반환타입으로 KStream을 반환해야한다. 지금까지 간단하게 Kafka Streams API를 Spring Cloud Stream에서 사용해보았다. kafka streams DSL의 자세한 문법은 곧 포스팅할 카프카 스트림즈 API에 대한 글을 참고하시거나 카프카 레퍼런스를 참고하시면 될듯합니다. 



출처: https://coding-start.tistory.com/140?category=790331 [코딩스타트]