
I want to read a Postgres table into a DataFrame for a data engineering pipeline. I am using Airflow to schedule these tasks. I have created a connection in Airflow called postgres_product_db and tried to use get_pandas_df to fetch the records.

db_hook = PostgresHook('postgres_product_db')
fetch_item = db_hook.get_pandas_df(request)

But it throws this error:

UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)

Full error log:

 [2022-04-05, 06:46:27 UTC] {taskinstance.py:1774} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
        return_value = self.execute_callable()
      File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 188, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
      File "/home/azureuser/airflow/dags/foodstar_store1_pricing_update_program.py", line 81, in fetch_inventory
        inventory = db_hook.get_pandas_df(request)
      File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 138, in get_pandas_df
        return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
      File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 566, in read_sql
        return pandas_sql.read_query(
      File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 2094, in read_query
        data = self._fetchall_as_list(cursor)
      File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 2108, in _fetchall_as_list
        result = cur.fetchall()
    UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)
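For context (not part of the original question): the byte 0xc3 is the lead byte of a two-byte UTF-8 sequence, so this error is what you get when UTF-8 data from the server is decoded with the ascii codec. A minimal standalone repro, independent of Airflow or psycopg2:

```python
# 0xc3 is the lead byte of a two-byte UTF-8 sequence, e.g. 'é' == b'\xc3\xa9'.
raw = "café".encode("utf-8")  # b'caf\xc3\xa9'

try:
    # This mimics what happens when the client decodes UTF-8 bytes as ASCII.
    raw.decode("ascii")
except UnicodeDecodeError as exc:
    print(exc)  # 'ascii' codec can't decode byte 0xc3 in position 3: ...

# Decoding with the correct codec works fine.
print(raw.decode("utf-8"))  # café
```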

In a plain psycopg2 script, I used to overcome this by setting the encoding:

conn = psycopg2.connect(
    database="xxxxx", user='xxxxx', password='xxxx', host='xxxx.xxx.xxx.xx.xxx', port='5432')
conn.set_client_encoding('UNICODE')
cur = conn.cursor()

But I am not able to find any option to set client_encoding in PostgresHook. There is an Extra field in the connection, where I tried setting { encode: 'UNICODE' }, but this also throws an error. Can anyone help here?

1 Answer


client_encoding is a runtime configuration parameter. This means you can embed it directly in your SQL statement:

db_hook = PostgresHook('postgres_product_db')
sql = "SET client_encoding = 'UTF8'; SELECT col FROM my_table"
fetch_item = db_hook.get_pandas_df(sql=sql)
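Alternatively, since PostgresHook.get_conn() returns a plain psycopg2 connection, you can set the encoding on the connection object itself, just as in your standalone script. A sketch (the function name is mine, and this assumes a reachable database, so treat it as illustrative):

```python
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook


def fetch_df_with_encoding(conn_id: str, sql: str) -> pd.DataFrame:
    """Illustrative helper: read a query into a DataFrame with an explicit
    client encoding set on the underlying psycopg2 connection."""
    hook = PostgresHook(conn_id)
    conn = hook.get_conn()  # plain psycopg2 connection
    conn.set_client_encoding('UNICODE')
    try:
        return pd.read_sql(sql, con=conn)
    finally:
        conn.close()
```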

You didn't ask about it, but for cases where PostgresOperator applies, the usage can be:

from airflow.providers.postgres.operators.postgres import PostgresOperator

op = PostgresOperator(
    task_id="my_task",
    sql=sql,
    runtime_parameters={'set_client_encoding': 'UNICODE'},
)

This functionality was added in a PR and is available for apache-airflow-providers-postgres>=4.1.0.
