Apache Airflow Primer
A short summary/overview of Apache Airflow with code examples.
AirWho?
Apache Airflow is essentially a scheduling tool. You define workflows that trigger individual pieces of code on your machine or system, and Airflow provides a set of CLI tools plus a neat UI to monitor, trigger, and debug them.
You define the tasks you want executed in a simple Python script and assign them to a DAG, a Directed Acyclic Graph.
Airflow itself is written entirely in Python, which makes it easy to understand, but the tasks it triggers can of course run code written in any language.
Let’s look at an example DAG:
# simple DAG
from datetime import datetime
from airflow.models import DAG

default_arguments = {
    'owner': 'cl',
    'email': 'love@coffee.com',
    'start_date': datetime(2021, 3, 1),
    'retries': 2
}

etl_dag = DAG(
    dag_id="etl_pipeline",
    default_args=default_arguments
)
And here is how to run one of its tasks from the CLI:
# airflow run <dag_id> <task_id> <execution_date>
airflow run etl_pipeline transform_data 2021-03-01
DAGs
Directed
- components in a DAG run in order: each task is executed only after the tasks it depends on have finished
Acyclic
- DAGs do not repeat: they run through, get stuck, or fail, but they never loop
Graphs
- the graph is the actual set of components (tasks) in the workflow, connected by their dependencies
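The "acyclic" requirement can be illustrated with a quick dependency check. This is just a toy sketch (not Airflow's internals): a workflow is a valid DAG only if its dependency edges contain no cycle.

```python
# Toy illustration: a workflow is only a valid DAG if its
# dependencies contain no cycle (depth-first back-edge check).
def has_cycle(edges):
    """edges: dict mapping task -> list of downstream tasks."""
    visiting, done = set(), set()

    def visit(node):
        if node in done:
            return False
        if node in visiting:
            return True  # back edge found: the graph loops
        visiting.add(node)
        for nxt in edges.get(node, []):
            if visit(nxt):
                return True
        visiting.remove(node)
        done.add(node)
        return False

    return any(visit(n) for n in edges)

acyclic = {"extract": ["transform"], "transform": ["load"], "load": []}
cyclic = {"a": ["b"], "b": ["a"]}
print(has_cycle(acyclic))  # False
print(has_cycle(cyclic))   # True
```

Airflow performs a similar validation when loading a DAG file and refuses dependency definitions that loop back on themselves.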
Operators
An Operator represents a single task in Airflow. Tasks run independently and usually do not share information.
Let’s look at an example Operator:
# RandomOperator is a placeholder name, not a real Operator
RandomOperator(task_id='some_task_id', dag=some_dag)
Look Out!
- Sometimes Operators don’t run in the same environment.
- Operators also might require quite a few environment variables to be set in advance.
- Tasks that require elevated privileges might be difficult to run.
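The environment-variable caveat comes from how child processes work: a task only sees the variables it is explicitly given. BashOperator accepts an `env` dict for exactly this reason. Here is a minimal pure-Python sketch of the underlying behaviour (`API_KEY` is a made-up example variable, not anything Airflow defines):

```python
import subprocess

# A child process only sees the environment you explicitly give it,
# which is why Operators "might require quite a few environment
# variables to be set in advance". API_KEY is a hypothetical example.
result = subprocess.run(
    'echo "key is: $API_KEY"',
    shell=True,
    env={"API_KEY": "s3cret"},  # comparable to BashOperator's `env=` argument
    capture_output=True,
    text=True,
)
print(result.stdout.strip())  # key is: s3cret
```

If `API_KEY` were missing from `env`, the command would expand it to an empty string, a common source of "works in my shell, fails in Airflow" bugs.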
Some Operator Types
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

# Bash
bash_task = BashOperator(
    task_id='bash_example',
    bash_command='echo "Ye doin it!"',
    dag=some_dag
)

# Python
def squared(x):
    print(x**2)

python_op_kwargs = {
    "x": 10
}

python_task = PythonOperator(
    task_id='python_example',
    python_callable=squared,
    op_kwargs=python_op_kwargs,
    dag=some_dag
)

# Email
email_task = EmailOperator(
    task_id='email_example',
    to='pear@son.com',
    subject='Stuff you need to see',
    html_content="<h1>you won't believe this</h1>",
    files=['things.json'],
    dag=some_dag
)
Defining Workflows
Use the bitshift operator in Python to specify workflows in Airflow.
# assume we have defined task1 ... task4 with Operators
# >> & << are bitshift operators and define down-/upstream tasks
task1 >> task2

# also possible: up- and downstream in one line
task1 >> task3 << task2

# or a longer chain
task1 >> task2 >> task3 >> task4
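Airflow can reuse `>>` and `<<` because Python lets a class overload these operators via `__rshift__` and `__lshift__`. The toy model below shows the idea; it is not Airflow's actual task class:

```python
class Task:
    """Minimal stand-in for an Airflow task, just to show the overloads."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):  # self >> other
        self.downstream.append(other.task_id)
        return other              # returning `other` enables chaining

    def __lshift__(self, other):  # self << other
        other.downstream.append(self.task_id)
        return other

t1, t2, t3 = Task("extract"), Task("transform"), Task("load")
t1 >> t2 >> t3                    # chain, like in a real DAG file
print(t1.downstream, t2.downstream)  # ['transform'] ['load']
```

Because `__rshift__` returns the right-hand task, a chain like `t1 >> t2 >> t3` evaluates left to right and wires each link in turn.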
Advanced Operators
These simple Operators are pretty straightforward; however, there are also some more advanced types. For almost any scenario you can imagine, there is probably already an Operator, just check the Apache Airflow docs. For example, we can add Docker support directly in Airflow or implement conditional logic within our Airflow scripts.
Here is an example for the latter:
from airflow.operators.python_operator import BranchPythonOperator

def branch_logic(**kwargs):
    # kwargs["ds"] is the execution datestamp, e.g. "2021-01-15"
    if kwargs["ds"] <= "2020-12-31":
        return "pre_2021_task"
    else:
        return "post_2020_task"

branch_task = BranchPythonOperator(
    task_id='branch_example',
    python_callable=branch_logic,
    provide_context=True,
    dag=some_dag
)

branch_task >> pre_2021_task
branch_task >> post_2020_task
Sensors
Sensors are a special kind of Operator, derived from the BaseSensorOperator class, that can check a condition multiple times within a DAG run.
You can, for example, ping an API, check the presence of a certain file in the system, or check if there is data available in a database before continuing.
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
import datetime as dt

report_dag = DAG(
    dag_id='report',
    schedule_interval="0 0 * * *"
)

precheck = FileSensor(
    task_id='check_for_file',
    filepath='necessary_file.csv',
    start_date=dt.datetime(2020, 2, 20),
    mode='reschedule',
    dag=report_dag
)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='report.sh',
    start_date=dt.datetime(2020, 2, 20),
    dag=report_dag
)

precheck >> generate_report_task
Executors
These run your actual tasks.
The default is the SequentialExecutor, which, you guessed it, runs one task after the other. This takes the longest and is useful for debugging; in production, however, one usually uses something a bit more efficient.
LocalExecutor
- runs on a single system and can start as many concurrent tasks as allowed, treating each task as a separate process. It's a neat choice for single-machine production environments since it can make use of all the available resources.
CeleryExecutor
- Celery is a general queuing system, like a backend task manager, written in Python. It can be used to run multiple instances of Airflow as worker systems that take care of different sets of tasks. This is a little harder to set up but can be scaled afterward to accommodate higher system loads or add specialized worker systems. Requirements are a working Celery config, as well as a method to exchange DAGs between the systems.
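The executor is selected in the `[core]` section of `airflow.cfg`; a minimal excerpt might look like this:

```ini
# airflow.cfg (excerpt)
[core]
# one of: SequentialExecutor, LocalExecutor, CeleryExecutor, ...
executor = LocalExecutor
```

Changing this line (and restarting the scheduler) is all it takes to switch between executor types, though CeleryExecutor additionally needs a running broker and worker setup.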
# check the set Executor by running in your CLI
airflow list_dags
# >>> ... {__init__.py:100} - Using executor XYZ
Debugging & Testing
The most common issues with Airflow are:
- DAGs that fail to run (when scheduled)
- DAGs that fail to load
- Syntactical errors
1 — If DAGs don’t run on a schedule you can:
- check if the scheduler is running properly
- in your CLI run
airflow scheduler
and see if that fixes things
Also, there might just not be enough resources to run more tasks. In this case, you can:
- change the executor type
- add resources to the system
- add a worker system
- or try to reschedule your DAGs
2 — If DAGs don’t load you can:
- first, check what DAGs are available with
airflow list_dags
- check if your DAG is in the correct folder (inspect
airflow.cfg
to find the
dags_folder
setting)
3 — If there are syntactical issues you can:
- run
airflow list_dags
and you'll get debugging info about why something might be broken
- you can also run the Python script where your DAG is defined directly in the CLI; it will print error messages if something's broken
SLA & Reporting
A service-level agreement (SLA) is a commitment between a service provider and a client. Particular aspects of the service — quality, availability, responsibilities — are agreed between the service provider and the service user. — Wikipedia
In Airflow this concerns the run time of a task or DAG.
Every time a task or DAG misses its cutoff time, a log entry is created and an email is sent to the responsible person. SLA misses can also be viewed in Airflow's UI.
import datetime as dt

# you can pass the `sla` parameter into your operators
generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='report.sh',
    start_date=dt.datetime(2020, 2, 20),
    sla=dt.timedelta(seconds=5),
    dag=report_dag
)
For reporting purposes, there are also many other options one can use in Airflow.
Here are a few:
default_args = {
    "start_date": dt.datetime(2020, 2, 20),
    "sla": dt.timedelta(seconds=5),
    "email": ["pear@son.com", "fix@this.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "email_on_success": False
}

report_dag = DAG(
    dag_id="report",
    default_args=default_args,
    schedule_interval="0 0 * * *"
)
Templates
We can use Jinja as a templating language to pass variables into our tasks while keeping the rest of our infrastructure the same.
If you are unfamiliar with Jinja, you can get a quick overview here:
Jinja — Jinja Documentation (2.11.x)
Here are two simple examples of how you can use Jinja in Airflow:
# this is how you use a simple expression in Jinja
# use the {{ ... }} double-curly braces with spacing,
# then put your reference inside
template_command = """
echo "generating report {{ params.file }} at {{ ds }}"
"""

generate_report_task = BashOperator(
    task_id="generate_report_info",
    bash_command=template_command,
    params={"file": "ma_lit_report.pdf"},
    dag=report_dag
)

# this is how you use a statement in Jinja
# use {% ... %}, then put your code inside
template_command = """
{% for file in params.files %}
echo "generating report {{ file }} with config: {{ conf }}";
{% endfor %}
"""

generate_report_task = BashOperator(
    task_id="generate_report_info",
    bash_command=template_command,
    params={
        "files": ["ma_lit_report.pdf", "even_better_report.pdf"]
    },
    dag=report_dag
)
In Airflow we get some pre-configured things to work with inside our templates. For example, there are built-in runtime variables like ds, ds_nodash, prev_ds, prev_ds_nodash, dag & conf (ds = datestamp).
Jinja also provides us with the macros option, which gives us additional features like more advanced timestamp operations. You can check the additional resources for those.
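One such macro is `{{ macros.ds_add(ds, days) }}`, which shifts the `ds` datestamp by a number of days. In plain Python, its behaviour corresponds roughly to the sketch below (this is an illustration, not Airflow's own implementation):

```python
from datetime import datetime, timedelta

def ds_add(ds, days):
    """Rough plain-Python equivalent of Airflow's `macros.ds_add`:
    shift a YYYY-MM-DD datestamp by a number of days."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")

print(ds_add("2021-03-01", 5))   # 2021-03-06
print(ds_add("2021-03-01", -2))  # 2021-02-27
```

Inside a template you would write e.g. `{{ macros.ds_add(ds, 5) }}` to reference the date five days after the execution date.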
Command the Airflow Line
# check how many DAGs are defined in your workspace
# also tells you which executor is being used
airflow list_dags

# run the Airflow GUI in your browser
airflow webserver -p 9090

# check the airflow config
cat airflow/airflow.cfg

# run a full DAG
airflow trigger_dag -e <date> <dag_id>

# run a specific task in isolation with the `test` subcommand
# when testing you can pass a `-1` instead of a real date
airflow test <dag_id> <task_id> <date>
Pipelines
And here is a small sample pipeline that uses Airflow's XCom functionality to pass values between tasks.
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import scrape_coffee_data, process_coffee_data # custom code
import datetime as dt
import json

# Update the default arguments and apply them to the DAG.
default_args = {
    "start_date": dt.datetime(2021, 1, 1),
    "sla": dt.timedelta(hours=1),
    "email": ["me@me.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "email_on_success": False
}

dag = DAG(dag_id="buy_coffee", default_args=default_args)

sense_coffee_supplies = FileSensor(
    task_id="sense_coffee_supplies",
    filepath="/home/repl/supplies_low.tmp",
    poke_interval=60*60,
    dag=dag
)

remove_tmp_files_task = BashOperator(
    task_id="remove_tmp_files",
    bash_command="rm -f /home/repl/*.tmp",
    dag=dag
)

scrape_data_task = PythonOperator(
    task_id="scrape_coffee_data",
    python_callable=scrape_coffee_data,
    provide_context=True,
    dag=dag
)

process_data_task = PythonOperator(
    task_id="process_coffee_data",
    python_callable=process_coffee_data,
    provide_context=True,
    dag=dag
)

def load_json(**kwargs):
    with open("./resources/available_coffees.json") as f:
        available_coffees = json.load(f)
    return available_coffees  # the return value is pushed to XCom automatically

load_coffee_task = PythonOperator(
    task_id="load_coffee_data",
    python_callable=load_json,
    provide_context=True,
    dag=dag
)

email_content = """
<h1>
It's time to top off the coffee supplies
</h1>
<p>
Here is a list of the available coffees:
{{ task_instance.xcom_pull(task_ids='load_coffee_data').get('coffees') }}
</p><br>
This message was created on {{ ds }}
"""

email_reminder_task = EmailOperator(
    task_id="email_reminder",
    to="me@me.com",
    subject="BUY NEW COFFEE",
    html_content=email_content,
    dag=dag
)

chill_out_task = DummyOperator(task_id="chill_out", dag=dag)

def check_sunday(**kwargs):
    date = dt.datetime.strptime(kwargs["ds"], "%Y-%m-%d")
    if date.weekday() < 6:       # weekday() == 6 means Sunday
        return "email_reminder"  # branch callables must return task_ids
    else:
        return "chill_out"

branch_task = BranchPythonOperator(
    task_id="check_if_sunday",
    python_callable=check_sunday,
    provide_context=True,
    dag=dag
)

sense_coffee_supplies >> remove_tmp_files_task >> scrape_data_task
scrape_data_task >> process_data_task >> load_coffee_task
load_coffee_task >> branch_task >> [email_reminder_task, chill_out_task]
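Conceptually, XComs are small key/value records keyed by task: a PythonOperator's return value is pushed under the key `return_value`, and other tasks read it back with `xcom_pull`. The toy in-memory model below mirrors that flow (Airflow actually persists XComs in its metadata database):

```python
# Toy in-memory model of the XCom push/pull flow; not Airflow's storage.
xcom_store = {}

def run_task(task_id, callable_):
    """Run a callable and push its return value, like a PythonOperator does."""
    xcom_store[(task_id, "return_value")] = callable_()

def xcom_pull(task_id, key="return_value"):
    return xcom_store[(task_id, key)]

run_task("load_coffee_data", lambda: {"coffees": ["arabica", "robusta"]})
coffees = xcom_pull("load_coffee_data")["coffees"]
print(coffees)  # ['arabica', 'robusta']
```

This is why the email template above can call `task_instance.xcom_pull(task_ids='load_coffee_data')`: it simply looks up what the upstream task returned.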
Resources
Airflow XCOM : The Ultimate Guide — Marc Lamberti
Python Airflow — Return result from PythonOperator