debezium kafka

1. Install Kafka

wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
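The later steps run Kafka from /usr/local/src/kafka_2.13-3.6.0, so the archive presumably gets extracted under /usr/local/src. A minimal sketch of that step, assuming the downloaded archive sits in the current directory; the variable names and the extract_kafka helper are illustrative, not from the original post.

```shell
# Versions taken from the download URL above.
SCALA_VERSION="2.13"
KAFKA_VERSION="3.6.0"
KAFKA_TGZ="kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"

# Assumption: install location inferred from the paths used in step 2.
KAFKA_HOME="/usr/local/src/kafka_${SCALA_VERSION}-${KAFKA_VERSION}"

# Hypothetical helper: unpack the downloaded archive under /usr/local/src.
extract_kafka() {
  tar -xzf "$KAFKA_TGZ" -C "$(dirname "$KAFKA_HOME")"
}
```

Calling extract_kafka after the wget should leave the bin/ and config/ directories that step 2 refers to.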


2. Configure Kafka


ZooKeeper
 /usr/local/src/kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon /usr/local/src/kafka_2.13-3.6.0/config/zookeeper.properties

Broker
 /usr/local/src/kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon /usr/local/src/kafka_2.13-3.6.0/config/server.properties

export KAFKA_HEAP_OPTS="-Xmx200m -Xms200m"   # heap memory setting; export it before running the start scripts so they pick it up
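Because the start scripts read KAFKA_HEAP_OPTS from the environment, the order of the commands matters. The sketch below puts the three commands of this step in one function; the start_kafka name and the fixed 5-second pause are assumptions, not part of the original post.

```shell
KAFKA_HOME="/usr/local/src/kafka_2.13-3.6.0"

# Hypothetical helper: start ZooKeeper, then the broker, with a small heap.
start_kafka() {
  # Export the heap setting first so both start scripts inherit it.
  export KAFKA_HEAP_OPTS="-Xmx200m -Xms200m"
  "$KAFKA_HOME/bin/zookeeper-server-start.sh" -daemon "$KAFKA_HOME/config/zookeeper.properties"
  # Assumption: a short fixed pause is enough for ZooKeeper to accept connections.
  sleep 5
  "$KAFKA_HOME/bin/kafka-server-start.sh" -daemon "$KAFKA_HOME/config/server.properties"
}
```

If the variable is not set, the scripts fall back to their own larger default heap sizes, which may be too much for a small test machine.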


3. Install the connector (MySQL version)

tar xvf debezium-connector-mysql-2.4.0.Final-plugin.tar.gz
vi config/connect-distributed.properties
At the bottom of the file, set plugin.path to the directory that contains the extracted connector.
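Step 5 sends a request to port 8083, which means a Kafka Connect worker has to be running with the plugin on its plugin.path. A rough sketch of how that could look; the /usr/local/src/kafka-plugins directory and the helper names are hypothetical, so substitute whichever directory the connector was actually extracted into.

```shell
KAFKA_HOME="/usr/local/src/kafka_2.13-3.6.0"
# Hypothetical plugin directory (parent of the extracted debezium-connector-mysql folder).
PLUGIN_DIR="/usr/local/src/kafka-plugins"

# Prints the line to place at the bottom of config/connect-distributed.properties.
plugin_path_line() {
  printf 'plugin.path=%s\n' "$PLUGIN_DIR"
}

# Hypothetical helper: extract the plugin, register its directory, start Connect.
install_connector() {
  mkdir -p "$PLUGIN_DIR"
  tar xvf debezium-connector-mysql-2.4.0.Final-plugin.tar.gz -C "$PLUGIN_DIR"
  plugin_path_line >> "$KAFKA_HOME/config/connect-distributed.properties"
  "$KAFKA_HOME/bin/connect-distributed.sh" -daemon "$KAFKA_HOME/config/connect-distributed.properties"
}
```

plugin.path should point at the parent directory, not at the individual jar files inside the extracted folder.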

4. Enable the binlog in the target DB's my.cnf

log_bin=C:/Program Files/MariaDB 10.7/data/mysql-bin.log
max_binlog_size=100M
binlog_format=ROW
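After restarting the database, it can be worth confirming that the settings took effect before creating the connector. A small sketch, assuming the mysql command-line client is installed and reusing the host, port, and placeholder user from step 5; the check_binlog name is illustrative.

```shell
# Connection values copied from the connector config in step 5 (placeholders).
DB_HOST="192.168.0.216"
DB_PORT="3306"
DB_USER="id"

# Query the two variables that the my.cnf lines above are meant to set.
BINLOG_CHECK_SQL="SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format');"

# Hypothetical helper: prompts for the password and prints the current values.
check_binlog() {
  mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p -e "$BINLOG_CHECK_SQL"
}
```

With the configuration above, log_bin should report ON and binlog_format should report ROW.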



5. Create the connector with a POST request to Kafka Connect

curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{
  "name": "source-test-connectossa1",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "192.168.0.216",
    "database.port": "3306",
    "database.user": "id",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.allowPublicKeyRetrieval": "true",
    "database.include.list": "dbname",
    "topic.prefix":"db_",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.testdb",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex":"(.*)",
    "transforms.addTopicPrefix.replacement":"$1",
    "schema.history.internal.kafka.topic": "testoks",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092"
  }
}'
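Once the POST returns, the Kafka Connect REST API can be queried to see whether the connector and its task are actually running. A minimal sketch, assuming Connect listens on localhost:8083 as in the request above; the helper names are illustrative.

```shell
CONNECT_URL="http://localhost:8083"
CONNECTOR_NAME="source-test-connectossa1"

# Builds the status endpoint URL for a given connector name.
connector_status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Hypothetical helper: list registered connectors, then show this one's status.
check_connector() {
  curl -s "$CONNECT_URL/connectors"
  curl -s "$(connector_status_url "$CONNECTOR_NAME")"
}
```

A failed task usually includes a stack trace in the status response, which is often the quickest way to spot a wrong class name or database credential.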



Consume a topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic name] --from-beginning


List topics
kafka-topics.sh --bootstrap-server localhost:9092 --list
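The Debezium MySQL connector names its change-event topics <topic.prefix>.<database>.<table>, so with the configuration in step 5 the topic name can be worked out ahead of time. A short sketch under that assumption; the customers table and the helper names are hypothetical.

```shell
KAFKA_HOME="/usr/local/src/kafka_2.13-3.6.0"

# Builds a topic name from topic.prefix, database name, and table name.
debezium_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Hypothetical helper: consume change events for one table of "dbname".
consume_table() {
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server localhost:9092 \
    --topic "$(debezium_topic "db_" "dbname" "$1")" --from-beginning
}
```

For example, consume_table customers would read from db_.dbname.customers, since the RegexRouter in the sample config maps every topic name to itself.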

