Apache Airflow: Workflow Scheduling and Monitoring

 

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch workflows. Workflows are defined as Python code, which supports dynamic generation, extensibility, and parameterization, and a web interface is provided for management. Through rich scheduling and execution semantics, Airflow supports defining complex pipelines, backfilling, and partially rerunning work after errors are resolved, improving efficiency. Although Airflow is primarily oriented around tasks and workflows rather than data assets, its "workflows as code" philosophy makes it a powerful tool for DevOps and data teams managing batch jobs. Recently, Airflow has also begun to support declarative approaches such as the Airflow DAG Factory Pattern.

⭐ At the core of Airflow is its "workflows as code" philosophy: all workflows are defined in Python. This brings dynamic generation, extensibility, and flexible parameterization via the Jinja templating engine. Workflows can be version-controlled, developed collaboratively, and covered by tests, which improves development efficiency and code quality.

🚀 Airflow excels at scheduling and monitoring batch workflows, and its operators (such as PythonOperator, DockerOperator, and KubernetesPodOperator) connect to a wide range of technology stacks. Its rich scheduling features, including timed execution, backfilling historical data, and rerunning parts of a pipeline after an error has been resolved, greatly improve the efficiency and reliability of workflow execution.

⚠️ Although Airflow is powerful, it orchestrates around jobs rather than data assets, which can add complexity to data processing. Compared with asset-oriented orchestration tools such as Dagster, Airflow's imperative (rather than declarative) approach may not fit every scenario. However, Airflow has started to introduce declarative approaches such as the DAG Factory Pattern.

💡 In practice, watch out for common mistakes, such as coupling the DAG folder to the main Airflow infrastructure repository (so that changing a DAG requires restarting Airflow), or misconfiguring the log folder in a way that causes performance problems. Ensuring that DAGs support backfilling and are prepared for scale is key to long-term maintenance and efficient operation.

As outlined in Orchestrators, the choice of how to schedule and monitor workflows is pivotal. While there are numerous orchestrators, Airflow is the most prevalent and widely adopted.

Traditionally, ETL tools such as Microsoft SQL Server Integration Services (SSIS) dominated the scene, serving as hubs for data transformation and cleaning, as well as Normalization processes.

However, contemporary architectures demand more. The value of code and data transformation logic now extends beyond their immediate functional use, proving essential to other data-informed individuals within an organization.

I highly recommend delving into Maxime Beauchemin’s piece on Functional Data Engineering — a modern paradigm for batch data processing for a deeper understanding of modern data pipelines.

# What is Apache Airflow?

Apache Airflow™ is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows.

Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. A web interface helps manage the state of your workflows. Airflow is deployable in many ways, varying from a single process on your laptop to a distributed setup to support even the biggest workflows.

Apache Airflow Core includes the webserver, scheduler, CLI, and other components needed for a minimal Airflow installation; see the Airflow documentation for details.

# Paradigms: Workflows as code

The main characteristic of Airflow workflows is that all workflows are defined in Python code. “Workflows as code” serves several purposes:

- Dynamic: Airflow pipelines are configured as Python code, allowing for dynamic pipeline generation.
- Extensible: The Airflow framework contains operators to connect with numerous technologies. All Airflow components are extensible to easily adjust to your environment.
- Flexible: Workflow parameterization is built-in, leveraging the Jinja templating engine.
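
As a small illustration of these three properties, DAG definitions can be generated from ordinary Python. The DAG id, schedule, and source names below are made up for this sketch:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical list of source systems; one extract task is generated per entry.
SOURCES = ["crm", "billing", "web"]

with DAG(
    dag_id="extract_sources",      # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    for source in SOURCES:
        # Dynamic: tasks are created in a loop.
        # Flexible: {{ ds }} is a Jinja template resolved to the run's logical date.
        BashOperator(
            task_id=f"extract_{source}",
            bash_command=f"echo 'extracting {source} for {{{{ ds }}}}'",
        )
```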

# Why Airflow?

Airflow is a batch workflow orchestration platform. Unlike alternatives such as Dagster and Kestra, which focus on data-aware orchestration, Airflow is mainly about managing tasks and workflows.

The Airflow framework contains operators to connect with many technologies and is easily extensible to connect with a new technology. If your workflows have a clear start and end, and run at regular intervals, they can be programmed as an Airflow DAG.

If you prefer coding over clicking, Airflow is the tool for you. Workflows are defined as Python code which means:

- Workflows can be stored in version control so that you can roll back to previous versions
- Workflows can be developed by multiple people simultaneously
- Tests can be written to validate functionality
- Components are extensible, and you can build on a wide collection of existing components

Rich scheduling and execution semantics enable you to easily define complex pipelines, running at regular intervals. Backfilling allows you to (re-)run pipelines on historical data after making changes to your logic. And the ability to rerun partial pipelines after resolving an error helps maximize efficiency.
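
As a hedged sketch of what this looks like in practice (the DAG id and dates are placeholders): an explicit start_date together with catchup=True lets the scheduler create runs for every past interval, and the backfill CLI can re-run a date range after a logic change:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_ingest",           # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=True,                    # scheduler creates runs for missed past intervals
) as dag:
    BashOperator(
        task_id="ingest",
        # {{ ds }} scopes each run to its own logical date, which keeps reruns safe
        bash_command="echo 'ingesting data for {{ ds }}'",
    )

# After changing the logic, re-run a historical range from the CLI, for example:
#   airflow dags backfill daily_ingest -s 2024-01-01 -e 2024-01-31
```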

Read more in the Airflow Documentation.

# Why not Airflow?

Because the data we work with are persisted Data Assets, and the one component of the data stack that does not work with assets is Airflow. It orients around jobs. In other words, it is imperative instead of declarative.


From the Data Council talk by Dagster | The Data Engineering Impedance Mismatch | Dagster Blog

A better way of working with workflows is to keep data assets in mind. Check out Dagster for more.

# Update on Declarative

Since 2024-08-13, Airflow also supports a declarative approach with the Airflow DAG Factory Pattern.
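
A rough sketch of that pattern with the dag-factory library, assuming its documented usage; the YAML path below is a placeholder, and the actual DAG structure would be declared in that YAML file:

```python
# Sketch of the declarative DAG Factory pattern using the dag-factory package
# (usage as documented upstream; the config path below is a placeholder).
import dagfactory

config_file = "/opt/airflow/dags/config_file.yml"  # hypothetical YAML with the DAG definition

dag_factory = dagfactory.DagFactory(config_file)

# Register the generated DAGs in this module's globals so the scheduler discovers them.
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
```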

# We are all using Airflow wrong

# Airflow Commands

To obtain the currently configured executor (supported executors: LocalExecutor, CeleryExecutor, KubernetesExecutor, LocalKubernetesExecutor, CeleryKubernetesExecutor):

```sh
airflow config get-value core executor
```

# Airflow Parameterization

See the load_type parameter when running test.py. Note that we use a Jinja template here with {{ params.load_type }}:

```python
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# workspace_name and common_k8s_pod_operator_params are defined elsewhere in the project
with DAG(
    dag_id="sleep_hosp",
    schedule="@once",
    description="developing and testing",
    tags=[workspace_name],
    params={
        "load_type": "init"
    },
) as dag:
    sleep = KubernetesPodOperator(
        **common_k8s_pod_operator_params,
        name="sleep_hosp",
        task_id="sleep_hosp",
        arguments=[
            "python dags-logic/test.py --setup={{ params.load_type }} && echo 'setup successfully'"
        ],
    )
```

In test.py we can then work with the parameter:

```python
import argparse


def setup_lzn_tables(param):
    print(f"Print params: {param}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run specific functions in the script."
    )
    parser.add_argument("--setup", required=True)
    args = parser.parse_args()

    if args.setup:
        setup_lzn_tables(args.setup)
```

Warning

The window for entering the parameters only appears after you click on Trigger DAG.

More on Params — Airflow Documentation.

# Template Reference

From Templates reference:

| Variable | Type | Description |
| --- | --- | --- |
| {{ params }} | dict[str, Any] | The user-defined params. This can be overridden by the mapping passed to `trigger_dag -c` if `dag_run_conf_overrides_params` is enabled in `airflow.cfg`. |
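
For illustration (a minimal sketch; the DAG id and command are made up), {{ params }} resolves inside any templated field, such as BashOperator's bash_command, and the default can be overridden at trigger time:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="templated_params_demo",    # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    params={"load_type": "init"},      # default value, overridable per run
) as dag:
    BashOperator(
        task_id="echo_load_type",
        # bash_command is a templated field, so Jinja resolves params here
        bash_command="echo 'load_type={{ params.load_type }}'",
    )

# Overriding the default at trigger time, for example:
#   airflow dags trigger templated_params_demo --conf '{"load_type": "full"}'
```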

# Airflow Operators

# PythonOperator

The Airflow PythonOperator is optimal when the business logic and code are housed within the Airflow DAG directory. The PythonOperator facilitates the import and execution of these components.

```
airflow
    \__dags
        \_classification_workflow.py
        \_tweet_classification
            \_preprocess.py
            \_predict.py
            \___init__.py
    \__logs
    \__airflow.cfg
```
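
With this layout, classification_workflow.py might look roughly like the following; the preprocess_tweets and predict_tweets helpers are assumptions based on the module names above:

```python
# dags/classification_workflow.py -- a sketch; the helper functions are hypothetical
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from tweet_classification.preprocess import preprocess_tweets  # hypothetical helper
from tweet_classification.predict import predict_tweets        # hypothetical helper

with DAG(
    dag_id="classification_workflow",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    preprocess = PythonOperator(
        task_id="preprocess",
        python_callable=preprocess_tweets,
    )
    predict = PythonOperator(
        task_id="predict",
        python_callable=predict_tweets,
    )

    # Business logic is imported directly from the DAG directory
    preprocess >> predict
```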

# Pros

- Ideal when the code is in the same repository as Airflow.
- User-friendly and straightforward.
- Efficient for smaller teams.

# Cons

- Tightly couples Airflow code with business logic.
- Changes in business logic necessitate Airflow code redeployment.
- Sharing a single Airflow instance across multiple projects becomes challenging.
- Limited to Python code.

# DockerOperator

Caution Advised

The DockerOperator is becoming obsolete. It's recommended to opt for the KubernetesPodOperator instead. As highlighted in this StackOverflow discussion, "The real answer is to use the KubernetesPodOperator. DockerOperator will soon lose its functionality with the phasing out of dockershim."

The DockerOperator in Airflow manages business logic and code within a Docker image. Upon execution:

- Airflow fetches the designated image.
- Initiates a container.
- Executes the given command.
- Requires an active Docker daemon.

```python
from airflow.providers.docker.operators.docker import DockerOperator

DockerOperator(
    dag=dag,
    task_id='docker_task',
    image='gs://project-predict/predict-api:v1',
    auto_remove=True,
    docker_url='unix://var/run/docker.sock',
    command='python extract_from_api_or_something.py',
)
```

# Pros

- Effective for cross-functional teams.
- Compatible with non-Python projects.
- Ideal for Docker-centric infrastructures.

# Cons

- Requires Docker on the worker machine.
- High resource demand on the worker machine when running multiple containers.

# KubernetesPodOperator

The KubernetesPodOperator places business logic and code within a Docker image. During execution, Airflow initiates a worker pod, which then retrieves and executes commands from the specified Docker image.

```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

KubernetesPodOperator(
    task_id='classify_tweets',
    name='classify_tweets',
    cmds=['python', 'app/classify.py'],
    namespace='airflow',
    image='gcr.io/tweet_classifier/dev:0.0.1',
)
```

# Pros

- Facilitates collaboration across different functional teams.
- Enables sharing a single Airflow instance across various teams without complications.
- Decouples DAGs from business logic.

# Cons

- Presents complexity in infrastructure due to its reliance on Docker and Kubernetes.

# Common Mistakes

Here are frequent errors observed in DevOps and Data teams when implementing Airflow:

- DAG Folder Location: Often, the DAG folder is part of the main Airflow infrastructure repository. This necessitates a full Airflow restart for any DAG modification, potentially causing job failures and inconvenience. Ideally, your DAG folder should live separately, as configured in your airflow.cfg.
- Local Log Folder Configuration: A common oversight I encountered around 2015 or 2016 was configuring the log folder locally. In my case, this resulted in an EC2 instance crash due to log overload after six months. This issue persists in some setups today.
- Non-Backfillable DAGs: One of Airflow's advantages is its ability to rerun failed past DAGs without disrupting the current data. Ensuring your DAGs support easy maintenance and backfilling is crucial (see the sketch after this list).
- Lack of Scalability Preparation: Initially, as you deploy a few DAGs, you might not notice scalability issues. However, as the number of DAGs increases (20, 30, 100+), you'll observe longer wait times in the scheduling phase. While adjusting configurations in your airflow.cfg might help, involving DevOps for scalability solutions might become necessary.
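
One way to keep DAGs backfillable, sketched here under assumed names and a placeholder path: partition every write by the run's logical date, so that re-running a past interval only overwrites that interval's output.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="partitioned_load",       # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=True,
) as dag:
    BashOperator(
        task_id="load_partition",
        # Each run writes only to its own date partition (placeholder path),
        # so backfilling 2024-01-05 touches nothing but dt=2024-01-05.
        bash_command="echo 'writing to s3://my-bucket/events/dt={{ ds }}/'",
    )
```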

For a comprehensive discussion, refer to Mistakes I Have Seen When Data Teams Deploy Airflow.

Read more on GitHub - jghoman/awesome-apache-airflow: Curated list of resources about Apache Airflow.


References: Data Orchestrators, Dagster, Apache Airflow Wiki, Why using Airflow, Features of Airflow, OLAP
