Hi all!
I'm trying to create a task group for each list of params dynamically generated in my DAG. Everything goes fine until I try to use the expanded arguments as an input for a KubernetesPodOperator inside a task group. What am I missing?
Sample code:
`with airflow.DAG( DAG_ID, default_args=default_args, tags=tags_, schedule_interval=SCHEDULE_INTERVAL, is_paused_upon_creation=False, catchup=False, max_active_tasks=POOL_SIZE, max_active_runs=1, ) as dag:
@task
def fetch_tables(dbtables, **context):
user_tables = ['table01', 'table02', ... ]
context['ti'].xcom_push(key='ok_tables', value=list(set(user_tables).intersection(set(dbtables))))
context['ti'].xcom_push(key='missing_tables', value=list(set(dbtables).difference(set(user_tables))))
@task
def gen_params(**context):
ok_tables = context['ti'].xcom_pull(key='ok_tables', task_ids='fetch_tables')
return [
{
'env_vars' : {
"JOB_DBTABLE": tabela,
"JOB_TAB_INDEX": f"{tab_index}",
}
} for tab_index, tabela in enumerate(ok_tables) ]
@task_group
def ingestao_task_group(job_params):
comando_submit = KubernetesPodOperator(
task_id="comando_submit",
retries=10,
name=f'{POD_NAME_PREFIX}-driver',
image="mycustomimage:latest",
in_cluster=True,
namespace="airflow",
service_account_name='airflow-worker',
image_pull_policy="Always",
random_name_suffix=True,
pool=POOL_NAME,
env_vars=job_params,
)
job_params = gera_params()
fetch_tables(eval(DBTABLES_JSON)) >> job_params
ingestao_task_group.expand(job_params=job_params)
`
I got the error below:
Broken DAG: [/some/path/to/mydag.py] Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 324, in init env_vars = convert_env_vars(env_vars) if env_vars else [] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py", line 82, in convert_env_vars raise AirflowException(f"Expected dict or list, got {type(env_vars)}") airflow.exceptions.AirflowException: Expected dict or list, got <class 'airflow.models.expandinput.MappedArgument'>
I tried to extract the mapped argument in a previous task before using it inside the KubernetesPodOperator:
# NOTE(review): second attempt — extracting the mapped value in an inner @task.
# This fails for the same underlying reason as the first attempt:
# extract_job_params(job_params=...) returns an XComArg (a lazy reference that
# is only resolved when the task RUNS), while KubernetesPodOperator converts
# env_vars inside __init__, at DAG-parse time, before any XCom exists — hence
# "Expected dict or list, got PlainXComArg" in the traceback that follows.
@task_group
def ingestao_task_group(job_params):
@task
def extract_job_params(job_params):
# Inside the task body job_params IS a real dict — this part works at run time.
print('#'*20, 'job_params', job_params.get('env_vars'))
return job_params.get('env_vars')
# extracted_job_params = extract_job_params(job_params=job_params)
comando_submit = KubernetesPodOperator(
[...]
# At construction time this is still a lazy XComArg, not a dict/list.
env_vars=extract_job_params(job_params=job_params),
)
But then I got this instead:
Broken DAG: [.../ dag-teste.py]
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 324, in __init__
env_vars = convert_env_vars(env_vars) if env_vars else []
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py", line 82, in convert_env_vars
raise AirflowException(f"Expected dict or list, got {type(env_vars)}")
airflow.exceptions.AirflowException: Expected dict or list, got <class 'airflow.models.xcom_arg.PlainXComArg'>