How can I change the behavior of a task cancellation from the place where the task is cancelled?
What I would dream of:
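    from asyncio import CancelledError, ensure_future

    task = ensure_future(foo())

    def foo_done(task):
        try:
            return task.result()
        except CancelledError as e:
            # wished-for behavior: the arguments given to cancel() arrive here
            when, why = e.args
            if when == "now":
                pass  # do something...
            elif when == "asap":
                pass  # do something else...
            else:
                pass  # do default
            print(f"task cancelled because {why}")

    task.add_done_callback(foo_done)

    [...]

    # wished-for call signature, not what cancel() accepts today
    task.cancel("now", "This is an order!")

As a workaround, I could attach attributes to the task before calling task.cancel() and inspect them later: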
    from asyncio import CancelledError, ensure_future

    task = ensure_future(foo())

    def foo_done(task):
        try:
            return task.result()
        except CancelledError:
            # read back the attributes set just before cancel()
            when = getattr(task, "_when", "")
            why = getattr(task, "_why", "")
            if when == "now":
                pass  # do something...
            elif when == "asap":
                pass  # do something else...
            else:
                pass  # do default
            print(f"task cancelled because {why}")

    task.add_done_callback(foo_done)

    [...]

    task._when = "now"
    task._why = "This is an order!"
    task.cancel()

But this looks clunky in some situations, for example when I want to catch the CancelledError inside the task being processed:
    from asyncio import CancelledError

    async def foo():
        # some stuff
        try:
            pass  # some other stuff
        except CancelledError as e:
            # here I have easy access to the error, but not the task :(
            [...]

I'm looking for a more Pythonic way to do it.
foo_done is a synchronous function.