@cnweaver

Currently error_cb attempts to handle the case of the callback raising a Python error, but this handling breaks down when the callback is invoked multiple times. The problem is that raising an error simply consists of setting the error state in the interpreter; if this state is not cleared, the interpreter will treat every subsequent return from C code back to Python as having raised an error. That is itself an error if the code in question (not having actually raised anything) attempts to return a value, including None to indicate successful completion.

The error callback may be invoked multiple times by functions like Consumer_consume when retries are performed due to network issues. Given a callback of the form:

def handle_error(kafka_error: confluent_kafka.KafkaError):
    logger.warning(f"internal kafka error: {kafka_error}")
    if some_condition:
        raise SomeError

Once some_condition has caused SomeError to be raised, subsequent invocations of the callback will fail in ways likely surprising to the user: a SystemError complaining that logger.warning "returned a result with an error set". (logger.warning could be any code which ends up calling into a C implementation, including print.) Although this error will chain back to the original SomeError as the cause, it alters the control flow of the subsequent callback invocations, potentially bypassing logic that was important to the user. Additionally, the SystemErrors obscure the original error type, so that attempting to handle it in the obvious way (try ... except SomeError) will not work.

The proposed solution is that confluent-kafka, instead of leaving the error state set after the callback to propagate when Consumer_consume et al. return, should record any error directly after the callback completes, unset the error state, and then restore the recorded error state before returning to Python. CallState_crash and CallState_end seem to be well-positioned to do this. One possible source of complexity is that repeated invocations of the callback may raise multiple errors. The approach taken in this patch is simply to preserve and propagate the last error raised. It would certainly be possible to preserve all errors, but it does not seem obvious how to usefully communicate multiple errors back to the user.
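To illustrate the intended semantics, here is a pure-Python analog of that record/clear/restore behavior (only a sketch: consume_with_error_cb is a hypothetical stand-in for the retry loop inside Consumer_consume, not the actual C-level PyErr handling):

```python
# Hypothetical Python-level analog of the patch: record the most recent
# exception raised by the callback, clear it so that later retries run
# with a clean error state, and re-raise it once control returns to the
# caller -- mirroring what CallState_crash/CallState_end would do in C.
def consume_with_error_cb(transient_errors, error_cb):
    last_exc = None
    for err in transient_errors:      # e.g. repeated network errors
        try:
            error_cb(err)             # callback may raise on any attempt
        except Exception as exc:
            last_exc = exc            # only the last error is preserved
    if last_exc is not None:          # "restore" the error state on return
        raise last_exc
```

Note that every invocation of the callback still runs, and only the final exception propagates, matching the patch's choice to preserve the last error raised.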

@ghost

ghost commented Jun 21, 2021

It looks like @cnweaver hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@cnweaver
Author

[clabot:check]

@ghost

ghost commented Jun 21, 2021

@confluentinc It looks like @cnweaver just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@spenczar

I think this fixes #729 and #865.

@austenwho

austenwho commented Jul 23, 2021

@cnweaver I'm hopeful that your fix might solve my issue as well.

At first I believed this to be a race condition where the TopicPartitions list was being appended to while I was in the process of reading from it; I'd get the following SystemError:

Got system error trying to reference partition object: SystemError("<method 'append' of 'list' objects> returned a result with an error set")

Note: My code was not altering the list at all, just iterating over the list of partitions to create a log set of what partitions were being assigned/revoked.

Wrapping the iteration over the TopicPartitions list in a try/except SystemError block kludged a way through this issue. I then attempted to mitigate the issue by adding a one-second pause with sleep(1), which seemed to work briefly (this is an intermittent issue that is difficult to reproduce, unfortunately), but I then started getting the following SystemError instead:

SystemError: <built-in function sleep> returned a result with an error set

It seems that every step I take to mitigate this issue just results in SystemError being produced somewhere else and leaking through...

@cnweaver
Author

It sounds possible, although I think I would need to better understand exactly how your code was structured to really be certain. Was the loop over the list located inside your callback, or somewhere else and to some degree concurrent with the callback?

@austenwho

@cnweaver Actually, I think it was me who needed to fully understand your explanation of what was going on. You were right that the subsequent raising of SystemError is very confusing and misleading.

In my callbacks I was using an exception as control flow (a thing I don't particularly like, but which is Pythonic) to handle resetting poller state when assignments/revocations happened. I misunderstood at first what you meant about raising an exception versus causing one. I was doing the former, intentionally. I was not doing the latter, and that's where my confusion was.

I was able to fix my problem, specifically, by moving the exception I was raising inside the callback to a flag that would be checked outside the callback's stack, and then raised. So far this seems to work and never taints the Cimpl for subsequent calls to these callbacks.

My solution, which seems to be working without issue so far (all 30 minutes or so):

class MyClass:
    callback_state: PollerException = None

    def on_assign(self, consumer: Consumer, partitions: List[TopicPartition]):
        # do stuff with partitions
        # Here I used to simply raise DerivedPollerException(), which was the issue
        self.callback_state = DerivedPollerException("...")

    def check_callback_state(self):
        if self.callback_state:
            state = self.callback_state
            self.callback_state = None
            raise state

    def poll(self):
        while True:
            self.check_callback_state()
            # do normal stuff

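For reference, a minimal self-contained version of this flag pattern (PollerException and the simulated assignment are illustrative names, not part of confluent-kafka's API):

```python
class PollerException(Exception):
    pass

class Poller:
    def __init__(self):
        # Set inside the C-invoked callback, raised only from Python code.
        self.callback_state = None

    def on_assign(self, consumer, partitions):
        # Record the condition instead of raising while C code is on the stack.
        self.callback_state = PollerException(f"assigned {len(partitions)} partitions")

    def check_callback_state(self):
        # Called from the ordinary poll loop, outside the callback's stack,
        # so raising here cannot leave a stale interpreter error state.
        if self.callback_state is not None:
            state, self.callback_state = self.callback_state, None
            raise state
```

The key point is that the raise happens in plain Python code, after the C-driven callback has already returned cleanly.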
@mjuric

mjuric commented Jul 28, 2021

Hi, @cnweaver -- is @austenwho's proposed workaround sufficient?

@confluentinc, is there any way to help test or review this PR so it could potentially get merged?

@cnweaver
Author

@austenwho's solution is a viable alternative, I think. If it is to be adopted, though, CallState_crash should probably still be updated to swallow errors if any are raised, and documentation should be updated to inform users that errors raised from their callbacks will not be propagated, so they should avoid raising intentionally (and instead use a pattern like @austenwho has outlined).

@edenhill
Contributor

Huhm, this shouldn't be happening as all the callback handlers will call rd_kafka_yield() if an exception was raised. The yield will make librdkafka return immediately from whatever poll() call it is running, which in turn should return back to Python and propagate the exception to the user.

@cla-assistant

cla-assistant bot commented Aug 15, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.
