Skip to content

Commit aef2422

Browse files
jk4235buttercannfly
authored andcommitted
feat(chat-adapter): implement tool call argument streaming events and enhance tool call handling
1 parent 5f3fff4 commit aef2422

File tree

6 files changed

+362
-16
lines changed

6 files changed

+362
-16
lines changed

packages/aipex-react/src/adapters/chat-adapter.test.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,73 @@ describe("ChatAdapter", () => {
205205
expect(adapter.getStatus()).toBe("streaming");
206206
});
207207

208+
it("should create a pending tool call on tool_call_args_streaming_start", () => {
209+
adapter.processEvent({
210+
type: "tool_call_args_streaming_start",
211+
toolName: "search",
212+
});
213+
214+
const messages = adapter.getMessages();
215+
expect(messages).toHaveLength(1);
216+
expect(messages[0]).toMatchObject({ role: "assistant" });
217+
218+
const toolPart = messages[0]?.parts.find((p) => p.type === "tool");
219+
expect(toolPart).toMatchObject({
220+
toolName: "search",
221+
state: "pending",
222+
input: {},
223+
});
224+
expect(adapter.getStatus()).toBe("streaming");
225+
});
226+
227+
it("should update pending tool params on tool_call_args_streaming_complete", () => {
228+
adapter.processEvent({
229+
type: "tool_call_args_streaming_start",
230+
toolName: "search",
231+
});
232+
adapter.processEvent({
233+
type: "tool_call_args_streaming_complete",
234+
toolName: "search",
235+
params: { q: "ts" },
236+
});
237+
238+
const toolPart = adapter
239+
.getMessages()[0]
240+
?.parts.find((p) => p.type === "tool");
241+
expect(toolPart).toMatchObject({
242+
toolName: "search",
243+
state: "pending",
244+
input: { q: "ts" },
245+
});
246+
});
247+
248+
it("should not duplicate tool parts when tool_call_start follows tool args streaming events", () => {
249+
adapter.processEvent({
250+
type: "tool_call_args_streaming_start",
251+
toolName: "search",
252+
});
253+
adapter.processEvent({
254+
type: "tool_call_args_streaming_complete",
255+
toolName: "search",
256+
params: { q: "ts" },
257+
});
258+
adapter.processEvent({
259+
type: "tool_call_start",
260+
toolName: "search",
261+
params: { q: "ts" },
262+
});
263+
264+
const toolParts =
265+
adapter.getMessages()[0]?.parts.filter((p) => p.type === "tool") ?? [];
266+
expect(toolParts).toHaveLength(1);
267+
expect(toolParts[0]).toMatchObject({
268+
toolName: "search",
269+
state: "executing",
270+
input: { q: "ts" },
271+
});
272+
expect(adapter.getStatus()).toBe("executing_tools");
273+
});
274+
208275
it("should append to existing assistant message", () => {
209276
adapter.processEvent({ type: "content_delta", delta: "Hello" });
210277
adapter.processEvent({ type: "content_delta", delta: " world" });

packages/aipex-react/src/adapters/chat-adapter.ts

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,22 @@ export class ChatAdapter {
139139
this.appendContentDelta(event.delta);
140140
break;
141141

142+
case "tool_call_args_streaming_start":
143+
this.ensureAssistantMessage();
144+
this.ensurePendingToolCall(event.toolName, {});
145+
this.updateStatus("streaming");
146+
break;
147+
148+
case "tool_call_args_streaming_complete":
149+
this.ensureAssistantMessage();
150+
this.ensurePendingToolCall(event.toolName, event.params);
151+
break;
152+
142153
case "tool_call_start":
143154
this.ensureAssistantMessage();
144-
this.addToolCall(event.toolName, event.params);
155+
if (!this.startExistingToolCall(event.toolName, event.params)) {
156+
this.addToolCall(event.toolName, event.params);
157+
}
145158
this.updateStatus("executing_tools");
146159
break;
147160

@@ -304,6 +317,81 @@ export class ChatAdapter {
304317
this.toolsAddedSinceLastText = true;
305318
}
306319

320+
private ensurePendingToolCall(toolName: string, params: unknown): void {
321+
const existingCallId = this.findPendingToolCallId(toolName);
322+
if (existingCallId) {
323+
this.updateToolPart(existingCallId, (toolPart) => ({
324+
...toolPart,
325+
toolName,
326+
input: params,
327+
}));
328+
return;
329+
}
330+
331+
const callId = this.queueToolCall(toolName);
332+
this.updateCurrentAssistantMessage((message) => {
333+
const parts = [...message.parts];
334+
335+
const toolPart: UIToolPart = {
336+
type: "tool",
337+
toolCallId: callId,
338+
toolName,
339+
input: params,
340+
state: "pending",
341+
};
342+
343+
parts.push(toolPart);
344+
345+
return { ...message, parts };
346+
});
347+
348+
this.toolsAddedSinceLastText = true;
349+
}
350+
351+
private startExistingToolCall(toolName: string, params: unknown): boolean {
352+
const callId = this.findPendingToolCallId(toolName);
353+
if (!callId) {
354+
return false;
355+
}
356+
357+
this.updateToolPart(callId, (toolPart) => ({
358+
...toolPart,
359+
toolName,
360+
input: params,
361+
state: "executing",
362+
}));
363+
this.toolsAddedSinceLastText = true;
364+
return true;
365+
}
366+
367+
private findPendingToolCallId(toolName: string): string | undefined {
368+
const queue = this.pendingToolCalls.get(toolName);
369+
if (!queue || queue.length === 0) {
370+
return undefined;
371+
}
372+
373+
const currentId = this.state.currentAssistantMessageId;
374+
if (!currentId) {
375+
return undefined;
376+
}
377+
const message = this.state.messages.find((m) => m.id === currentId);
378+
if (!message) {
379+
return undefined;
380+
}
381+
382+
for (const callId of queue) {
383+
const toolPart = message.parts.find(
384+
(part): part is UIToolPart =>
385+
part.type === "tool" && part.toolCallId === callId,
386+
);
387+
if (toolPart?.state === "pending") {
388+
return callId;
389+
}
390+
}
391+
392+
return undefined;
393+
}
394+
307395
private updateToolComplete(toolName: string, result: unknown): void {
308396
const callId = this.dequeueToolCall(toolName);
309397
if (!callId) {

packages/aipex-react/src/components/chatbot/tools.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ ${typeof output === "string" ? output : JSON.stringify(output, null, 2)}
1818
export function mapToolState(state: UIToolPart["state"]): ToolComponentState {
1919
switch (state) {
2020
case "pending":
21-
return "input-available";
21+
return "input-streaming";
2222
case "executing":
2323
return "executing";
2424
case "completed":

packages/core/src/agent/aipex.test.ts

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,114 @@ describe("AIPex", () => {
555555
});
556556

557557
describe("tools and errors", () => {
558+
it("should emit tool_call_args_streaming_complete before tool_call_start", async () => {
559+
vi.mocked(run).mockResolvedValue(
560+
createMockRunResult({
561+
finalOutput: "",
562+
streamEvents: [
563+
{
564+
type: "run_item_stream_event",
565+
name: "tool_called",
566+
item: { rawItem: { name: "calculator", arguments: '{"a":1}' } },
567+
},
568+
],
569+
}),
570+
);
571+
572+
const agent = AIPex.create({
573+
instructions: "Tools",
574+
model: mockModel,
575+
});
576+
577+
const events: AgentEvent[] = [];
578+
for await (const event of agent.chat("use tool")) {
579+
events.push(event);
580+
}
581+
582+
const argsCompleteIndex = events.findIndex(
583+
(event) => event.type === "tool_call_args_streaming_complete",
584+
);
585+
const toolStartIndex = events.findIndex(
586+
(event) => event.type === "tool_call_start",
587+
);
588+
589+
expect(argsCompleteIndex).toBeGreaterThanOrEqual(0);
590+
expect(toolStartIndex).toBeGreaterThanOrEqual(0);
591+
expect(argsCompleteIndex).toBeLessThan(toolStartIndex);
592+
593+
const argsComplete = events[argsCompleteIndex];
594+
const toolStart = events[toolStartIndex];
595+
596+
expect(argsComplete?.type).toBe("tool_call_args_streaming_complete");
597+
if (argsComplete?.type === "tool_call_args_streaming_complete") {
598+
expect(argsComplete.toolName).toBe("calculator");
599+
expect(argsComplete.params).toEqual({ a: 1 });
600+
}
601+
602+
expect(toolStart?.type).toBe("tool_call_start");
603+
if (toolStart?.type === "tool_call_start") {
604+
expect(toolStart.toolName).toBe("calculator");
605+
expect(toolStart.params).toEqual({ a: 1 });
606+
}
607+
});
608+
609+
it("should emit tool_call_args_streaming_start when tool args are streamed by the model", async () => {
610+
vi.mocked(run).mockResolvedValue(
611+
createMockRunResult({
612+
finalOutput: "",
613+
streamEvents: [
614+
{
615+
type: "raw_model_stream_event",
616+
data: {
617+
type: "model",
618+
event: {
619+
choices: [
620+
{
621+
delta: {
622+
tool_calls: [
623+
{
624+
index: 0,
625+
id: "call_1",
626+
function: {
627+
name: "calculator",
628+
arguments: '{"a":',
629+
},
630+
},
631+
],
632+
},
633+
},
634+
],
635+
},
636+
},
637+
},
638+
{
639+
type: "run_item_stream_event",
640+
name: "tool_called",
641+
item: { rawItem: { name: "calculator", arguments: '{"a":1}' } },
642+
},
643+
],
644+
}),
645+
);
646+
647+
const agent = AIPex.create({
648+
instructions: "Tools",
649+
model: mockModel,
650+
});
651+
652+
const events: AgentEvent[] = [];
653+
for await (const event of agent.chat("use tool")) {
654+
events.push(event);
655+
}
656+
657+
const argsStart = events.find(
658+
(event) => event.type === "tool_call_args_streaming_start",
659+
);
660+
expect(argsStart).toBeDefined();
661+
if (argsStart?.type === "tool_call_args_streaming_start") {
662+
expect(argsStart.toolName).toBe("calculator");
663+
}
664+
});
665+
558666
it("should emit tool lifecycle events", async () => {
559667
vi.mocked(run).mockResolvedValue(
560668
createMockRunResult({

0 commit comments

Comments
 (0)