from observee_agents import chat_with_tools_stream import asyncio # Define custom tool handler async def custom_tool_handler(tool_name: str, tool_input: dict) -> str: """Handle custom tool executions""" if tool_name == "add_numbers": return str(tool_input.get("a", 0) + tool_input.get("b", 0)) elif tool_name == "get_time": from datetime import datetime return datetime.now().strftime("%I:%M %p") else: return f"Unknown tool: {tool_name}" # Define custom tools in OpenAI format custom_tools = [ { "type": "function", "function": { "name": "add_numbers", "description": "Add two numbers together", "parameters": { "type": "object", "properties": { "a": {"type": "number", "description": "First number"}, "b": {"type": "number", "description": "Second number"} }, "required": ["a", "b"] } } }, { "type": "function", "function": { "name": "get_time", "description": "Get the current time", "parameters": { "type": "object", "properties": {} } } } ] # Use custom tools async def example(): async for chunk in chat_with_tools_stream( message="What's 5 + 3? Also, what time is it?", provider="anthropic", custom_tools=custom_tools, custom_tool_handler=custom_tool_handler, observee_api_key="obs_your_key_here" ): if chunk["type"] == "content": print(chunk["content"], end="", flush=True) elif chunk["type"] == "tool_result": print(f"\n🔧 [Tool: {chunk['tool_name']} = {chunk['result']}]") asyncio.run(example())