@@ -29,6 +29,7 @@ class FunctionInfo(typing.NamedTuple):
2929 outputs : typing .Set [str ]
3030 requires_context : bool
3131 is_async : bool
32+ has_return : bool
3233
3334
3435class DispatcherMeta (type ):
@@ -74,6 +75,8 @@ def __init__(self, loop, host, port, worker_id, request_id,
7475 self ._grpc_thread = threading .Thread (
7576 name = 'grpc-thread' , target = self .__poll_grpc )
7677
78+ self ._logger = logging .getLogger ('python-azure-worker' )
79+
7780 @classmethod
7881 async def connect (cls , host , port , worker_id , request_id ,
7982 connect_timeout ):
@@ -184,14 +187,25 @@ def _register_function(self, function_id: str, func: callable,
184187 bindings = {}
185188 return_binding = None
186189 for name , desc in metadata .bindings .items ():
190+ if desc .direction == protos .BindingInfo .inout :
191+ raise TypeError (
192+ f'cannot load the { func_name } function: '
193+ f'"inout" bindings are not supported' )
194+
187195 if name == '$return' :
188196 # TODO:
189197 # * add proper gRPC->Python type reflection;
190198 # * convert the type from function.json to a Python type;
191199 # * enforce return type of a function call in Python;
192200 # * use the return type information to marshal the result into
193201 # a correct gRPC type.
194- return_binding = desc # NoQA
202+
203+ if desc .direction != protos .BindingInfo .out :
204+ raise TypeError (
205+ f'cannot load the { func_name } function: '
206+ f'"$return" binding must have direction set to "out"' )
207+
208+ return_binding = desc
195209 else :
196210 bindings [name ] = desc
197211
@@ -251,14 +265,19 @@ def _register_function(self, function_id: str, func: callable,
251265 directory = metadata .directory ,
252266 outputs = frozenset (outputs ),
253267 requires_context = requires_context ,
254- is_async = inspect .iscoroutinefunction (func ))
268+ is_async = inspect .iscoroutinefunction (func ),
269+ has_return = return_binding is not None )
255270
256271 async def _dispatch_grpc_request (self , request ):
257272 content_type = request .WhichOneof ('content' )
258273 request_handler = getattr (self , f'_handle__{ content_type } ' , None )
259274 if request_handler is None :
260- raise RuntimeError (
275+ # Don't crash on unknown messages. Some of them can be ignored;
276+ # and if something goes really wrong the host can always just
277+ # kill the worker's process.
278+ self ._logger .error (
261279 f'unknown StreamingMessage content type { content_type } ' )
280+ return
262281
263282 resp = await request_handler (request )
264283 self ._grpc_resp_queue .put_nowait (resp )
@@ -349,11 +368,15 @@ async def _handle__invocation_request(self, req):
349368 name = name ,
350369 data = rpc_val ))
351370
371+ return_value = None
372+ if fi .has_return :
373+ return_value = rpc_types .to_outgoing_proto (call_result )
374+
352375 return protos .StreamingMessage (
353376 request_id = self .request_id ,
354377 invocation_response = protos .InvocationResponse (
355378 invocation_id = invocation_id ,
356- return_value = rpc_types . to_outgoing_proto ( call_result ) ,
379+ return_value = return_value ,
357380 result = protos .StatusResult (
358381 status = protos .StatusResult .Success ),
359382 output_data = output_data ))
0 commit comments