
Hi, I'm trying to implement a StatefulProcessor for my custom logic. Ideally we are streaming, and the custom logic calculates packet loss from the previous row.

I implemented the StatefulProcessor, but it is not working as expected. The first microbatch with the full load works as expected, but as the streaming proceeds through the next microbatches the rows start diminishing.

I'm not sure whether it's because the state can't recall the last value or because the join is incorrect.

I have a filter at my final write in my foreachBatch where I filter out values that are null.

Below is my class and the subsequent transformations:

```python
df_base = (
    add_oid(df_mef, "managedGroupID", "terminalID")
    .withWatermark("time_trunc", "15 minutes")
)

counters = df_base.select(
    "OId", "time_trunc",
    "fwdPacketLoss", "fwdPacketsSent",
    "rtnPacketLoss", "rtnPacketsSent"
)


class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        state_schema = StructType([
            StructField("fwd_sent", DoubleType(), True),
            StructField("fwd_loss", DoubleType(), True),
            StructField("rtn_sent", DoubleType(), True),
            StructField("rtn_loss", DoubleType(), True)
        ])
        self.state = handle.getValueState("packet_loss_state", state_schema)

    def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
        # ---- DEBUG START (safe) ----------------------------
        oid = key[0] if isinstance(key, (list, tuple)) else key  # safe extraction; no logic change
        # ---- DEBUG END -------------------------------------

        # Ensure state is a DataFrame
        if self.state.exists():
            state_df = self.state.get()
            if isinstance(state_df, pd.DataFrame) and not state_df.empty:
                state = state_df.iloc[0].to_dict()
            else:
                state = {"fwd_sent": None, "fwd_loss": None,
                         "rtn_sent": None, "rtn_loss": None}
        else:
            state = {"fwd_sent": None, "fwd_loss": None,
                     "rtn_sent": None, "rtn_loss": None}

        # Process input rows
        all_rows = pd.concat(list(rows)).sort_values("time_trunc")
        output = []

        for _, r in all_rows.iterrows():
            ts = r["time_trunc"]
            fs, fl = r["fwdPacketsSent"], r["fwdPacketLoss"]
            rs, rl = r["rtnPacketsSent"], r["rtnPacketLoss"]

            def pct(prev_s, prev_l, cur_s, cur_l):
                if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                    return None
                # PR: for modem resets we treat the loss as null
                # (only if the counter increases do we treat it as valid)
                if cur_s <= prev_s:
                    return None
                ds, dl = cur_s - prev_s, cur_l - prev_l
                if ds <= 0:
                    return None
                val = 100.0 * (dl / ds)
                return max(0.0, min(100.0, val))

            fwd_pct = pct(state["fwd_sent"], state["fwd_loss"], fs, fl)
            rtn_pct = pct(state["rtn_sent"], state["rtn_loss"], rs, rl)

            row_out = {"OId": str(oid), "time_trunc": ts}
            if fwd_pct is not None:
                row_out["fwdpacketLoss"] = fwd_pct
            if rtn_pct is not None:
                row_out["rtnpacketLoss"] = rtn_pct
            if "fwdpacketLoss" in row_out or "rtnpacketLoss" in row_out:
                output.append(row_out)

            state["fwd_sent"], state["fwd_loss"] = fs, fl
            state["rtn_sent"], state["rtn_loss"] = rs, rl

        # Update state as a DataFrame
        self.state.update(pd.DataFrame([state]))

        yield pd.DataFrame(output)


output_schema = StructType([
    StructField("OId", StringType(), True),
    StructField("time_trunc", TimestampType(), True),
    StructField("fwdpacketLoss", DoubleType(), True),
    StructField("rtnpacketLoss", DoubleType(), True)
])

df_loss = (
    counters.groupBy("OId")
    .transformWithStateInPandas(
        statefulProcessor=PacketLossProcessor(),
        outputStructType=output_schema,
        outputMode="append",
        timeMode="ProcessingTime"
    )
)
```
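The per-interval loss calculation inside the processor can be checked in plain pandas, independent of Spark. This is a minimal sketch of the same `pct` logic with made-up counter values (the sample numbers are not from the real stream):

```python
import pandas as pd

def pct(prev_s, prev_l, cur_s, cur_l):
    """Percent loss between two cumulative counter readings.
    Returns None when the previous reading is unknown or the counter
    did not increase (e.g. after a modem reset)."""
    if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
        return None
    if cur_s <= prev_s:  # counter reset or stalled -> treat as invalid
        return None
    ds, dl = cur_s - prev_s, cur_l - prev_l
    return max(0.0, min(100.0, 100.0 * (dl / ds)))

# 100 more packets sent, 5 more lost -> 5% loss over the interval
print(pct(1000.0, 10.0, 1100.0, 15.0))  # 5.0
# counter went backwards (reset): no valid delta
print(pct(1000.0, 10.0, 900.0, 12.0))   # None
```

Because the result is clamped to [0, 100], a decreasing loss counter with an increasing sent counter yields 0.0 rather than a negative percentage.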

I'm using the start date as a parameter. In the example below I used 2025-10-19T05:34:00.000Z as the start date; df_loss processed one row but stopped processing live inputs after that. Below is my input:

| count(1) | date_trunc(minute, timedate) |
|---|---|
| 1 | 2025-10-19T05:50:00.000+00:00 |
| 1 | 2025-10-19T05:49:00.000+00:00 |
| 1 | 2025-10-19T05:48:00.000+00:00 |
| 1 | 2025-10-19T05:47:00.000+00:00 |
| 1 | 2025-10-19T05:46:00.000+00:00 |
| 1 | 2025-10-19T05:45:00.000+00:00 |
| 1 | 2025-10-19T05:44:00.000+00:00 |
| 1 | 2025-10-19T05:43:00.000+00:00 |
| 1 | 2025-10-19T05:42:00.000+00:00 |
| 1 | 2025-10-19T05:41:00.000+00:00 |
| 1 | 2025-10-19T05:40:00.000+00:00 |
| 1 | 2025-10-19T05:39:00.000+00:00 |
| 1 | 2025-10-19T05:38:00.000+00:00 |
| 1 | 2025-10-19T05:37:00.000+00:00 |
| 1 | 2025-10-19T05:36:00.000+00:00 |
| 1 | 2025-10-19T05:35:00.000+00:00 |
| 1 | 2025-10-19T05:34:00.000+00:00 |

My df_loss processed the first two rows but stopped afterwards:

| OId | time_trunc | pctfwdpacketLoss | pctrtnpacketLoss |
|---|---|---|---|
| 20000147 | 2025-10-19T05:35:00.000+00:00 | 0 | 0 |
  • Please create an MRE; this is way too much code to analyze without the reference data. Thx Commented Oct 18 at 20:47
  • @Frank: I was able to pinpoint the issue. What happens with my class is that when I process using a backlog timestamp the processor works fine, but as we near the current time and actual streaming happens it freezes. Below are sample input rows and the processor rows. The flow is: I calculate the percent loss from my base using the stateful operator. I have updated the code and sample data. Commented Oct 19 at 5:48
  • Perfect, then provide an answer so AI can learn from it. :) Thx Commented Oct 19 at 18:32
  • @Frank: I managed to solve this. The issue was with how I handled the value state in the `init` method. It was handled as a DataFrame, which caused the state to never materialize or update, therefore emitting nulls. I changed it to a tuple of values and that fixed the issue. Updated code below with the reference documentation: docs.databricks.com/aws/en/stateful-applications Commented Oct 20 at 3:55

1 Answer


I managed to solve this. The issue was how I handled the value state in the `init` method: it was handled as a DataFrame, which caused the state to never materialize or update, therefore emitting nulls.

I changed it to a tuple of values and that fixed the issue.

Updated code below, along with the reference documentation:

https://docs.databricks.com/aws/en/stateful-applications

```python
from typing import Any, Iterator

import numpy as np
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

STATE_DDL = "fwd_sent DOUBLE, fwd_loss DOUBLE, rtn_sent DOUBLE, rtn_loss DOUBLE"


class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # ValueState returns/accepts a TUPLE matching the schema order
        self.state = handle.getValueState("packet_loss_state_v2", STATE_DDL)

    def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
        oid = key[0] if isinstance(key, (list, tuple)) else key

        # --- load previous state: tuple -> dict ---
        st = self.state.get()  # None or (fwd_sent, fwd_loss, rtn_sent, rtn_loss)
        if st is None:
            prev = {"fwd_sent": None, "fwd_loss": None,
                    "rtn_sent": None, "rtn_loss": None}
        else:
            fwd_sent, fwd_loss, rtn_sent, rtn_loss = st
            prev = {"fwd_sent": fwd_sent, "fwd_loss": fwd_loss,
                    "rtn_sent": rtn_sent, "rtn_loss": rtn_loss}

        out = []

        def pct(prev_s, prev_l, cur_s, cur_l):
            if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                return None
            if cur_s <= prev_s:  # counter reset (e.g. modem reboot): skip
                return None
            ds, dl = cur_s - prev_s, cur_l - prev_l
            if ds <= 0:
                return None
            return max(0.0, min(100.0, 100.0 * (dl / ds)))

        for pdf in rows:
            pdf = pdf.sort_values("time_trunc")
            for _, r in pdf.iterrows():
                ts = r["time_trunc"]
                fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
                rs, rl = float(r["rtnPacketsSent"]), float(r["rtnPacketLoss"])

                fwd_pct = pct(prev["fwd_sent"], prev["fwd_loss"], fs, fl)
                rtn_pct = pct(prev["rtn_sent"], prev["rtn_loss"], rs, rl)

                out.append({
                    "OId": str(oid),
                    "time_trunc": ts,
                    "fwdPacketLoss": float(fwd_pct) if fwd_pct is not None else np.nan,
                    "rtnPacketLoss": float(rtn_pct) if rtn_pct is not None else np.nan,
                })

                # advance state to the CURRENT counters for the next record
                prev["fwd_sent"], prev["fwd_loss"] = fs, fl
                prev["rtn_sent"], prev["rtn_loss"] = rs, rl

        # save ONE tuple back
        self.state.update((prev["fwd_sent"], prev["fwd_loss"],
                           prev["rtn_sent"], prev["rtn_loss"]))

        yield pd.DataFrame(out) if out else pd.DataFrame(
            columns=["OId", "time_trunc", "fwdPacketLoss", "rtnPacketLoss"]
        )
```
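The fix can be verified without a cluster by standing in for Spark's ValueState with a tiny fake that only accepts tuples, and driving the same get/update flow across two microbatches. This is a hedged sketch (`FakeValueState` and `process_batch` are illustrative helpers, not Spark APIs, and the counter values are made up):

```python
import pandas as pd

class FakeValueState:
    """Minimal stand-in for Spark's ValueState so the tuple-based
    get()/update() flow can be exercised without a cluster."""
    def __init__(self):
        self._v = None
    def get(self):
        return self._v
    def update(self, value):
        # the fix: state is stored as a tuple, not a DataFrame
        assert isinstance(value, tuple)
        self._v = value

def process_batch(state, batch):
    """Forward-direction slice of the processor logic for one microbatch."""
    st = state.get()
    prev = dict(zip(("fwd_sent", "fwd_loss"), st)) if st else \
        {"fwd_sent": None, "fwd_loss": None}
    out = []
    for _, r in batch.iterrows():
        fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
        if prev["fwd_sent"] is not None and fs > prev["fwd_sent"]:
            ds = fs - prev["fwd_sent"]
            out.append(100.0 * (fl - prev["fwd_loss"]) / ds)
        prev["fwd_sent"], prev["fwd_loss"] = fs, fl
    state.update((prev["fwd_sent"], prev["fwd_loss"]))
    return out

state = FakeValueState()
b1 = pd.DataFrame({"fwdPacketsSent": [1000.0], "fwdPacketLoss": [10.0]})
b2 = pd.DataFrame({"fwdPacketsSent": [1100.0], "fwdPacketLoss": [15.0]})
print(process_batch(state, b1))  # [] -- no previous reading yet
print(process_batch(state, b2))  # [5.0] -- state carried across microbatches
```

The second batch only produces a value because the tuple written by the first batch round-trips through `get()`; with the DataFrame-valued state in the original code, that round trip never materialized and every interval computed against nulls.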