Workflow with Airflow

Airflow is an open-source project started at Airbnb. It is a tool for dynamically orchestrating the desired flow of your application, and it scales out readily thanks to its modular architecture and message-queuing mechanism.

It can also be thought of as an advanced cron: it executes tasks once their dependencies are fulfilled, and it can even retry a failed task for the number of times configured for it.
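
For example, retry behaviour is configured per task, usually through default_args. The snippet below is a minimal sketch using the same Airflow 1.x style imports as the example later in this post; the dag_id, script path, and retry values are purely illustrative:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    # Hypothetical DAG whose tasks are retried up to 3 times,
    # waiting 5 minutes between attempts, before being marked as failed.
    retry_dag = DAG(
        dag_id='retry_example',
        default_args={
            'owner': 'owner',
            'start_date': datetime(2017, 8, 1),
            'retries': 3,
            'retry_delay': timedelta(minutes=5),
        },
        schedule_interval=timedelta(1)
    )

    flaky_task = BashOperator(
        task_id='flaky_task',
        bash_command='python /opt/jobs/flaky_job.py',  # hypothetical script
        dag=retry_dag
    )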

This is what an Airflow task pipeline looks like:

[Image: an example Airflow task pipeline]

In the example above, each block represents a task, and some tasks are connected to others, reflecting their dependencies and relationships.

Let’s say you need to develop an application that helps your customers find common products available on a selected set of e-commerce platforms, generates a report, and sends it to them. For this purpose you can design a workflow in which one task collects data from an e-commerce platform, another task categorises the data by type, and so on.

These tasks are defined in a Python file called a DAG (Directed Acyclic Graph) file. A DAG can have an arbitrary number of tasks, and one DAG represents a single logical workflow.

DAG example:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta

    # default arguments applied to every task in this DAG
    args = {
        'owner': 'owner',
        # use a fixed start date; dynamic values such as datetime.today()
        # are discouraged because they confuse the scheduler
        'start_date': datetime(2017, 8, 1)
    }

    # one DAG object ties the whole workflow together
    dag = DAG(
        dag_id='common_products',
        default_args=args,
        schedule_interval=timedelta(1)
    )

    init = BashOperator(
        task_id='init',
        bash_command='python /opt/jobs/init.py',
        dag=dag
    )

    data_config = BashOperator(
        task_id='data_config',
        bash_command='python /opt/jobs/data_config.py',
        dag=dag
    )

    # one data-collection task per e-commerce platform; these four
    # tasks are independent and can run in parallel
    platform_a = BashOperator(
        task_id='platform_a',
        bash_command='python /opt/jobs/collect_data.py platform_a',
        dag=dag
    )

    platform_b = BashOperator(
        task_id='platform_b',
        bash_command='python /opt/jobs/collect_data.py platform_b',
        dag=dag
    )

    platform_c = BashOperator(
        task_id='platform_c',
        bash_command='python /opt/jobs/collect_data.py platform_c',
        dag=dag
    )

    platform_d = BashOperator(
        task_id='platform_d',
        bash_command='python /opt/jobs/collect_data.py platform_d',
        dag=dag
    )

    categorise_data = BashOperator(
        task_id='categorise_data',
        bash_command='python /opt/jobs/categorise_data.py',
        dag=dag
    )

    find_most_common = BashOperator(
        task_id='find_most_common',
        bash_command='python /opt/jobs/find_most_common.py',
        dag=dag
    )

    compare_price = BashOperator(
        task_id='compare_price',
        bash_command='python /opt/jobs/compare_price.py',
        dag=dag
    )

    generate_report = BashOperator(
        task_id='generate_report',
        bash_command='/usr/bin/python /opt/jobs/generate_report.py',
        dag=dag
    )

    # set up the logical flow between the tasks
    data_config.set_upstream(init)
    platform_a.set_upstream(data_config)
    platform_b.set_upstream(data_config)
    platform_c.set_upstream(data_config)
    platform_d.set_upstream(data_config)
    categorise_data.set_upstream(platform_a)
    categorise_data.set_upstream(platform_b)
    categorise_data.set_upstream(platform_c)
    categorise_data.set_upstream(platform_d)
    find_most_common.set_upstream(categorise_data)
    compare_price.set_upstream(find_most_common)
    generate_report.set_upstream(compare_price)
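
Recent Airflow versions (1.8 and later) also support bitshift composition for declaring dependencies, which some find easier to read. The sketch below is an equivalent alternative to the set_upstream calls above, not something to add on top of them:

    # same wiring as the set_upstream calls, using bitshift composition
    init >> data_config
    data_config >> platform_a >> categorise_data
    data_config >> platform_b >> categorise_data
    data_config >> platform_c >> categorise_data
    data_config >> platform_d >> categorise_data
    categorise_data >> find_most_common >> compare_price >> generate_report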

The DAG file can be saved in Airflow’s default DAG directory, ~/airflow/dags. In the DAG configuration, schedule_interval=timedelta(1) tells the Airflow scheduler to execute this flow once every day.
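
One way to check that the file parses and that the scheduler will pick the DAG up is to load it into a DagBag. This is a minimal sketch, assuming it is run in an environment where Airflow is configured and the file already sits in the DAGs folder:

    from airflow.models import DagBag

    # parse every file in the configured DAGs folder
    dag_bag = DagBag()

    # any import errors show up here as {file path: traceback}
    print(dag_bag.import_errors)

    # the DAG defined above should be registered under its dag_id
    print('common_products' in dag_bag.dags)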

This is what this DAG looks like:

[Image: graph view of the common_products DAG in the Airflow web UI]

Airflow has a very elegant interface for monitoring workflows and viewing the logs of individual tasks, which is really nice.

This is a very basic example of how Airflow can be used. It can potentially be used to design any kind of workflow, regardless of its complexity.

 