VPC 환경에서 이용 가능합니다.

목차

Cloud Data Streaming Service 사용하기

Cloud Data Streaming Service는 설치형 상품으로 별도의 이용 신청/해지가 없으며 클러스터 생성 시 계약이 이루어집니다.

클러스터가 삭제되면 계약이 종료됩니다.

Cloud Data Streaming Service 클러스터 생성하기

Cloud Data Streaming Service 메뉴에서 Cluster 생성 버튼을 클릭합니다.

Cloud Data Streaming Service 클러스터 설정 정보 입력하기

① 클러스터 이름을 입력합니다.

② Application 버전을 선택합니다.

③ Application 버전을 선택하는 화면입니다.

④ ACG 정보입니다.

  • Cloud Data Streaming Service에서 사용되는 ACG는 cdss-클러스터 코드로 자동으로 생성됩니다.

⑤ Kafka Broker Port입니다.

  • 9092으로 설정되며 변경할 수 없습니다.

⑥ Kafka Broker TLS Port입니다.

  • 9093으로 설정되며 변경할 수 없습니다.

⑦ Zookeeper Port입니다.

  • 2181으로 설정되며 변경할 수 없습니다.

⑧ CMAK Port입니다.

  • 9000으로 설정되며 변경할 수 없습니다.

⑨ CMAK에 접속하기 위한 ID를 입력합니다.

⑩ CMAK 접속용 Password를 입력합니다.

⑪ CMAK 접속용 Password를 한 번 더 입력합니다.

⑫ 노드 설정 정보를 입력하기 위해 다음을 클릭합니다.

Cloud Data Streaming Service 노드 설정하기

① OS를 선택합니다.

② VPC를 선택합니다.

③ 매니저 노드 서버의 Subnet을 선택합니다.

  • Public subnet을 선택할 수 있습니다.

④ 매니저 노드 서버 개수의 정보입니다.

  • 1로, 값을 변경할 수 없습니다.

⑤ 매니저 노드 서버 타입을 선택합니다.

  • 선택 가능한 서버 타입이 나타납니다.

⑥ Broker 노드 서버의 Subnet을 선택합니다.

  • Private subnet을 선택할 수 있습니다.

⑦ Broker 노드 개수를 입력합니다.

  • 최소 3대부터 최대 10대까지 생성이 가능합니다.
  • 기본 설정은 3대입니다.

⑧ Broker 노드 서버 타입을 선택합니다.

  • 선택 가능한 서버 타입이 나타납니다.

⑨ Broker 노드 스토리지 용량입니다.

  • OS 스토리지가 아닌 별도의 BlockStorage를 사용합니다.
  • BlockStorage는 계정당 네이버 클라우드 플랫폼 전체 서비스에서 최대 2000GB까지 사용 가능합니다.

⑩ 최종 정보를 확인하기 위해 다음을 클릭합니다.

입력 정보 최종 확인하기

① 입력된 정보가 맞는지 확인합니다.

② 클러스터 생성을 위해 클러스터 생성 버튼을 클릭합니다.

Cloud Data Streaming Service 클러스터 생성 확인

① Cloud Data Streaming Service 클러스터 생성이 시작되었습니다.

  • 생성 시간은 수 분에서 수십 분 정도 소요될 수 있습니다.

Cloud Data Streaming Service 클러스터 관리하기

생성된 Cloud Data Streaming Service의 삭제, 노드 추가, CMAK 접속, CMAK 접속 패스워드 초기화, CMAK 접속 도메 설정 변경, 서비스 재시작 기능입니다.

생성된 클러스터 확인

① 클러스터 생성이 완료되면 클러스터 요약 정보가 나타나며 해당 클러스터를 선택하면 하단의 상세 정보가 나타납니다.

② 클러스터의 상세 정보 정보를 확인할 수 있습니다.

  • ACG 정보와 CMAK 바로 가기, 암호화 통신에 사용 될 인증서를 다운받을 수 있습니다.
  • 클러스터가 선택되면 상단의 메뉴 삭제/클러스터 관리/클러스터 재시작 메뉴가 활성화됩니다.

③ 선택 클러스터 삭제 메뉴입니다.

④ 클러스터 관리 메뉴입니다

⑤ 선택 클러스터 재시작 메뉴입니다.

클러스터 삭제 메뉴

① 클러스터를 선택 후 삭제 메뉴를 선택 하면 선택된 클러스터 리스트를 확인하고, 최종 삭제를 진행 할지 여부를 선택합니다.

② 실제 클러스터를 삭제 할지 여부를 선택합니다.

클러스터 관리 메뉴

① 클러스터를 선택 후 클러스터 관리 메뉴를 선택 시 브로커 노드 추가, CMAK 접속, CMAK 접속 패스워드 초기화, CMAK 접속 도메인 설정 변경 메뉴가 나타납니다.

브로커 노드 추가

브로커 노드 추가 메뉴를 선택하면 노드를 추가할 수 있는 화면이 나타납니다.

② 최초 현재 클러스터의 브로커 노드 수가 나타나며 최대 10대까지 증설을 선택할 수 있습니다.

  • 예: 추가할 노드 수 1 선택 시 기존 3대에 1대의 브로커 노드가 추가됩니다.

③ 브로커 노드 증설을 실행합니다.

① 노드 증설이 시작되면 서버 생성이 진행되며 서버 상태가 변경중으로 표시됩니다.

CMAK 접속

① 바로가기 버튼을 통해 CMAK에 접속할 수 있습니다.

  • CMAK 접속 버튼을 클릭하기 위해서는 CMAK 접속 도메인 설정 변경을 통해 Public 도메인을 활성화해야 합니다.

CMAK 접속 패스워드 초기화

① CMAK 접속용 Password를 잃어 버렸을 경우에 Password를 초기화합니다.

② 새로운 Password를 입력 후 확인을 선택하면 새로운 Password로 CMAK에 접속할 수 있습니다. 적용 시간은 수 분가량 소요될 수 있습니다.

CMAK 접속 도메인 설정 변경

① CMAK 접속 도메인 설정을 변경합니다.

② 현재, Public 도메인이 비활성화 상태일 경우, Public 도메인을 활성화 할 수 있습니다.

③ 현재, Public 도메인이 활성화 상태일 경우, Public 도메인을 비활성화 할 수 있습니다.

  • 접속 도메인을 비활성화 할 경우, 외부에서의 CMAK 접속이 차단됩니다.

클러스터 재시작

① 모든 서비스(CMAK, Kafka & Zookeeper), Kafka & Zookeeper, CMAK 재시작을 할 수 있는 메뉴입니다.

  • 서비스 재시작 시, 데이터 유실, 서비스 지연이 발생할 수 있니다.

모든 서비스 재시작

① 모든 서비스(Kafka & Zookeeper & CMAK)를 재시작합니다.

Kafka and Zookeeper 재시작

① Kafka & Zookeeper를 재시작합니다.

CMAK 재시작

① CMAK를 재시작합니다.

Cloud Data Streaming Service 활용하기

Cloud Data Streaming Service를 활용하는 방법을 알아봅니다.

CMAK에 접속하여, Topic을 만들고, Broker 노드들의 정보를 확인하고, Cloud Data Streaming Service Cluster에 데이터를 저장하고 조회합니다.

CMAK 접속하기

CMAK에 접속하기 위해선 Public 도메인을 활성화 해야 합니다.

① 클러스터 관리 -> CMAK 접속 도메인 설정 변경 버튼을 클릭합니다.

② "확인" 버튼을 눌러 Public 도메인을 활성화합니다.

CMAK 바로가기 버튼을 눌러 CMAK에 접속합니다.

① 클러스터 생성 과정에서 입력했던 ID와 Password를 입력합니다.

① 생성한 클러스터에 대한 정보를 확인할 수 있습니다.

① Topic에 대한 정보를 확인할 수 있습니다.

② 브로커 노드에 대한 정보를 확인할 수 있습니다.

  • Topic에 대한 정보

  • 브로커 노드에 대한 정보

Topic 만들기

① Create 버튼을 눌러 Topic을 만듭니다.

① Topic의 이름을 입력합니다.

② Topic의 Partitions를 입력합니다.

③ Topic의 Replication Factor를 입력합니다.

④ 그 외 Topic의 설정들을 입력합니다.

⑤ Create 버튼을 눌러 Topic을 생성합니다.

생성된 Topic 확인 하가

① Topic의 정보를 확인할 수 있습니다.

② Topic 삭제, Partition 추가, 재분배, 설정 변경 등의 조작을 수행할 수 있습니다.

③ 각 브로커 노드에 대한 생성된 Partition 정보를 확인할 수 있습니다.

④ 각 Partition의 Leader와 Replication이 어떤 브로커 노드에 위치하는지 확인할 수 있습니다.

Cloud Data Streaming Service 사용

위와 같은 구조로 Producer VM, Consumer VM을 생성하여 앞에서 생성한 Topic에 대해 데이터를 저장, 조회를 해보겠습니다.

Producer VM, Consumer VM 생성 및 세팅하기

  • CentOS 7.3을 기준으로 가이드를 작성하였습니다.

① Cloud Data Streaming Service 클러스터를 생성한 VPC와 동일한 VPC를 선택합니다.

② VM에 접속, 설치를 위해 Public Subnet을 선택합니다.

  • 해당 VM에 공인 IP를 할당해야 접속할 수 있습니다.

Apache Kafka, Java, Python을 활용해 데이터를 전송, 저장해보겠습니다.

Java 설치

yum install java -y

Apache Kafka 설치

wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 압축을 풀어줍니다.
tar -zxvf kafka_2.12-2.4.0.tgz

브로커 노드 정보 확인하기

① Broker 노드 정보 - 상세 보기 버튼을 클릭합니다.

① 브로커 노드와 암호화 없이 통신을 하는데 사용됩니다.

② 브로커 노드와 암호화 통신에 사용됩니다.

③ 암호화 통신에 이용되는 hosts 파일을 수정하는데 사용됩니다.

브로커 노드의 ACG를 수정합니다.

① 프로토콜을 선택합니다.

② 접근 소스 정보에 Producer VM, Consumer VM의 비공인 IP를 입력합니다.

③ 허용할 포트를 등록합니다.

  • 9092-9093을 입력합니다.

④ ACG에 대한 메모를 등록합니다.

추가를 클릭하면 아래 리스트에 입력한 정보가 보입니다.

입력한 규칙을 한번 더 확인한 후 적용을 클릭하면 해당 규칙이 ACG에 적용됩니다.

Cloud Data Streaming Service에 데이터 전송 및 조회하기

Apache Kafka 활용하여 데이터 전송 및 조회하기

앞에서 생성, 세팅한 Producer VM에 접속하여 아래 명령어를 실행합니다.

cd kafka_2.12-2.4.0
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
# [topic]에 CMAK에서 생성한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test

전송하고 싶은 메시지 입력하면 브로커 노드들에 해당 메시지들이 저장됩니다.

  • 종료를 원할 경우, ctrl + c를 입력합니다.

앞에서 생성, 세팅한 Consumer VM에 접속하여 아래 명령어를 실행합니다.

cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
# [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
# [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
  • --from-beginning 명령어 사용 시, 해당 Topic에 대한 데이터를 처음부터 모두 조회합니다.
  • --from-beginning 명령어를 사용하지 않을 시, 데이터를 조회한 순간부터 입력되는 데이터에 대해서만 조회합니다.

Java 활용하여 데이터 전송 및 조회하기

Java Application을 활용하여 데이터를 전송합니다.

  • IntelliJ IDEA를 사용하여 진행하였습니다.

File -> New -> Project 후, Maven을 선택해 Project를 만듭니다.

pom.xml 파일 수정하기
  • 프로젝트 종속성, Java 버전 및 패키징 메서드를 정의하는 pom.xml 파일을 수정합니다.
  • 사용자 환경에 따라 pom.xml 파일이 상이할 수 있습니다.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <packaging>jar</packaging>
     <groupId>org.example</groupId>
     <artifactId>maventest</artifactId>
     <version>1.0-SNAPSHOT</version>
     <url>http://maven.apache.org</url>
    
     <dependencies>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <!-- Apache Kafka version in Cloud Data Streaming Service -->
             <version>2.4.0</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
             <version>1.7.21</version>
         </dependency>
     </dependencies>
    
     <build>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.3</version>
                 <configuration>
                     <source>1.8</source>
                     <target>1.8</target>
                 </configuration>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <version>2.3</version>
                 <configuration>
                     <transformers>
                         <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                         <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                             <mainClass>KafkaMain</mainClass>
                         </transformer>
                     </transformers>
                 </configuration>
    
                 <executions>
                     <execution>
                         <phase>package</phase>
                         <goals>
                             <goal>shade</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
         </plugins>
     </build>
    </project>
    
KafkaMain.java 작성하기
  • Java Application 실행 시, argument로 produce/consume 여부, Topic, Broker lists를 전달합니다.

    public class KafkaMain {
     public static void main(String[] args) throws IOException {
         String topicName = args[1];
         String brokers = args[2];
    
         switch(args[0]){
             case "produce":
                 Producer.produce(brokers, topicName);
                 break;
             case "consume":
                 Consumer.consume(brokers, topicName);
                 break;
             default:
                 System.out.println("Wrong arguments");
                 break;
         }
         System.exit(0);
     }
    }
    
Producer.java 작성하기
  • 예시로 0~99의 숫자를 전송하였습니다.

    public class Producer {
     public static void produce(String brokers, String topicName) throws IOException {
         // Create Producer
         KafkaProducer<String, String> producer;
         // Configure
         Properties properties = new Properties();
         properties.setProperty("bootstrap.servers", brokers);
         properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
         properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
         producer = new KafkaProducer<>(properties);
    
         for(int i=0;i<100;i++){
             ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
             producer.send(record);
         }
         producer.close();
     }
    }
    
Consumer.java 작성하기
public class Consumer {
    public static int consume(String brokers, String topicName) {
        // Create Consumer
        KafkaConsumer<String, String> consumer;
        // Configure
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "consumer_group");
        properties.setProperty("auto.offset.reset", "earliest");

        consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                System.out.println(record.value());
            }
        }
    }
}

작성한 Java code를 git에 저장 후, VM에서 git clone 명령어를 통해 code를 다운받습니다.

git clone 자신의 git repository

해당 Java Application을 빌드하기 위해선 Maven 설치가 필요합니다.

yum install maven -y

다운 받은 Java code 폴더로 이동 후, jar 파일을 빌드합니다.

cd kafkatest
mvn clean package

jar 파일을 빌드하고 나면, target 폴더와 target 폴더 내에 jar 파일이 생성됩니다. target 폴더로 이동 후, jar 파일을 실행합니다.

cd target

# 데이터 전송하기
java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
# [topic]에 CMAK에서 만든 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
예시) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092


# 데이터 조회하기
java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
# [topic]에 CMAK에서 만든 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
예시) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

Python 활용하여 데이터 전송 및 조회하기

  • Python 2.7.5 버전에서 진행하였습니다.
  • Python에서 Kafka를 활용하기 위해선 kafka-python package 설치가 필요합니다.
    # pip 설치
    curl -LO https://bootstrap.pypa.io/get-pip.py 
    python get-pip.py
    # kafka-python package 설치
    pip install kafka-python
    

KafkaMain.py 작성하기

import sys
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import time


def produce(topicName, brokerLists):
        producer = KafkaProducer(bootstrap_servers=brokerLists,
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))
        for i in range(100):
                producer.send(topicName, i)

def consume(topicName, brokerLists):
        consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                        group_id="test")

        for msg in consumer:
               print(msg)


action=sys.argv[1]
topicName=sys.argv[2]
brokerLists=sys.argv[3].split(',')

if action == 'produce':
    produce(topicName, brokerLists)
elif action == 'consume':
    consume(topicName, brokerLists)
else:
    print('wrong arguments')

KafkaMain.py 파일을 실행하여 produce 하기

  • 예시로 0~99의 숫자를 전송하였습니다.
    python KafkaMain.py produce [topic] [broker.list]
    # [topic]에 CMAK에서 만든 Topic을 입력합니다.
    # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
    # 예시) python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    

KafkaMain.py 파일을 실행하여 consume 하기

python KafkaMain.py consume [topic] [broker.list]
# [topic]에 CMAK에서 만든 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
# 예시) python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

통신 구간 암호화

Apache Kafka 클라이언트와 Apache Kafka 브로커 노드 간에 통신 구간을 암호화하는 방법입니다.

전체적인 프로세스 요약은 다음과 같습니다(① ~ ③의 과정은, 클러스터 생성 시 자동으로 실행됩니다).

① 매니저 노드에서 자체 인증서를 만듭니다.

② 각 브로커 노드에서, 인증서를 생성하고, 인증서 서명 요청을 만듭니다.

③ ②에서 만든 인증서 서명 요청을 매니저 노드가 서명합니다.

④ 클라이언트에서는 ①에서 만든 인증서를 다운로드하고, 인증서에 대한 정보를 갖고 있는 truststore를 생성합니다.

⑤ 암호화 통신에 대한 설정 파일을 작성합니다.

⑥ hosts 파일 수정하기

인증서 다운로드

① 다운로드 버튼을 클릭하여 인증서를 다운로드합니다.

① 확인 버튼을 눌러 인증서를 다운로드합니다.

다운받은 인증서 파일을 Producer, Consumer VM으로 복사합니다.

  • Producer, Consumer VM의 /root 경로에 ca-cert라는 이름으로 저장합니다.

Truststore 생성하기

keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert

① Enter keystore password: password를 입력합니다.

② Re-enter new password: password를 재입력합니다.

③ Trust this certificate? [no]: yes를 입력합니다.

④ ls 명령어를 통해 kafka.client.truststore.jks 파일이 생성된 것을 확인할 수 있습니다.

암호화 설정 파일 작성하기

# /root/kafka_2.12-2.4.0 폴더에 암호화 설정 파일을 생성합니다.
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]

위의 내용을 입력한 client-auth.properties 파일을 작성합니다.

  • [password]에는 kafka.client.truststore.jks 생성 시 입력했던 password를 입력합니다.

hosts 파일 수정하기

암호화 통신을 하기 위해선 hosts 파일(/etc/hosts)을 수정해야 합니다. /etc/hosts 파일은, 리눅스에서 DNS 서버보다 먼저 호스트명을 IP로 변환하여 주는 파일입니다.

vi /etc/hosts

/etc/hosts 파일에 앞에서 확인했던 브로커 노드 정보의 hosts 파일 정보 복사본을 추가합니다.

예시)

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao

Cloud Data Streaming Service에 통신 구간 암호화하여 데이터 전송하기

Producer VM에서 아래의 명령어를 실행합니다.

./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
# [topic]에 CMAK에서 생성한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties

전송하고 싶은 메시지 입력하면 브로커 노드들에 해당 메시지들이 저장됩니다.

  • 종료를 원할 경우, ctrl + c를 입력합니다.

Cloud Data Streaming Service에 통신 구간 암호화하여 저장된 데이터 조회하기

Consumer VM에서 아래의 명령어를 실행합니다.

cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
# [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties

""에 대한 건이 검색되었습니다.

    ""에 대한 검색 결과가 없습니다.

    처리중...