Data Tranformation Using Chango Query Exec
Chango Query Exec
is a REST application to execute trino and spark SQL ETL queries requested through REST call to transform
data in Chango.
Chango Query Exec
is simpler way to transform data in Chango. You just send trino and spark SQL queries for ETL jobs to Chango Query Exec
explicitly through REST using, for example curl
.
Such ETL jobs using Chango Query Exec
can be integrated with Workflow like Azkaban
with ease.
Date Functions
There are date functions supported by Chango Query Exec
.
nowInMillis()
Current Time in Millis.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.ctas
AS
SELECT
*
FROM anycatalog.anyschema.anytable
WHERE ts < #{ nowInMillis() }
;
nowFormatted(String format)
Current date with date format.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.ctas
AS
SELECT
*
FROM anycatalog.anyschema.anytable
WHERE dateAt < '#{ nowFormatted("YYYY-MM-dd") }'
;
nowPlusInMillis(int years, int months, int days, int hours, int minutes, int weeks)
Add time amount to current date.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.ctas
AS
SELECT
*
FROM anycatalog.anyschema.anytable
WHERE where ts < #{ nowPlusInMillis(0, 0, 1, 0, 0, 0) }
;
nowPlusFormatted(int years, int months, int days, int hours, int minutes, int weeks, String format)
Formatted plus date time.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.ctas
AS
SELECT
*
FROM anycatalog.anyschema.anytable
WHERE where dateAt >= '#{ nowPlusFormatted(0, 0, 0, 0, 0, 0, "YYYY-MM-dd") }' and dateAt < '#{ nowPlusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd") }'
;
nowMinusInMillis(int years, int months, int days, int hours, int minutes, int weeks)
Substract time amount from current time.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.ctas
AS
SELECT
*
FROM anycatalog.anyschema.anytable
WHERE where ts >= #{ nowMinusInMillis(0, 0, 1, 0, 0, 0) }
;
nowMinusFormatted(int years, int months, int days, int hours, int minutes, int weeks, String format)
Formatted minus date time.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.ctas
AS
SELECT
*
FROM anycatalog.anyschema.anytable
WHERE where dateAt >= '#{ nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd") }' and dateAt < '#{ nowMinusFormatted(0, 0, 0, 0, 0, 0, "YYYY-MM-dd") }'
;
Send Simple ETL Query
This is a simple ETL query file called exec-queries.sql
.
-- create schema.
CREATE SCHEMA IF NOT EXISTS iceberg.iceberg_db;
-- ctas.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.metrics
AS
SELECT
*
FROM postgresql.public.metrics
where format_datetime(ts, 'YYYY-MM-dd HH:mm:ss.SSS') < '#{ nowMinusFormatted(0, 0, 0, 0, 10, 0, "YYYY-MM-dd HH:mm:ss.SSS") }'
limit 10000
;
It will create iceberg schema and CTAS table.
Take a look at the function of nowMinusFormatted(0, 0, 0, 0, 10, 0, "YYYY-MM-dd HH:mm:ss.SSS")
which will be replaced with the date before 10 minutes of current time by Chango Query Exec
.
Send this simple trino query to Chango Query Exec
using curl
.
export ACCESS_TOKEN=<access-token>
curl -XPOST -H "Authorization: Bearer $ACCESS_TOKEN" \
http://<chango-query-exec-endpoint>/v1/trino/exec-query \
-d "uri=<trino-gateway-endpoint>" \
-d "user=<trino-user>" \
-d "password=<trino-password>" \
-d "query=$(base64 -w 0 ./exec-queries.sql)" \
;
<access-token>
: Chango Credential. See Get Chango Credential.<trino-gateway-endpoint>
: Chango Trino Gateway Endpoint without https scheme.<trino-user>
: Trino User.<trino-password>
: Trino Password.<chango-query-exec-endpoint>
: Chango Query Exec Endpoint without http scheme.
Send Query Flow
You may send DAG like query flow to Chango Query Exec
.
Let's create query flow file exec-flow.yaml
.
uri: <trino-gateway-endpoint>
user: <trino-user>
password: <trino-password>
queries:
- id: query-0
description: |-
Query 0 description
depends: NONE
query: |-
-- drop table.
DROP TABLE iceberg.iceberg_db.metrics
- id: query-1
description: |-
Query 1 description
depends: query-0
query: |-
-- create schema.
CREATE SCHEMA IF NOT EXISTS iceberg.iceberg_db;
- id: query-2
description: |-
Query 2 description
depends: query-0
query: |-
-- create schema.
CREATE SCHEMA IF NOT EXISTS iceberg.iceberg_db;
- id: query-3
description: |-
Query 3 description
depends: query-1,query-2
query: |-
-- ctas.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.metrics
AS
SELECT
*
FROM postgresql.public.metrics
where format_datetime(ts, 'YYYY-MM-dd HH:mm:ss.SSS') < '#{ nowMinusFormatted(0, 0, 0, 0, 10, 0, "YYYY-MM-dd HH:mm:ss.SSS") }'
limit 10000
;
- id: query-4
description: |-
Query 4 description
depends: query-3
query: |-
-- create schema.
CREATE SCHEMA IF NOT EXISTS iceberg.iceberg_db;
-- ctas.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.metrics
AS
SELECT
*
FROM postgresql.public.metrics
where format_datetime(ts, 'YYYY-MM-dd HH:mm:ss.SSS') < '#{ nowMinusFormatted(0, 0, 0, 0, 50, 0, "YYYY-MM-dd HH:mm:ss.SSS") }'
limit 10000
;
This is a query flow example. You don't have to see the details of the individual queries.
You need define unique ids for queries[*].id
, and you may add dependencies with queries[*].depends
.
Send query flow to Chango Query Exec
.