1
\$\begingroup\$

I am a junior data engineer that have 3 years of experience with Python. I write a lot of Python code for my job and I came up with this question I can't solve by my own. I don't have the chance to have seniors developers to help at my job so let me asking you guys. My question relates to simplifying functions. Basically, I learned coding by myself so I used to not even use functions at all at the beginning. Now I implement more and more functions in my code for clarity, modularity and testing purposes. But I come to a point where I don't know to which extent use very simple functions over functions that do a bit more things.

I do a lot of data processing in my job, so I often need to load raw data, clean raw data, apply some processing, make calculations etc. Here is an example function taken from a script I used in a project. After loading the data in a dataframe, the function basically takes the dataframe as an argument, apply some cleaning and returns the dataframe with some of the columns cleaned, so the dataframe can be used later.

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe
    - drop columns with all NaNs
    - drop duplicated entries
    - drop data with wrong unit
    - convert strings to float (O,2 -> 0.2)
    - clean some NomParametreLims with double parenthesis instead of one
    """
    # drop columns with all NaNs
    df.dropna(how="all", axis=1, inplace=True)
    # drop duplicated entries
    df.drop_duplicates(inplace=True)
    # drop data with wrong unit
    df.drop(df.loc[df["Unit"] == "-"].index, inplace=True)
    # convert strings to float (O,2 -> 0.2)
    df["Rs"] = df["Rs"].apply(string_to_float)
    df["Lq"] = df["Lq"].apply(string_to_float)
    # clean some NomParametreLims with double parenthesis instead of one
    df["NomParametreLims"] = df["NomParametreLims"].str.strip().str.replace("((", "(").str.replace("))", ")")
    return df

The first problem I see is that the function takes the whole dataframe as an argument whilst it only needs some columns to clean (except that it needs all columns to apply the .dropna & drop_duplicates methods). Second, the function do too many things, it drops nan values, drop duplicates, drop data with wrong unit, convert some strings to float, strip and fix parenthesis issues on NomParametreLims column.

If I had to do it again I would probably separate things out. Maybe make 6 simple functions:

def dropna(df, how="all", axis=1, **kwargs) -> pd.DataFrame:
   return df.dropna(how=how, axis=axis, **kwargs)

def drop_duplicates(df, **kwargs) -> pd.DataFrame:
   return df.drop_duplicates(**kwargs)

def drop_wrong_unit_data(df) -> pd.DataFrame:
   return df.drop(df.loc[df["Unit"] == "-"].index)

def convert_string_to_float(column) -> pd.Series:
   return column.apply(string_to_float)

def strip(column) -> pd.Series:
   return column.str.strip()

def replace(column, pattern, string) -> pd.Series:
   return column.str.replace(pattern, string)

I also see the possibility to make a class called for instance CleanData that takes the dataframe as an constructor argument, and implement these functions as methods.

The problem I see in making a lot of simple functions like these is that it requires a lot of work (in writing and unit testing). Also, sometimes, you know the function will be used only once (for instance drop_wrong_unit_data()), so I see no need to take time to make it very clean simple and modular. Also, once you split the process to a lot of functions, do you create other functions to call theses functions?

For instance, will the script look like this?

# ...
# *defintions of all the simple functions here*
# ...


def load_data():
   simple_function_1()
   simple_function_2()

def clean_data():
   dropna(...)
   drop duplicates(...)
   drop(...)
   convert_string_to_float(...)
   convert_string_to_float(...)
   strip(...)
   replace(...)

def process_data():
   simple_function_1()
   simple_function_2()
   simple_function_2()

def main():
   load_data()
   clean_data()
   process_data()

if __name__ == '__main__':
   main()

Or is there anything that you'll do differently?

If anyone experienced already dealt with this kind of questioning and found great solutions, please do not hesitate to comment! It'll be greatly appreciated.

EDIT:

Here is the full script

"""
This scripts transforms bronze SAVLAB data to silver data.

It reads bronze SAVLAB data from 'data/processed_data/BRONZE/SAVLAB', transform it,
and stored it as silver data in 'data/processed_data/SILVER/SAVLAB' partitionned by Annee.
"""
import os
import sys
from pathlib import Path

import pandas as pd
import pyarrow as pa
import numpy as np
from unidecode import unidecode
from dotenv import load_dotenv

# add root folder to path
sys.path.append(str(Path(__file__).resolve().parent.parent))

from data_pipeline.utils import (
    load_data,
    add_columns_to_dataframe,
    string_to_float,
    clean_rejet_names,
    generate_param_id,
    timer,
    rename_columns, 
    save_dataframe_to_parquet
)
from logs.utils import setup_logger

load_dotenv()


# ================= Constants ================= #
PROCESSED_DATA_DIR = Path(os.getenv("PROCESSED_DATA_DIR"))
INPUT_DIR = PROCESSED_DATA_DIR / "BRONZE" / "SAVLAB"
OUTPUT_DIR = PROCESSED_DATA_DIR / "SILVER" / "SAVLAB"
COLS_MAPPING = {
    "DatePrel": "Date",
    "RefEchantillonLabo": "Ref_echantillon",
    "RsAna_converted": "Valeur",
    "Parametre": "Param_id",
    "NomParametreLims": "Parametre",
}
LOGGER = setup_logger("Root", "pipeline.log")


# ================= FUNCTIONS ================= #
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe
    - drop columns with all NaNs
    - drop duplicated entries
    - convert strings to float (O,2 -> 0.2)
    - clean some NomParametreLims with double '(' instead of one
    """
    # drop columns with all NaNs
    df.dropna(how="all", axis=1, inplace=True)
    # drop duplicated entries
    df.drop_duplicates(inplace=True)
    # drop data with wrong unit
    df.drop(df.loc[df["Unit"] == "-"].index, inplace=True)
    # convert strings to float (O,2 -> 0.2)
    df["RsAna"] = df["RsAna"].apply(string_to_float)
    df["LQAna"] = df["LQAna"].apply(string_to_float)
    # clean some NomParametreLims with double '(' instead of one
    df["NomParametreLims"] = df["NomParametreLims"].str.strip().str.replace("((", "(").str.replace("))", ")")
    return df


def process_data(df: pd.DataFrame) -> pd.DataFrame:
    """Apply some processing to the dataframe"""
    # normalize dirty rejets names (remove accents)
    df["InfosEchantillonClient"] = df["InfosEchantillonClient"].apply(unidecode)
    # normalize dirty rejets names (remove new lines)
    df["InfosEchantillonClient"] = df["InfosEchantillonClient"].str.replace("\r\n", "")
    # create new names for rejet with regex
    df["Rejet"] = df["InfosEchantillonClient"].apply(clean_rejet_names)
    # add mask for analyse non réglementaire
    df["Analyse_non_reglementaire"] = df["InfosEchantillonClient"].str.contains(
        "non reglementaire|complementaires", case=False
    )

    # drop duplicated values (drop when duplicates & 'analyses complementaires/reglementaires')
    df["Duplicated"] = df.duplicated(["Rejet", "DatePrel", "Parametre", "NomParametreLims"], keep=False)
    df.drop(df.loc[(df["Duplicated"] == True) & df["Analyse_non_reglementaire"] == True].index, inplace=True)
    df.drop(df[(df["Duplicated"] == True) & (df["RqAna"] == -1)].index, inplace=True)
    df.drop(df[(df["Duplicated"] == True) & (df["RqAna"].isna())].index, inplace=True)
    df.drop_duplicates(subset=["Rejet", "DatePrel", "Parametre", "NomParametreLims"], inplace=True)

    # add boolean mask for Limite Quantification
    df["Limite_quantification"] = df["RefAna"].str.contains("<")
    # convert µg/l or ng/l to mg/l when possible
    df["RsAna_converted"] = np.where(df["SymUniteReferenceLims"] == "µg/l", df["RsAna"] / 1e3, df["RsAna"])
    df["RsAna_converted"] = np.where(
        df["SymUniteReferenceLims"] == "ng/l", df["RsAna_converted"] / 1e6, df["RsAna_converted"]
    )
    df["Unite"] = np.where(
        (df["SymUniteReferenceLims"] == "µg/l") | (df["SymUniteReferenceLims"] == "ng/l"),
        "mg/l",
        df["SymUniteReferenceLims"],
    )
    # add SAVLAB to param id
    df["Parametre"] = "SAVLAB_" + df["Parametre"]

    return df


def fill_missing_ids(df: pd.DataFrame) -> None:
    """Fill missing ids for some parameters"""
    missing_params = df.query("Parametre.isna()")["NomParametreLims"].unique()
    all_ids = df["Parametre"].unique()
    param_id = {}
    for missing_param in missing_params:
        param_id = generate_param_id(missing_param, existing_ids=all_ids)
        df.loc[df["NomParametreLims"] == missing_param, "Parametre"] = param_id


def specific_processing(df: pd.DataFrame) -> pd.DataFrame:
    """Apply some processing to specific rejets and parameters"""
    # create new param SAVLAB__TEAU as = MOY(_TEAU30, _TEAU0) for TAR 41 ERA et TAR 41 ERT
    sub = df.query("Rejet in ['TAR 41 ERA', 'TAR 41 ERT'] & Param_id in ['SAVLAB__TEAU0', 'SAVLAB__TEAU30']").copy()
    sub["Valeur"] = sub.groupby(["Rejet", "Date"])["Valeur"].transform("mean")
    sub["Param_id"] = "SAVLAB__TEAU_MOY"
    sub["Parametre"] = "Température de l'eau ou de mesure (moy)"
    df = pd.concat([df, sub], ignore_index=True)
    df.drop_duplicates(inplace=True)
    return df


@timer(logger=LOGGER)
def main():
    LOGGER.info("Script is running.")

    # load bronze data
    bronze = load_data(INPUT_DIR, format="parquet")
    # clean data
    silver = clean_data(bronze)
    # process data
    silver = process_data(silver)
    # create missing ids when missing
    fill_missing_ids(silver)
    # rename
    silver = rename_columns(silver, columns=COLS_MAPPING)
    # do some specific processing
    silver = specific_processing(silver)
    # order
    silver.sort_values(["Rejet", "Date", "Param_id"], ignore_index=True, inplace=True)
    # add Anne column for partitionning
    silver = add_columns_to_dataframe(silver, columns_values={"Annee": silver["Date"].dt.year})
    # write to parquet
    save_dataframe_to_parquet(
        silver,
        output_dir=OUTPUT_DIR,
        partitioning_schema=pa.schema([("Annee", pa.int32())])
    )


if __name__ == "__main__":
    main()

\$\endgroup\$
5
  • 2
    \$\begingroup\$ The first clean_data is a better idea than your "six simple functions"; can you show the entire program containing the former? \$\endgroup\$
    – Reinderien
    Commented Sep 14, 2023 at 12:25
  • \$\begingroup\$ With the caveat that you should not be doing in-place operations here. \$\endgroup\$
    – Reinderien
    Commented Sep 14, 2023 at 12:38
  • \$\begingroup\$ @Reinderien it's a bit long but here it is codefile.io/f/3vwbOW6WFH \$\endgroup\$
    – Izem
    Commented Sep 14, 2023 at 13:56
  • 2
    \$\begingroup\$ 130-some lines is not long; please paste it here. \$\endgroup\$
    – Reinderien
    Commented Sep 14, 2023 at 14:02
  • \$\begingroup\$ I posted it as an EDIT in the post \$\endgroup\$
    – Izem
    Commented Sep 20, 2023 at 12:17

0

Browse other questions tagged or ask your own question.