Change Data Capture Using Chango CDC
Chango CDC is Change Data Capture application to catch CDC data of database and send CDC data to Chango.
Build and Package Chango CDC
Embedded Debezium is used by Chango CDC
. So you need to choose which Debezium version will be used for your database, see the release notice of Debezium.
After that, choose the concrete maven dependency version of Debezium connector.
For example, maven dependency version of Debezium connector for PostgreSQL can be found here.
Package Chango CDC
distribution with Debezium maven dependency version. Please note that Java 11 and Maven 3 are required to build Chango CDC
.
git clone -b branch-1.1.3 https://github.com/cloudcheflabs/chango-cdc.git
cd chango-cdc;
export CHANGO_CDC_VERSION=1.1.3
export DEBEZIUM_VERSION=1.9.7.Final
./package-dist.sh \
--version=${CHANGO_CDC_VERSION} \
--debezium.version=${DEBEZIUM_VERSION} \
;
Install Chango CDC
You may use Chango CDC
package which was built for yourself previously.
For this example, the pre-built Chango CDC
will be used. Download Chango CDC.
curl -L -O https://github.com/cloudcheflabs/chango-cdc/releases/download/1.1.3/chango-cdc-1.1.3-debezium-1.9.7.Final-linux-x64.tar.gz
Untar and move to Chango CDC directory.
tar zxvf chango-cdc-1.1.3-debezium-1.9.7.Final-linux-x64.tar.gz
cd chango-cdc-1.1.3-debezium-1.9.7.Final-linux-x64/
Configure Chango CDC
Modify conf/configuation.yml
. For example, it looks like this.
chango:
token: eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJjZGM0MTFkYzIzMzg2NjU0ZGZkYTdjYjk4OTMzNjA1NWNiNyIsImV4cCI6MTcwNjY1OTE5OSwiaWF0IjoxNzAxMzU2NDEyfQ.-WjO6mpNV5QM5t1jwLmBD8tBuRNOxrzcREU6RqLJtHGD0u_TGi28NWG9lFYA-ZKQ-nDwGbr6Nf_MXaUeeO2VAw
dataApiUrl: http://chango-private-1.chango.private:80
schema: cdc_db
table: student
batchSize: 10000
interval: 1000
tx: true
debezium:
connector: |-
name=postgres-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/chango-cdc/offset-student.dat
offset.flush.interval.ms=60000
topic.prefix=cdc
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=/tmp/chango-cdc/schemahistory-student.dat
database.server.name=postgresql-server
database.server.id=pg-1
database.hostname=localhost
database.port=5432
database.user=anyuser
database.password=anypassword
database.dbname=studentdb
table.include.list=public.student
chango.token
: Chango Credential which is necessary to accessChango Data API
. See Get Chango Credential.chango.dataApiUrl
:Chango Data API
endpoint URL.chango.schema
: Schema of Iceberg catalog in Chango.chango.table
: Iceberg Table in Chango.
In this example, we will use PostgreSQL from which CDC data will be caught and sent to Chango.
debezium.connector
: Properties of Debezium connector for PostgreSQL.
If you want to use another database, consult Debezium Connectors.
NOTE: If
chango.tx
is set totrue
, then you need to install transactional chango streaming beforehand.
Install PostgreSQL
PostgreSQL will be installed as docker container.
Assumed that docker and docker compose are installed on your machine, create docker compose file.
cat <<EOF > docker-compose.yml
version: "3.5"
services:
postgres:
container_name: postgres
image: debezium/postgres:9.6
ports:
- 5432:5432
environment:
- POSTGRES_DB=studentdb
- POSTGRES_USER=anyuser
- POSTGRES_PASSWORD=anypassword
EOF
Run docker compose.
And then, enter the docker container of PostgreSQL to create a table.
Connect PostgreSQL database in it.
Create table student
.
CREATE TABLE public.student
(
id integer NOT NULL,
address character varying(255),
email character varying(255),
name character varying(255),
CONSTRAINT student_pkey PRIMARY KEY (id)
);
Create Iceberg Table
You need to create Iceberg table in Chango using hive clients like Superset
which connects to Chango Spark Thrift Server
.
-- create iceberg schema.
CREATE SCHEMA IF NOT EXISTS iceberg.cdc_db;
-- create iceberg table.
CREATE TABLE iceberg.cdc_db.student (
address string,
email string,
id long,
name string,
op string,
ts TIMESTAMP_LTZ
)
USING iceberg
;
-- add hidden partitions.
ALTER TABLE iceberg.cdc_db.student ADD PARTITION FIELD day(ts);
In addition to the original fields of PostgreSQL table, fields ts
and op
are required for partitioning and small files compaction.
If fields ts
and op
exist in the original PostgreSQL table, then _
will be appended to the original fields of PostgreSQL table.
Take a note that the type of field id
in original PostgreSQL table is integer
, but the type of field id
in Iceberg table in Chango is long
.
NOTE: The sequence of table column names in lower case must be alphanumeric in ascending order.
Run Chango CDC
NOTE: Before running Chango CDC, make sure that Chango Data API and Chango Streaming Tx are installed in Chango.
Move to Chango CDC directory, and run Chango CDC.
You can check the log file /tmp/chango-cdc/chango-cdc.log
.
If you want to stop Chango CDC, run the following.
Run PostgreSQL CUD Queries
In PostgreSQL docker container, run the CUD queries.
INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL) VALUES('1','Kidong Lee','Seoul','kidong@example.com');
UPDATE STUDENT SET EMAIL='kidong2@example.com', NAME='Kidong2 Lee' WHERE ID = 1;
DELETE FROM STUDENT WHERE ID = 1;
Check if CDC data has been saved in Iceberg table in Chango with running the following query in Superset
.