Apache Airflow Primer


A short summary/overview of Apache Airflow with code examples.


AirWho?

Apache Airflow is essentially a scheduling tool: you define workflows that trigger the individual pieces of code you want to run on your machine or system. It also provides a set of CLI tools plus a neat UI to monitor, trigger, and debug your workflows.

You’d define several tasks you want to be executed in a simple Python script and assign them to a DAG (Directed Acyclic Graph).

The codebase of Airflow is 100% Python, which makes it easy to understand, but the tasks it orchestrates can of course run code written in any language.

Let’s look at an example DAG:

# simple DAG
from airflow.models import DAG
from datetime import datetime

default_arguments = {
    'owner': 'cl',
    'email': 'love@coffee.com',
    'start_date': datetime(2021, 3, 1),
    'retries': 2
}

etl_dag = DAG(
    dag_id="etl_pipeline",
    default_args=default_arguments
)

And here is how to run one of its tasks from the CLI:

# airflow run <dag_id> <task_id> <execution_date>
airflow run etl_pipeline transform_data 2021-03-01

DAGs

Directed

  • tasks in a DAG are executed in a defined order; downstream components depend on their upstream components having run first

Acyclic

  • DAGs contain no cycles: a run flows through the graph once (it finishes, gets stuck, or fails) but never loops back to an earlier task

Graphs

  • the Graph is the actual set of components (tasks) in the workflow and the dependencies between them

Operators

An Operator represents a single task in Airflow. Tasks run independently of each other and usually do not share information.

Let’s look at an example Operator:

RandomOperator(task_id='some_task_id', dag=some_dag)

Look Out!

  1. Operators do not always run in the same environment, so don’t assume state carries over between them.
  2. Operators might also require quite a few environment variables to be set in advance (see the sketch after this list).
  3. Tasks that require elevated privileges can be difficult to run.
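
On the environment-variable point, the BashOperator lets you pass variables explicitly through its env parameter, so a task does not depend on whatever happens to be set on the worker. A minimal sketch (the task_id, variable name, and some_dag are made up for illustration):

from airflow.operators.bash_operator import BashOperator

env_task = BashOperator(
    task_id='env_example',
    # the command only sees the variables passed in via `env`
    bash_command='echo "running as $RUN_MODE"',
    env={'RUN_MODE': 'production'},
    dag=some_dag
)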

Some Operator Types

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

# Bash
bash_task = BashOperator(
    task_id='bash_example',
    bash_command='echo "Ye doin\' it!"',
    dag=some_dag
)

# Python
def squared(x):
    print(x**2)

python_op_kwargs = {
    "x": 10
}

python_task = PythonOperator(
    task_id='python_example',
    python_callable=squared,
    op_kwargs=python_op_kwargs,
    dag=some_dag
)

# Email
email_task = EmailOperator(
    task_id='email_example',
    to='pear@son.com',
    subject='Stuff you need to see',
    html_content="<h1>you won't believe this</h1>",
    files=['things.json'],  # expects a list of files to attach
    dag=some_dag
)

Defining Workflows

Use Python’s bitshift operators (>> and <<) to specify the order of tasks in Airflow workflows.

# assume we have defined task1 and task2 with Operators
task1 >> task2 # >> & << are bitshift operators and define up/downstream tasks
# also possible, up- & downstream in one line
task1 >> task3 << task2
# or a longer chain
task1 >> task2 >> task3 >> task4
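
The bitshift syntax is shorthand for the set_downstream and set_upstream methods that every operator provides; both spellings produce the same dependencies:

# equivalent to task1 >> task2
task1.set_downstream(task2)
# equivalent to task1 >> task3 << task2
task1.set_downstream(task3)
task2.set_downstream(task3)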

Advanced Operators

These simple Operators are pretty straightforward; however, there are also some more advanced types. For almost any scenario you can imagine there is probably already an Operator, so check the Apache docs first. For example, we can add Docker support directly in Airflow or implement conditional logic within our Airflow scripts.

Here is an example for the latter:

from airflow.operators.python_operator import BranchPythonOperator

def branch_logic(**kwargs):
    # `ds` is the execution date as a "YYYY-MM-DD" string;
    # the returned string must match the task_id of a downstream task
    if kwargs["ds"] <= "2020-12-31":
        return "pre_2021_task"
    else:
        return "post_2020_task"

branch_task = BranchPythonOperator(
    task_id='branch_example',
    python_callable=branch_logic,
    provide_context=True,
    dag=some_dag
)

# assuming pre_2021_task and post_2020_task are defined elsewhere with these task_ids
branch_task >> pre_2021_task
branch_task >> post_2020_task
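
And for the former, Docker support, here is a minimal sketch using the DockerOperator (in Airflow 1.x it lives in airflow.operators.docker_operator; the image and command below are placeholders, and a reachable Docker daemon is assumed):

from airflow.operators.docker_operator import DockerOperator

docker_task = DockerOperator(
    task_id='docker_example',
    image='python:3.8-slim',          # placeholder image
    command='python -c "print(42)"',  # placeholder command run inside the container
    auto_remove=True,                 # clean up the container once the task finishes
    dag=some_dag
)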

Sensors

Sensors are operators, derived from the BaseSensorOperator, that wait for a condition to become true; they can check for it repeatedly within a DAG run before letting downstream tasks continue.

You can, for example, ping an API, check the presence of a certain file in the system, or check if there is data available in a database before continuing.

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
import datetime as dt
report_dag = DAG(
    dag_id='report',
    schedule_interval="0 0 * * *"
)

precheck = FileSensor(
    task_id='check_for_file',
    filepath='necessary_file.csv',
    start_date=dt.datetime(2020, 2, 20),
    mode='reschedule',
    dag=report_dag
)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='report.sh',
    start_date=dt.datetime(2020, 2, 20),
    dag=report_dag
)
precheck >> generate_report_task
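
For the API case mentioned above there is an HttpSensor, which keeps poking an endpoint until it responds successfully. A rough sketch (assuming an HTTP connection with the default id http_default is configured, and 'health' is just a placeholder endpoint):

from airflow.sensors.http_sensor import HttpSensor

api_precheck = HttpSensor(
    task_id='check_api',
    http_conn_id='http_default',   # connection configured via the Airflow UI/CLI
    endpoint='health',             # placeholder endpoint to poke
    poke_interval=60,              # seconds between checks
    timeout=60*10,                 # give up after 10 minutes
    dag=report_dag
)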

Executors

These run your actual tasks.

The default is the SequentialExecutor that, you guessed it, runs one task after the other. This is the slowest option and is useful for debugging; in production, however, one usually uses something a bit more efficient.

LocalExecutor - runs on a single system, treats each task as a separate process, and can start as many concurrent tasks as allowed. It’s a neat choice for single-machine production environments since it can make use of all the available resources.

CeleryExecutor - Celery is a general queuing system (a backend task manager) written in Python. Airflow can use it to spread tasks across multiple worker machines, each taking care of a different set of tasks. This is a little harder to set up but can be scaled afterward to accommodate higher loads or to add specialized workers. It requires a working Celery configuration (including its message broker) as well as a way to share DAG files between the machines.

# check the set Executor by running in your CLI
airflow list_dags
# >>> ... {__init__.py:100} - Using executor XYZ

Debugging & Testing

The most common issues with Airflow are:

  1. DAGs that fail to run (when scheduled)
  2. DAGs that fail to load
  3. Syntactical errors

1 — If DAGs don’t run on a schedule you can:

  • check if the scheduler is running properly
  • in your CLI run airflow scheduler and see if that fixes things

Also, there might just not be enough resources to run more tasks. In this case, you can:

  • change the executor type
  • add resources to the system
  • add a worker system
  • or try to reschedule your DAGs (see the sketch after this list)
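
Rescheduling often just means spreading the schedule_interval values out so that not every DAG fires at the same minute; cron strings and presets such as @hourly or @daily both work. A small sketch (the dag_ids are placeholders):

# stagger heavy DAGs instead of letting them all start at midnight
heavy_report_dag = DAG(
    dag_id="heavy_report",
    schedule_interval="30 2 * * *"   # 02:30 instead of "0 0 * * *"
)
light_cleanup_dag = DAG(
    dag_id="light_cleanup",
    schedule_interval="@hourly"      # presets also work
)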

2 — If DAGs don’t load you can:

  • first, check what DAGs are available with airflow list_dags
  • check if your DAG is in the correct folder (inspect airflow.cfg to find the dags_folder setting)

3 — If there are syntactical issues you can:

  • run airflow list_dags and you'll get debugging info about why something might be broken
  • you can also run the Python script where your DAG is defined directly, and it will print error messages if something’s broken (the sketch below automates this check)
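
A handy way to automate that last check is to load all DAGs into a DagBag and assert that nothing failed to import. A small sketch (the test file name and the use of pytest are assumptions, not anything Airflow-specific):

# test_dag_integrity.py (hypothetical file, run e.g. with pytest)
from airflow.models import DagBag

def test_dags_load_without_errors():
    # DagBag parses every file in the configured dags folder
    dag_bag = DagBag()
    # import_errors maps file paths to the exceptions they raised
    assert len(dag_bag.import_errors) == 0, dag_bag.import_errors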

SLA & Reporting

A service-level agreement (SLA) is a commitment between a service provider and a client. Particular aspects of the service — quality, availability, responsibilities — are agreed between the service provider and the service user. — Wikipedia

In Airflow this concerns the run time of a task or DAG.

Every time a task or DAG misses its SLA cutoff, the miss is logged and an email is sent out to the responsible person. SLA misses can also be viewed in Airflow’s UI.

import datetime as dt

# you can pass the `sla` parameter into your operators
generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='report.sh',
    start_date=dt.datetime(2020, 2, 20),
    sla=dt.timedelta(seconds=5),
    dag=report_dag
)

For reporting purposes, there are also many other options one can use in Airflow.

Here are a few:

default_args = {
    "start_date": dt.datetime(2020, 2, 20),
    "sla": dt.timedelta(seconds=5),
    "email": ["pear@son.com", "fix@this.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "email_on_success": False
}

report_dag = DAG(
    dag_id="report",
    default_args=default_args,
    schedule_interval="0 0 * * *"
)

Templates

We can use Jinja as a templating language to pass variables into our tasks while keeping the rest of our infrastructure the same.

If you are unfamiliar with Jinja, you can get a quick overview here:

Python Jinja

Jinja — Jinja Documentation (2.11.x)

Here are two simple examples of how you can use Jinja in Airflow:

# this is how you use a simple expression in Jinja
# use the {{ ... }} double-curly braces with spacing,
# then put your reference inside
template_command = """
echo "generating report {{ params.file }} at {{ ds }}"
"""
generate_report_task = BashOperator(
    task_id="generate_report_info",
    bash_command=template_command,
    params={"file": "ma_lit_report.pdf"},
    dag=report_dag
)
# this is how you use a statement in Jinja
# use {% ... %}, then put your code inside
template_command = """
{% for file in params.files %}
echo "generating report {{ file }} with config: {{ conf }}";
{% endfor %}
"""

generate_report_task = BashOperator(
    task_id="generate_report_info",
    bash_command=template_command,
    params={
        "files": ["ma_lit_report.pdf", "even_better_report.pdf"]
    },
    dag=report_dag
)

In Airflow we get some pre-configured variables to work with inside our templates.

For example, there are built-in runtime variables like ds, ds_nodash, prev_ds, prev_ds_nodash, dag & conf (ds is the execution date stamp, formatted YYYY-MM-DD).

Airflow also exposes a macros namespace in its templates, which gives us additional features like more advanced date and timestamp manipulation. You can check the additional resources for those.
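
For example, macros.ds_add lets you shift the ds date stamp by a number of days inside a template. A small sketch in the style of the report tasks above (the task_id is made up):

# {{ macros.ds_add(ds, 7) }} renders the execution date plus seven days
template_command = """
echo "report covers {{ ds }} until {{ macros.ds_add(ds, 7) }} ({{ ds_nodash }})"
"""

report_window_task = BashOperator(
    task_id="report_window_info",
    bash_command=template_command,
    dag=report_dag
)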

Command the Airflow Line

# check out how many DAGs are defined in your workspace
# also tells you which executor is being used
airflow list_dags
# run the Airflow GUI in your browser
airflow webserver -p 9090
# check the airflow config
cat airflow/airflow.cfg
# run a full DAG
airflow trigger_dag -e <date> <dag_id>
# run a specific task, or `test` it (no dependency checks, no state recorded)
# when testing you can pass `-1` instead of a real date
airflow run <dag_id> <task_id> <date>
airflow test <dag_id> <task_id> <date>

Pipelines

And here is a small sample pipeline in which I use Airflow’s XComs functionality to pass values between tasks.

from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import scrape_coffee_data, process_coffee_data  # custom code
import datetime as dt
import json

# Update the default arguments and apply them to the DAG.
default_args = {
    "start_date": dt.datetime(2021, 1, 1),
    "sla": dt.timedelta(hours=1),
    "email": ["me@me.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "email_on_success": False
}

dag = DAG(dag_id="buy_coffee", default_args=default_args)

sense_coffee_supplies = FileSensor(
    task_id="sense_coffee_supplies",
    filepath="/home/repl/supplies_low.tmp",
    poke_interval=60*60,
    dag=dag
)

remove_tmp_files_task = BashOperator(
    task_id="remove_tmp_files",
    bash_command="rm -f /home/repl/*.tmp",
    dag=dag
)

scrape_data_task = PythonOperator(
    task_id="scrape_coffee_data",
    python_callable=scrape_coffee_data,
    provide_context=True,
    dag=dag
)

process_data_task = PythonOperator(
    task_id="process_coffee_data",
    python_callable=process_coffee_data,
    provide_context=True,
    dag=dag
)

def load_json(**kwargs):
    with open("./resources/available_coffees.json") as f:
        available_coffees = json.load(f)
    # returned values are pushed to XCom automatically
    return available_coffees

load_coffee_task = PythonOperator(
    task_id="load_coffee_data",
    python_callable=load_json,
    provide_context=True,
    dag=dag
)

email_content = """
<h1>
It's time to top off the coffee supplies
</h1>
<p>
Here is a list of the available coffees:
{{ task_instance.xcom_pull(task_ids='load_coffee_data').get('coffees') }}
</p>
<br>
This message was created on {{ ds }}
"""

email_reminder_task = EmailOperator(
    task_id="email_reminder",
    to="me@me.com",  # recipient is required; reusing the address from default_args
    subject="BUY NEW COFFEE",
    html_content=email_content,
    dag=dag
)

chill_out_task = DummyOperator(task_id="chill_out", dag=dag)

def check_sunday(**kwargs):
    # `ds` is the execution date as a "YYYY-MM-DD" string
    run_date = dt.datetime.strptime(kwargs["ds"], "%Y-%m-%d")
    # BranchPythonOperator has to return the task_id(s) to follow
    if run_date.weekday() < 6:
        return "email_reminder"
    else:
        return "chill_out"

branch_task = BranchPythonOperator(
    task_id="check_if_sunday",
    python_callable=check_sunday,
    provide_context=True,
    dag=dag
)

sense_coffee_supplies >> remove_tmp_files_task >> scrape_data_task
scrape_data_task >> process_data_task >> load_coffee_task
load_coffee_task >> branch_task >> [email_reminder_task, chill_out_task]
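
To make the XCom mechanics more explicit: a value returned from a PythonOperator callable is pushed to XCom automatically, and any downstream callable can pull it through the task instance. A minimal sketch with made-up task ids, in the same style as the DAG above:

def push_available_coffees(**kwargs):
    # the return value is pushed to XCom under the key "return_value"
    return {"coffees": ["arabica", "robusta"]}

def print_available_coffees(**kwargs):
    # `ti` (the TaskInstance) is in kwargs because provide_context=True
    coffees = kwargs["ti"].xcom_pull(task_ids="push_available_coffees")
    print(coffees["coffees"])

push_task = PythonOperator(
    task_id="push_available_coffees",
    python_callable=push_available_coffees,
    provide_context=True,
    dag=dag
)

pull_task = PythonOperator(
    task_id="print_available_coffees",
    python_callable=print_available_coffees,
    provide_context=True,
    dag=dag
)

push_task >> pull_task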

Resources

Apache Docs

Airflow XCOM : The Ultimate Guide — Marc Lamberti

Python Airflow — Return result from PythonOperator

DataCamp

