Python script scheduling in Airflow


Hi everyone,

I need to schedule my Python files (which contain data extraction from SQL and some joins) using Airflow. I have successfully installed Airflow on my Linux server and the Airflow webserver is available to me. But even after going through the documentation, I am not clear on where exactly I need to write the script for scheduling, and how that script will become available to the Airflow webserver so I can see its status.

As far as the configuration is concerned, I know where the dags folder is located in my home directory, and also where the example DAGs are located.

Note: Please don't mark this as a duplicate of How to run bash script file in Airflow, as I need to run Python files lying in a different location.

[Screenshots omitted: the configuration shown in the Airflow webserver, the dags folder in the AIRFLOW_HOME directory, the DAG creation screen, and the "Missing DAG" error that appears after I select the simple DAG.]

You should probably use the PythonOperator to call your function. If you want to define the function somewhere else, you can simply import it from a module as long as it’s accessible in your PYTHONPATH.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# The callable lives in another module; it just has to be importable.
from my_script import my_python_function

# default_args must exist before the DAG is created; a minimal version:
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 7, 7),
}

dag = DAG('tutorial', default_args=default_args)

PythonOperator(dag=dag,
               task_id='my_task_powered_by_python',
               provide_context=False,
               python_callable=my_python_function,
               op_args=['arguments_passed_to_callable'],
               op_kwargs={'keyword_argument': 'which will be passed to function'})

If your function my_python_function lives in a script file such as /path/to/my/scripts/dir/my_script.py, then before starting Airflow you can add that directory to your PYTHONPATH like so:

export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
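
For concreteness, here is a minimal sketch of what my_script.py could contain; the function name and parameters simply mirror the op_args/op_kwargs used in the DAG above and are only illustrative:

# my_script.py -- hypothetical module imported by the DAG above.

def my_python_function(positional_argument, keyword_argument=None):
    # Placeholder body: this is where the SQL extraction and joins would go.
    print('called with %r and %r' % (positional_argument, keyword_argument))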

More information here:
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

Default args and other considerations are covered in the tutorial: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

You can also use the BashOperator to execute Python scripts in Airflow. You can put your scripts in a folder inside the DAG folder; if your scripts live somewhere else, just give the path to those scripts.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Midnight seven days ago, used as the start date below.
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': seven_days_ago,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('simple', default_args=default_args)

# Run the script with the system Python; adjust the path to your file.
t1 = BashOperator(
    task_id='testairflow',
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
    dag=dag)
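
With this approach, file1.py is just an ordinary standalone script. A hypothetical minimal version, assuming the SQL extraction and joins described in the question, might look like this:

# file1.py -- hypothetical standalone script invoked by the BashOperator above.
import sqlite3  # stand-in for whichever SQL client library you actually use

def main():
    # Placeholder for the real extraction and joins.
    conn = sqlite3.connect('/tmp/example.db')
    rows = conn.execute('SELECT 1 AS answer').fetchall()
    print(rows)
    conn.close()

if __name__ == '__main__':
    main()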

Airflow parses all Python files in $AIRFLOW_HOME/dags (in your case /home/amit/airflow/dags), and each of those scripts should return a DAG object, as shown in the answer from "postrational".
When a DAG is reported as missing, that means there is some issue in the Python code and Airflow could not load it. Check the Airflow webserver or scheduler logs for more details, as stderr and stdout go there.
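
One way to surface such import errors yourself (a sketch, assuming the dags folder from the question) is to load the folder through Airflow's own DagBag and print what failed:

# Run in a Python shell where Airflow is installed.
from airflow.models import DagBag

bag = DagBag('/home/amit/airflow/dags')  # the dags folder from the question
# import_errors maps each broken file to the traceback that prevented loading.
for path, err in bag.import_errors.items():
    print(path, err)

Running the DAG file directly, e.g. python /home/amit/airflow/dags/my_dag.py, will also print any traceback that would stop Airflow from importing it.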

  1. Install Airflow using the official Airflow documentation. It is a good idea to install it in a Python virtual environment:
     http://python-guide-pt-br.readthedocs.io/en/latest/dev/virtualenvs/
  2. When you start Airflow for the first time with

     airflow webserver -p <port>

     it loads the example DAGs automatically. This can be disabled in $HOME/airflow/airflow.cfg by setting `load_examples = False`.
  3. Create a dags folder in $HOME/airflow/ and put the tutorial.py file from https://airflow.incubator.apache.org/tutorial.html into it.
  4. Do some experiments and make changes in tutorial.py. If you are giving schedule_interval as cron syntax, use a fixed past date such as 'start_date': datetime(2017, 7, 7) rather than 'start_date': datetime.now(). For example:

     dag = DAG('tutorial', default_args=default_args, schedule_interval="@once")
     # or
     dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")  # schedule each minute

     A minimal tutorial.py combining these settings is sketched after this list.
  5. Start the webserver: airflow webserver -p <port>
  6. Start the scheduler: airflow scheduler
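
As promised above, a minimal tutorial.py along these lines (the DummyOperator task is only an illustrative stand-in) should show up in the UI once the scheduler has parsed the dags folder:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 7, 7),  # fixed past date, per step 4
}

# Runs every minute; switch schedule_interval to "@once" while experimenting.
dag = DAG('tutorial', default_args=default_args,
          schedule_interval="* * * * *")

noop = DummyOperator(task_id='do_nothing', dag=dag)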


The answers/resolutions are collected from Stack Overflow and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.