41

I just started using Airflow, can anyone enlighten me how to pass a parameter into PythonOperator like below:

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs=None,
    #op_kwargs=(key1='value1', key2='value2'),
    dag=dag,
)

def SendEmail(**kwargs):
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()

I would like to be able to pass some parameters into the t5_send_notification's callable which is SendEmail, ideally I want to attach the full log and/or part of the log (which is essentially from the kwargs) to the email to be sent out, guessing the t5_send_notification is the place to gather those information.

Thank you very much.

2 Answers 2

70
  1. Pass a dict object to op_kwargs
  2. Use the keys to access their value from kwargs dict in your python callable

    def SendEmail(**kwargs):
        print(kwargs['key1'])
        print(kwargs['key2'])
        msg = MIMEText("The pipeline for client1 is completed, please check.")
        msg['Subject'] = "xxxx"
        msg['From'] = "xxxx"
        ......
        s = smtplib.SMTP('localhost')
        s.send_message(msg)
        s.quit()
    
    
    t5_send_notification = PythonOperator(
        task_id='t5_send_notification',
        provide_context=True,
        python_callable=SendEmail,
        op_kwargs={'key1': 'value1', 'key2': 'value2'},
        dag=dag,
    )
    
3
  • 1
    that didn't work. it should be like: SendEmail(key1, key2, **kwargs):
    – Amin
    Commented Mar 17, 2020 at 4:18
  • @Amin which version of the airflow you are using? the reason why I asked is that you might be using python3 as the latest versions of airflow support python3 much better than a year ago, but still there are lots of people using python2 for airflow dev.
    – Ryan Yuan
    Commented Mar 17, 2020 at 4:49
  • 4
    If it has something to do with python version please mention it in the answer.
    – Amin
    Commented Mar 18, 2020 at 5:45
21

PythonOperator have a named parameter op_kwargs and accepts dict object.

have

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs={"my_param":'value1'},
    dag=dag,
)

def SendEmail(my_param,**kwargs):
    print(my_param) #'value_1'
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_me
3
  • 2
    Thank you Ethan, your code of op_kwargs={my_param='value1'}, reports error.
    – mdivk
    Commented Feb 27, 2019 at 1:09
  • 1
    {key : value} Commented Jul 11, 2019 at 16:03
  • Do you have the entire error? Is the key in quotes as well? Commented Jul 12, 2019 at 19:18

Not the answer you're looking for? Browse other questions tagged or ask your own question.