3737# Library worker import reloaded in init and reload request
3838_library_worker = None
3939
40+ # Thread-local invocation ID registry for efficient lookup
41+ _thread_invocation_registry : typing .Dict [int , str ] = {}
42+ _registry_lock = threading .Lock ()
43+
44+ # Global current invocation tracker (as a fallback)
45+ _current_invocation_id : Optional [str ] = None
46+ _current_invocation_lock = threading .Lock ()
47+
4048
4149class ContextEnabledTask (asyncio .Task ):
4250 AZURE_INVOCATION_ID = '__azure_function_invocation_id__'
@@ -58,16 +66,63 @@ def set_azure_invocation_id(self, invocation_id: str) -> None:
5866_invocation_id_local = threading .local ()
5967
6068
69+ def set_thread_invocation_id (thread_id : int , invocation_id : str ) -> None :
70+ """Set the invocation ID for a specific thread"""
71+ with _registry_lock :
72+ _thread_invocation_registry [thread_id ] = invocation_id
73+
74+
75+ def clear_thread_invocation_id (thread_id : int ) -> None :
76+ """Clear the invocation ID for a specific thread"""
77+ with _registry_lock :
78+ _thread_invocation_registry .pop (thread_id , None )
79+
80+
81+ def get_thread_invocation_id (thread_id : int ) -> Optional [str ]:
82+ """Get the invocation ID for a specific thread"""
83+ with _registry_lock :
84+ return _thread_invocation_registry .get (thread_id )
85+
86+
87+ def set_current_invocation_id (invocation_id : str ) -> None :
88+ """Set the global current invocation ID"""
89+ global _current_invocation_id
90+ with _current_invocation_lock :
91+ _current_invocation_id = invocation_id
92+
93+
94+ def get_global_current_invocation_id () -> Optional [str ]:
95+ """Get the global current invocation ID"""
96+ with _current_invocation_lock :
97+ return _current_invocation_id
98+
99+
61100def get_current_invocation_id () -> Optional [Any ]:
62- loop = asyncio ._get_running_loop ()
63- if loop is not None :
64- current_task = asyncio .current_task (loop )
65- if current_task is not None :
66- task_invocation_id = getattr (current_task ,
67- ContextEnabledTask .AZURE_INVOCATION_ID ,
68- None )
69- if task_invocation_id is not None :
70- return task_invocation_id
101+ # Check global current invocation first (most up-to-date)
102+ global_invocation_id = get_global_current_invocation_id ()
103+ if global_invocation_id is not None :
104+ return global_invocation_id
105+
106+ # Check asyncio task context
107+ try :
108+ loop = asyncio ._get_running_loop ()
109+ if loop is not None :
110+ current_task = asyncio .current_task (loop )
111+ if current_task is not None :
112+ task_invocation_id = getattr (current_task ,
113+ ContextEnabledTask .AZURE_INVOCATION_ID ,
114+ None )
115+ if task_invocation_id is not None :
116+ return task_invocation_id
117+ except RuntimeError :
118+ # No event loop running
119+ pass
120+
121+ # Check the thread-local invocation ID registry
122+ current_thread_id = threading .get_ident ()
123+ thread_invocation_id = get_thread_invocation_id (current_thread_id )
124+ if thread_invocation_id is not None :
125+ return thread_invocation_id
71126
72127 return getattr (_invocation_id_local , 'invocation_id' , None )
73128
@@ -493,13 +548,30 @@ async def _handle__invocation_request(self, request):
493548 'invocation_id: %s, worker_id: %s' ,
494549 self .request_id , function_id , invocation_id , self .worker_id )
495550
496- invocation_request = WorkerRequest (
497- name = "FunctionInvocationRequest" ,
498- request = request )
499- invocation_response = await (
500- _library_worker .invocation_request ( # type: ignore[union-attr]
501- invocation_request ))
551+ # Set the global current invocation ID first (for all threads to access)
552+ set_current_invocation_id (invocation_id )
502553
503- return protos .StreamingMessage (
504- request_id = self .request_id ,
505- invocation_response = invocation_response )
554+ # Set the current `invocation_id` to the current task so
555+ # that our logging handler can find it.
556+ current_task = asyncio .current_task ()
557+ if current_task is not None and isinstance (current_task , ContextEnabledTask ):
558+ current_task .set_azure_invocation_id (invocation_id )
559+
560+ # Register the invocation ID for the current thread
561+ current_thread_id = threading .get_ident ()
562+ set_thread_invocation_id (current_thread_id , invocation_id )
563+
564+ try :
565+ invocation_request = WorkerRequest (name = "FunctionInvocationRequest" ,
566+ request = request )
567+ invocation_response = await (
568+ _library_worker .invocation_request ( # type: ignore[union-attr]
569+ invocation_request ))
570+
571+ return protos .StreamingMessage (
572+ request_id = self .request_id ,
573+ invocation_response = invocation_response )
574+ except Exception :
575+ # Clear thread registry on exception to prevent stale IDs
576+ clear_thread_invocation_id (current_thread_id )
577+ raise
0 commit comments