33# Author: Alex Kiselev <a.kiselev@volz-servos.com>, Pavel Kirienko <pavel@opencyphal.org>
44
55from __future__ import annotations
6+ import queue
67import time
78import typing
89import asyncio
910import logging
1011import threading
11- import functools
12+ from functools import partial
1213import dataclasses
1314import collections
14- import concurrent .futures
1515import warnings
1616
1717import can
2222_logger = logging .getLogger (__name__ )
2323
2424
25+ @dataclasses .dataclass (frozen = True )
26+ class _TxItem :
27+ msg : can .Message
28+ timeout : float
29+ future : asyncio .Future [None ]
30+ loop : asyncio .AbstractEventLoop
31+
32+
2533@dataclasses .dataclass (frozen = True )
2634class PythonCANBusOptions :
2735 hardware_loopback : bool = False
@@ -188,7 +196,9 @@ def __init__(
188196 self ._closed = False
189197 self ._maybe_thread : typing .Optional [threading .Thread ] = None
190198 self ._rx_handler : typing .Optional [Media .ReceivedFramesHandler ] = None
191- self ._background_executor = concurrent .futures .ThreadPoolExecutor (max_workers = 1 )
199+ # This is for communication with a thread that handles the call to _bus.send
200+ self ._tx_queue : queue .Queue [_TxItem | None ] = queue .Queue ()
201+ self ._tx_thread = threading .Thread (target = self .transmit_thread_worker , daemon = True )
192202
193203 params : typing .Union [_FDInterfaceParameters , _ClassicInterfaceParameters ]
194204 if self ._is_fd :
@@ -231,6 +241,7 @@ def is_fd(self) -> bool:
231241 return self ._is_fd
232242
233243 def start (self , handler : Media .ReceivedFramesHandler , no_automatic_retransmission : bool ) -> None :
244+ self ._tx_thread .start ()
234245 if self ._maybe_thread is None :
235246 self ._rx_handler = handler
236247 self ._maybe_thread = threading .Thread (
@@ -254,6 +265,24 @@ def configure_acceptance_filters(self, configuration: typing.Sequence[FilterConf
254265 _logger .debug ("%s: Acceptance filters activated: %s" , self , ", " .join (map (str , configuration )))
255266 self ._bus .set_filters (filters )
256267
268+ def transmit_thread_worker (self ) -> None :
269+ try :
270+ while not self ._closed :
271+ tx = self ._tx_queue .get (block = True )
272+ if self ._closed or tx is None :
273+ break
274+ try :
275+ self ._bus .send (tx .msg , tx .timeout )
276+ tx .loop .call_soon_threadsafe (partial (tx .future .set_result , None ))
277+ except Exception as ex :
278+ tx .loop .call_soon_threadsafe (partial (tx .future .set_exception , ex ))
279+ except Exception as ex :
280+ _logger .critical (
281+ "Unhandled exception in transmit thread, transmission thread stopped and transmission is no longer possible: %s" ,
282+ ex ,
283+ exc_info = True ,
284+ )
285+
257286 async def send (self , frames : typing .Iterable [Envelope ], monotonic_deadline : float ) -> int :
258287 num_sent = 0
259288 loopback : typing .List [typing .Tuple [Timestamp , Envelope ]] = []
@@ -269,10 +298,16 @@ async def send(self, frames: typing.Iterable[Envelope], monotonic_deadline: floa
269298 )
270299 try :
271300 desired_timeout = monotonic_deadline - loop .time ()
272- await loop .run_in_executor (
273- self ._background_executor ,
274- functools .partial (self ._bus .send , message , timeout = max (desired_timeout , 0 )),
301+ received_future : asyncio .Future [None ] = asyncio .Future ()
302+ self ._tx_queue .put_nowait (
303+ _TxItem (
304+ message ,
305+ max (desired_timeout , 0 ),
306+ received_future ,
307+ asyncio .get_running_loop (),
308+ )
275309 )
310+ await received_future
276311 except (asyncio .TimeoutError , can .CanError ): # CanError is also used to report timeouts (weird).
277312 break
278313 else :
@@ -287,6 +322,8 @@ async def send(self, frames: typing.Iterable[Envelope], monotonic_deadline: floa
287322 def close (self ) -> None :
288323 self ._closed = True
289324 try :
325+ self ._tx_queue .put (None )
326+ self ._tx_thread .join (timeout = self ._MAXIMAL_TIMEOUT_SEC * 10 )
290327 if self ._maybe_thread is not None :
291328 self ._maybe_thread .join (timeout = self ._MAXIMAL_TIMEOUT_SEC * 10 )
292329 self ._maybe_thread = None
0 commit comments