NimTechnology

Presenting CLOUD technologies in an easy-to-understand way.

[Kafka-connect] Single Message Transform: lesson 9 – Cast

Posted on February 27, 2022 by nim

In this lesson we customize the data types of the fields right at the source connector.

Contents

  • 1) Setup the components.
  • 2) Practice.

1) Setup the components.

For the Kafka setup, you can refer back to this post:
Setup the needing components of Kafka

Remember to delete the old containers and run docker-compose again:
docker-compose down -v
docker-compose up -d

2) Practice.

Reference link:
https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day9.adoc

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day9-00/config \
    -d '{
        "connector.class"                             : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day9-transactions.with"                : "#{Internet.uuid}",
        "genv.day9-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day9-transactions.units.with"           : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day9-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day9-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day9-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day9-transactions.item.with"            : "#{Beer.name}",
        "topic.day9-transactions.throttle.ms"         : 1000
    }'

Then check the schema that was registered for the topic:

curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | jq '.schema|fromjson[]'

In the result, every field in the schema has the data type string.
If we create a sink connector for this topic, every column in the target table will be of type text.
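
To make the field types easier to scan, the schema check above can be wrapped in a small helper. This is only a sketch: list_field_types is a hypothetical name, it assumes jq is installed, and it assumes the registered schema is an Avro record with a top-level fields array.

```shell
# Sketch: print "field<TAB>type" for each field of the Avro schema contained
# in a Schema Registry response read from stdin.
# list_field_types is a hypothetical helper name; jq must be installed.
list_field_types() {
  # .schema is a JSON string, so decode it first, then walk its fields.
  jq -r '.schema | fromjson | .fields[] | "\(.name)\t\(.type | tojson)"'
}

# Usage against the registry from this lesson (needs the stack running):
#   curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | list_field_types
```

Running the same helper against a later subject gives a quick way to compare the types before and after a change.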

[Kafka-connect] Single Message Transform: lesson 8 – TimestampConverter
In lesson 8 we customized the data type while sinking the data, which means the type in the schema stayed unchanged.

Now we customize the type directly at the source connector.
That way the type stored in the schema is changed as well.

Now we will use the Cast Single Message Transform to cast the two fields currently held as string to their correct numeric types.

Here we’ll apply the fix to the source connector. Note that we’re using a new target topic (day9-01) because otherwise you’ll quite rightly get an error (io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "day9-transactions-value"; error code: 409).

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day9-01/config \
    -d '{
        "connector.class"                                : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day9-01-transactions.with"                : "#{Internet.uuid}",
        "genv.day9-01-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day9-01-transactions.units.with"           : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day9-01-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day9-01-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day9-01-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day9-01-transactions.item.with"            : "#{Beer.name}",
        "topic.day9-01-transactions.throttle.ms"         : 1000,
        "transforms"                                     : "castTypes",
        "transforms.castTypes.type"                      : "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.castTypes.spec"                      : "cost:float32,units:int16"
        }'
curl -s "http://localhost:8081/subjects/day9-01-transactions-value/versions/latest" | jq '.schema|fromjson[]'
You can see that the types in the schema have been changed.
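
A typo in transforms.castTypes.spec is easy to make, so it can help to sanity-check the string before sending the config. The sketch below is a hypothetical helper (check_cast_spec is not part of Kafka). It only handles the field:type form used in this lesson and checks each type against the primitive types the Cast transform accepts: int8, int16, int32, int64, float32, float64, boolean and string.

```shell
# Sketch: check that every "field:type" pair in a Cast spec is well formed
# and names a supported type. check_cast_spec is a hypothetical helper.
check_cast_spec() {
  spec="$1"
  if [ -z "$spec" ]; then
    echo "empty spec" >&2
    return 1
  fi
  rest="$spec"
  while [ -n "$rest" ]; do
    # Take the text before the first comma, then drop it from the remainder.
    pair="${rest%%,*}"
    case "$rest" in
      *,*) rest="${rest#*,}" ;;
      *)   rest="" ;;
    esac
    # A pair needs a non-empty field name and type around a colon.
    case "$pair" in
      ?*:?*) ;;
      *) echo "malformed pair: '$pair'" >&2; return 1 ;;
    esac
    type="${pair##*:}"
    case "$type" in
      int8|int16|int32|int64|float32|float64|boolean|string) ;;
      *) echo "unsupported type in '$pair': $type" >&2; return 1 ;;
    esac
  done
  echo "ok: $spec"
}

check_cast_spec "cost:float32,units:int16"   # prints: ok: cost:float32,units:int16
```

The check is purely syntactic. It does not know whether the fields exist in the record or whether their values can actually be converted.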

Now create the sink connector:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/config \
    -d '{
          "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"     : "jdbc:mysql://mysql:3306/demo",
          "connection.user"    : "mysqluser",
          "connection.password": "mysqlpw",
          "topics"             : "day9-01-transactions",
          "tasks.max"          : "4",
          "auto.create"        : "true",
          "auto.evolve"        : "true"}'
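
Before looking at the database, it may be worth confirming that the connector and its tasks are actually running. The sketch below assumes jq is installed and uses the status endpoint of the Kafka Connect REST API. summarize_status is a hypothetical helper name.

```shell
# Sketch: summarize a Kafka Connect status response read from stdin.
# summarize_status is a hypothetical helper name; jq must be installed.
summarize_status() {
  # One line for the connector itself, then one line per task.
  jq -r '"connector: \(.connector.state)", (.tasks[] | "task \(.id): \(.state)")'
}

# Usage against the worker from this lesson (needs the stack running):
#   curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/status | summarize_status
```

A task in the FAILED state usually carries a trace field in the same response, which is the first place to look for the cause.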

Now check the database:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
describe `day9-01-transactions`;
OK, this looks good.
Apache Kafka, Kafka Connect

Copyright © 2023 NimTechnology.