Authoring DAGs

A DAG (directed acyclic graph) is a workflow: a set of tasks plus the dependency edges between them. kiok lets you author the same DAG three ways — YAML, Python code, or Java code — and all three compile to one internal definition, the DagSpec.

Three authoring formats, one model

flowchart LR
  Y["YAML file"] --> C
  P["Python code<br/>(kiok SDK)"] --> C
  J["Java code<br/>(kiok SDK)"] --> C
  C["DagSpec"] --> S["metadata store<br/>(KMS-encrypted RocksDB)"]

Whichever format you choose, kiok validates the result (no cycles, no dangling requires, valid cron, non-empty task bodies) and stores the compiled DagSpec. The scheduler, the worker driver, and the admin UI all work off that single representation — they never see the original YAML/Python/Java.
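The structural checks listed above can be illustrated with a short, self-contained sketch. It is not kiok's validator: the task mapping (`task_id -> {"requires": [...], "script": "..."}`) and the function name `validate_dag` are hypothetical, and cron validation is left out. Cycle detection uses Kahn's algorithm, which repeatedly removes tasks whose dependencies are all satisfied; anything left over sits on a cycle or downstream of one.

```python
from collections import deque


def validate_dag(tasks):
    """Return a list of problems found in a hypothetical task mapping.

    `tasks` maps task id -> {"requires": [ids], "script": str}.
    This shape is an assumption for illustration, not kiok's DagSpec.
    """
    errors = []

    # Non-empty bodies and no dangling requires.
    for tid, task in tasks.items():
        if not task.get("script", "").strip():
            errors.append(f"task {tid!r} has an empty body")
        for dep in task.get("requires", []):
            if dep not in tasks:
                errors.append(f"task {tid!r} requires unknown task {dep!r}")

    # Kahn's algorithm: count unmet dependencies, then peel off ready tasks.
    indegree = {tid: 0 for tid in tasks}
    dependents = {tid: [] for tid in tasks}
    for tid, task in tasks.items():
        for dep in task.get("requires", []):
            if dep in tasks:  # dangling edges were reported above
                indegree[tid] += 1
                dependents[dep].append(tid)

    ready = deque(tid for tid, n in indegree.items() if n == 0)
    visited = 0
    while ready:
        current = ready.popleft()
        visited += 1
        for nxt in dependents[current]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if visited != len(tasks):
        stuck = sorted(tid for tid, n in indegree.items() if n > 0)
        errors.append("cycle detected; tasks that can never start: " + ", ".join(stuck))

    return errors
```

An empty list means the graph passed these three checks. Collecting every problem before returning, rather than stopping at the first, lets a DAG author fix a file in one pass.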

YAML

YAML is the most direct format: a dag: header followed by a tasks: list.

dag:
  id: daily_etl
  schedule: "0 2 * * *"   # optional 5-field cron; omit for manual-only
  catchup: false
tasks:
  - id: extract
    type: shell
    script: |
      #!/bin/bash
      echo extracting
  - id: load
    type: shell
    requires: [extract]
    script: |
      #!/bin/bash
      echo loading

Python

For a workflow whose shape is computed rather than hand-written, author it in Python with the kiok Python SDK. Every module-level Dag instance becomes a registered DAG:

from kiok import Dag

REGIONS = ["us-east", "us-west", "eu-central", "ap-south"]

dag = Dag("regional_etl", schedule="0 3 * * *")
dag.task("setup", script="#!/bin/bash\necho prepare")

# fan-out: one branch per region — a loop, not copy-pasted blocks
transforms = []
for r in REGIONS:
    key = r.replace("-", "_")
    dag.task("extract_" + key, requires=["setup"],
             script="#!/bin/bash\necho pull %s" % r)
    dag.task("transform_" + key, task_type="python", requires=["extract_" + key],
             script="print('normalize %s')" % r)
    transforms.append("transform_" + key)

# fan-in: the dependency list is computed
dag.task("merge", requires=transforms, script="#!/bin/bash\necho merge")

kiok compiles a Python DAG by running python3 -m kiok.compile <file>, which imports the module and emits each Dag as a DagSpec.
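To make "every module-level Dag instance" concrete, here is a rough sketch of how a compile step could discover such instances using only the standard library. It is an assumption about the general technique, not the actual code of kiok.compile; the helper names `load_module` and `find_instances` are hypothetical.

```python
import importlib.util


def load_module(path, name="user_dag_module"):
    """Import a Python source file by path and return the module object."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    return module


def find_instances(module, cls):
    """Return {variable name: object} for module-level instances of `cls`."""
    return {name: value for name, value in vars(module).items()
            if isinstance(value, cls)}
```

Under this approach the file's top-level code runs at compile time, so loops such as the per-region fan-out above have already executed by the time the instances are collected.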

Java

For type-safe authoring with IDE support and compile-time checking, implement the KiokDag interface with the kiok Java SDK. kiok loads every KiokDag class it finds in a bundle jar and calls define():

public class DailyEtlDag implements KiokDag {
    @Override
    public Dag define() {
        Dag dag = new Dag("daily_etl").schedule("0 2 * * *");
        dag.task("extract")
           .shell("#!/bin/bash\necho extracting");
        dag.task("load").requires("extract")
           .shell("#!/bin/bash\necho loading");
        return dag;
    }
}

The Dag and Task builders are fluent, and a loop over a list builds a fan-out graph exactly as the Python example does.

Why code over YAML

YAML is declarative and well suited to a fixed workflow small enough to write by hand. Code wins when the DAG is generated:

  • Loops — produce N parallel branches from a list instead of copy-pasting N YAML blocks.
  • Computed dependencies — a fan-in task's requires list is built from data, so adding an upstream needs no edit at the fan-in.
  • Abstraction — a helper function/method defines a task group once and reuses it.
  • Type safety (Java) — a typo'd task id or dependency fails at compile time, before the DAG reaches the cluster.
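The abstraction point can be sketched by folding the per-region branch from the Python example into a helper. So that the sketch runs without the SDK installed, it uses a tiny stand-in recorder named `Dag` instead of `kiok.Dag`; the stand-in, its `"shell"` default for `task_type`, and the helper `add_region_branch` are assumptions for illustration. Only the `task()` keyword arguments mirror the earlier example.

```python
class Dag:
    """Stand-in recorder for illustration only; NOT the kiok SDK class."""

    def __init__(self, dag_id, schedule=None):
        self.id = dag_id
        self.schedule = schedule
        self.tasks = {}

    def task(self, task_id, script, requires=None, task_type="shell"):
        # The "shell" default is an assumption made by this stand-in.
        self.tasks[task_id] = {
            "type": task_type,
            "requires": list(requires or []),
            "script": script,
        }


def add_region_branch(dag, region, upstream):
    """Define the extract -> transform pair for one region.

    Returns the id of the branch's last task so callers can fan in on it.
    """
    key = region.replace("-", "_")
    extract = "extract_" + key
    transform = "transform_" + key
    dag.task(extract, requires=[upstream],
             script="#!/bin/bash\necho pull %s" % region)
    dag.task(transform, task_type="python", requires=[extract],
             script="print('normalize %s')" % region)
    return transform


dag = Dag("regional_etl", schedule="0 3 * * *")
dag.task("setup", script="#!/bin/bash\necho prepare")

# The task group is defined once and reused per region.
tails = [add_region_branch(dag, r, "setup") for r in ["us-east", "eu-central"]]

# Fan-in on whatever the helper returned; adding a region needs no edit here.
dag.task("merge", requires=tails, script="#!/bin/bash\necho merge")
```

Returning the tail task id from the helper keeps the fan-in decoupled from how a branch is built internally: a branch can gain a third step without the merge task changing.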

DAG identity

Every DAG has a human-facing name (the id you write, which need not be unique) and a globally unique id derived from where the DAG came from:

  • manual → the name itself.
  • git → <repo>/<source path>, slugified.
  • bundle → <bundle name>/<source path>, slugified.

So two DAGs may share a name as long as they originate from different repos or bundles. Run history is keyed by the unique id, so re-pushing the same DAG file continues its existing history.
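The derivation rules can be sketched as a small function. The exact slug format is not specified here, so the `slugify` rule below (lowercase, with each run of non-alphanumeric characters collapsed to one underscore) is an assumption, as are the function and parameter names.

```python
import re


def slugify(text):
    # Assumed rule, not kiok's documented one: lowercase, then collapse
    # every run of non-alphanumeric characters into a single underscore.
    return re.sub(r"[^a-z0-9]+", "_", text.lower()).strip("_")


def unique_dag_id(name, origin="manual", source=None, path=None):
    """Derive a unique id from a DAG's origin.

    origin: "manual", "git", or "bundle".
    source: repo name (git) or bundle name (bundle).
    path:   source path of the DAG file inside that repo or bundle.
    """
    if origin == "manual":
        return name
    if origin in ("git", "bundle"):
        if not source or not path:
            raise ValueError("git and bundle DAGs need both source and path")
        return slugify(source + "/" + path)
    raise ValueError("unknown origin: %r" % origin)


# Same DAG name, different repos: the derived ids differ.
id_a = unique_dag_id("daily_etl", "git", "team-a", "dags/daily.yaml")
id_b = unique_dag_id("daily_etl", "git", "team-b", "dags/daily.yaml")
```

Because the id depends only on the origin and the file's location, pushing an edited version of the same file yields the same id, which is what lets run history carry over.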

Getting DAGs into the cluster

  • Manual — register a single YAML/Python/Java DAG via the admin UI or submit.sh register.
  • Git Sync — the leader pulls DAGs from git repositories on an interval. See Git Sync.
  • Bundles — upload a zip of DAG definitions for air-gapped clusters. See DAG Bundles.