0

Hi all!

I'm trying to create a task group for each list of params dynamically generated in my DAG. Everything goes fine until I try to use the expanded arguments as an input for a KubernetesPodOperator inside a task group. What am I missing?

Sample code:

`with airflow.DAG(
    DAG_ID,
    default_args=default_args,
    tags=tags_,
    schedule_interval=SCHEDULE_INTERVAL,
    is_paused_upon_creation=False,
    catchup=False,
    max_active_tasks=POOL_SIZE,
    max_active_runs=1,
) as dag:

@task
def fetch_tables(dbtables, **context):
    """Split the requested *dbtables* into known and unknown tables.

    Pushes two XComs: 'ok_tables' (requested tables we actually have) and
    'missing_tables' (requested tables we don't).
    """
    user_tables = ['table01', 'table02', ... ]
    known = set(user_tables)
    requested = set(dbtables)
    ti = context['ti']
    ti.xcom_push(key='ok_tables', value=list(known & requested))
    ti.xcom_push(key='missing_tables', value=list(requested - known))


@task
def gen_params(**context):
    """Build one {'env_vars': {...}} payload per table found by fetch_tables.

    Pulls the 'ok_tables' XCom and returns a list suitable for dynamic
    task mapping (one element per table).
    """
    ok_tables = context['ti'].xcom_pull(key='ok_tables', task_ids='fetch_tables')

    params = []
    for tab_index, tabela in enumerate(ok_tables):
        env = {
            "JOB_DBTABLE": tabela,
            "JOB_TAB_INDEX": f"{tab_index}",
        }
        params.append({'env_vars': env})
    return params

@task_group
def ingestao_task_group(job_params):
    # NOTE(review): when this task group is mapped via .expand(), `job_params`
    # is a lazy airflow.models.expandinput.MappedArgument at DAG-parse time,
    # not a plain dict — see the first traceback quoted below (L58 of this post).

    # Launch the ingestion job as a Kubernetes pod, one pod per mapped
    # job_params element.
    comando_submit = KubernetesPodOperator(
        task_id="comando_submit",
        retries=10,
        name=f'{POD_NAME_PREFIX}-driver',
        image="mycustomimage:latest",
        in_cluster=True,
        namespace="airflow",
        service_account_name='airflow-worker',
        image_pull_policy="Always",
        random_name_suffix=True,
        pool=POOL_NAME,
        # This line is the reported failure: KubernetesPodOperator.__init__
        # calls convert_env_vars(env_vars) eagerly, which accepts only a dict
        # or list and therefore raises on the MappedArgument wrapper.
        env_vars=job_params,
    )


# Wire the tasks: fetch the table list, derive per-table params, then fan
# out one mapped task-group instance per params element.
#
# NOTE(review): the task defined above is `gen_params`; the original called
# the undefined name `gera_params`, which would break DAG parsing with a
# NameError before anything else runs.
job_params = gen_params()
# WARNING: eval() on DBTABLES_JSON executes arbitrary Python — if this value
# really is JSON, json.loads(DBTABLES_JSON) is the safe equivalent.
fetch_tables(eval(DBTABLES_JSON)) >> job_params
ingestao_task_group.expand(job_params=job_params)

`

I got the error below:

Broken DAG: [/some/path/to/mydag.py]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 324, in __init__
    env_vars = convert_env_vars(env_vars) if env_vars else []
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py", line 82, in convert_env_vars
    raise AirflowException(f"Expected dict or list, got {type(env_vars)}")
airflow.exceptions.AirflowException: Expected dict or list, got <class 'airflow.models.expandinput.MappedArgument'>

I tried to extract the mapped argument in a previous task before using inside the KubernetesPodOperator:

    # Second attempt from the question: unwrap the mapped argument with an
    # inner @task before handing it to the classic operator.
    @task_group
    def ingestao_task_group(job_params):

        @task
        def extract_job_params(job_params):
            # At run time job_params is a real dict here, so .get() works.
            print('#'*20, 'job_params', job_params.get('env_vars'))

            return job_params.get('env_vars')
        
        # extracted_job_params = extract_job_params(job_params=job_params)

        comando_submit = KubernetesPodOperator(
            [...]
            # NOTE(review): this still fails — at DAG-parse time the call
            # below yields a PlainXComArg (a promise, not a dict), and
            # KubernetesPodOperator.__init__ converts env_vars eagerly,
            # producing the second traceback quoted below.
            env_vars=extract_job_params(job_params=job_params),
        )

But then I got this instead:

Broken DAG: [.../ dag-teste.py]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 324, in __init__
    env_vars = convert_env_vars(env_vars) if env_vars else []
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py", line 82, in convert_env_vars
    raise AirflowException(f"Expected dict or list, got {type(env_vars)}")
airflow.exceptions.AirflowException: Expected dict or list, got <class 'airflow.models.xcom_arg.PlainXComArg'>

0

Browse other questions tagged or ask your own question.