My question is - when should I do dataframe.cache() and when it's useful?
Also, in my code should I cache the dataframes in the commented lines?
Note: My dataframes are loaded from a Redshift DB.
Many thanks
Here my code:
def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux): df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta']) df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa']) dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \ .filter(dataframe.seq_reserva.isin(seq_reservas)) ################################################## #SHOULD I CACHE HERE df_vta, df_cpa and dataframe ################################################## dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta, dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta, dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta, dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta, dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta, ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta", "cod_esquema_vta", "cod_emp_atlas_vta") \ .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa, dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa, dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa, dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa, dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa, ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa", "cod_esquema_cpa", "cod_emp_atlas_cpa") \ .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con", "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa") ###################################### #SHOULD I CACHE HERE dataframe AGAIN ? ###################################### dataframe = dataframe.withColumn("amount1", func.when(dataframe.ind_tipo_regimen_fac == 'E', dataframe.imp_margen_canal * ( 1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100))))) .otherwise(dataframe.imp_venta * ( 1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - ( dataframe.imp_venta - dataframe.imp_margen_canal) * ( 1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100)))))) dataframe = dataframe.withColumn("amount2", func.when(dataframe.ind_tipo_regimen_con == 'E', dataframe.imp_margen_canco * ( 1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100))))) .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * ( 1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - ( dataframe.imp_coste) * ( 1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100)))))) dataframe = dataframe.na.fill({'amount1': 0}) dataframe = dataframe.na.fill({'amount2': 0}) dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming, dataframe.seq_reserva == df_aux.booking_id]) dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency, func.lit(EUR), dataframe.creation_date, dataframe.amount1)) dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency, func.lit(EUR), dataframe.creation_date, dataframe.amount2)) dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2) dataframe = dataframe.na.fill({'impuesto_canco': 0}) dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco") ###################################### #SHOULD I CACHE HERE dataframe AGAIN ? ###################################### dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \ withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco") return dataframe