Hi, I'm trying to implement a StatefulProcessor for my custom logic. Ideally we are streaming, and I want the custom logic to calculate packet loss from the previous row.
I implemented the StatefulProcessor, but it is not working as expected: the first microbatch with the full load works as expected, but as the streaming proceeds with the next microbatches, the rows start diminishing.
I'm not sure if it's due to the state not being able to recall the last value, or the join being incorrect.
I have a filter at my final write in my forEachBatch where I filter out values that are null.
Below is my class and the subsequent transformations.
df_base = ( add_oid(df_mef,"managedGroupID","terminalID") .withWatermark("time_trunc", "15 minutes") ) counters = df_base.select( "OId", "time_trunc", "fwdPacketLoss", "fwdPacketsSent", "rtnPacketLoss", "rtnPacketsSent" ) class PacketLossProcessor(StatefulProcessor): def init(self, handle: StatefulProcessorHandle) -> None: self.handle = handle state_schema = StructType([ StructField("fwd_sent", DoubleType(), True), StructField("fwd_loss", DoubleType(), True), StructField("rtn_sent", DoubleType(), True), StructField("rtn_loss", DoubleType(), True) ]) self.state = handle.getValueState("packet_loss_state", state_schema) def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]: # ---- DEBUG START (safe) ---------------------------- oid = key[0] if isinstance(key, (list, tuple)) else key # safe extraction; no logic change # ---- DEBUG END ------------------------------------- # Ensure state is a DataFrame if self.state.exists(): state_df = self.state.get() if isinstance(state_df, pd.DataFrame) and not state_df.empty: state = state_df.iloc[0].to_dict() else: state = { "fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None } else: state = { "fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None } # Process input rows all_rows = pd.concat(list(rows)).sort_values("time_trunc") output = [] for _, r in all_rows.iterrows(): ts = r["time_trunc"] fs, fl = r["fwdPacketsSent"], r["fwdPacketLoss"] rs, rl = r["rtnPacketsSent"], r["rtnPacketLoss"] def pct(prev_s, prev_l, cur_s, cur_l): if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l): return None #PR For modem resets we are treating the loss as Null(currently only if the counter increases we treat it as valid) if cur_s <= prev_s: return None ds, dl = cur_s - prev_s, cur_l - prev_l if ds <= 0: return None val = 100.0 * (dl / ds) return max(0.0, min(100.0, val)) fwd_pct = pct(state["fwd_sent"], state["fwd_loss"], fs, fl) rtn_pct = 
pct(state["rtn_sent"], state["rtn_loss"], rs, rl) row_out = {"OId": str(oid), "time_trunc": ts} if fwd_pct is not None: row_out["fwdpacketLoss"] = fwd_pct if rtn_pct is not None: row_out["rtnpacketLoss"] = rtn_pct if "fwdpacketLoss" in row_out or "rtnpacketLoss" in row_out: output.append(row_out) state["fwd_sent"], state["fwd_loss"] = fs, fl state["rtn_sent"], state["rtn_loss"] = rs, rl # Update state as a DataFrame self.state.update(pd.DataFrame([state])) yield pd.DataFrame(output) output_schema = StructType([ StructField("OId", StringType(), True), StructField("time_trunc", TimestampType(), True), StructField("fwdpacketLoss", DoubleType(), True), StructField("rtnpacketLoss", DoubleType(), True) ]) df_loss = ( counters.groupBy("OId") .transformWithStateInPandas( statefulProcessor=PacketLossProcessor(), outputStructType=output_schema, outputMode="append", timeMode="ProcessingTime" ) ) I'm using the start date as a parameter in the below example I used the start date as 2025-10-19T05:34:00.000Z and the df_loss processed one row but stopped processing live inputs after that. below is my input
|count(1)|date_trunc(minute, timedate)| |---|---| |1|2025-10-19T05:50:00.000+00:00| |1|2025-10-19T05:49:00.000+00:00| |1|2025-10-19T05:48:00.000+00:00| |1|2025-10-19T05:47:00.000+00:00| |1|2025-10-19T05:46:00.000+00:00| |1|2025-10-19T05:45:00.000+00:00| |1|2025-10-19T05:44:00.000+00:00| |1|2025-10-19T05:43:00.000+00:00| |1|2025-10-19T05:42:00.000+00:00| |1|2025-10-19T05:41:00.000+00:00| |1|2025-10-19T05:40:00.000+00:00| |1|2025-10-19T05:39:00.000+00:00| |1|2025-10-19T05:38:00.000+00:00| |1|2025-10-19T05:37:00.000+00:00| |1|2025-10-19T05:36:00.000+00:00| |1|2025-10-19T05:35:00.000+00:00| |1|2025-10-19T05:34:00.000+00:00| My df_loss processed the first two rows but stopped afterwards:
|OId|time_trunc|pctfwdpacketLoss|pctrtnpacketLoss| |---|---|---|---| |20000147|2025-10-19T05:35:00.000+00:00|0|0|