Unset and reset error state to avoid triggering SystemErrors #1146
base: master
Conversation
| It looks like @cnweaver hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed, reply here and we'll verify it. Appreciation of efforts, clabot |
| [clabot:check] |
| @confluentinc It looks like @cnweaver just signed our Contributor License Agreement. 👍 Always at your service, clabot |
| @cnweaver I'm hopeful that your fix might solve my issue as well. At first I believed this to be a race condition in which the TopicPartitions list was being appended to while I was in the process of reading from it; I'd get the following SystemError:
Note: My code was not altering the list at all, just iterating over the list of partitions to build a log record of which partitions were being assigned/revoked. Wrapping the iteration over the TopicPartitions list in a try/except SystemError block kludged a way through this issue. I then attempted to mitigate the issue by adding a one-second pause in execution with sleep(1), which seemed to work briefly (this is an intermittent issue that is unfortunately difficult to reproduce), but then I started getting the following SystemError instead:
It seems that every step I take to mitigate this issue just results in SystemError being produced somewhere else and leaking through... |
| It sounds possible, although I think I would need to better understand exactly how your code was structured to really be certain. Was the loop over the list located inside your callback, or somewhere else and to some degree concurrent with the callback? |
| @cnweaver Actually, I think it was myself who needed to fully understand your explanation of what was going on. You were right that the subsequent raising of SystemError is very confusing and misleading. In my callbacks I was using an exception as control flow (a thing I don't particularly like, but which is Pythonic) to handle resetting poller state when assignments/revocations happened. At first I misunderstood what you meant about raising an exception versus causing an exception: I was doing the former, intentionally; I was not doing the latter, and that's where my confusion was. I was able to fix my problem, specifically, by moving the exception I was raising inside the callback to a flag that is checked outside the callback's stack and then raised. So far this seems to work and never taints the Cimpl for subsequent calls to these callbacks. My solution, which seems to be working without issue so far (all 30 minutes or so):

```python
class MyClass:
    callback_state: PollerException = None

    def on_assign(self, consumer: Consumer, partitions: List[TopicPartition]):
        # do stuff with partitions

        # Here I used to simply raise DerivedPollerException(), which was the issue
        self.callback_state = DerivedPollerException("...")

    def check_callback_state(self):
        if self.callback_state:
            state = self.callback_state
            self.callback_state = None
            raise state

    def poll(self):
        while True:
            self.check_callback_state()
            # do normal stuff
```
|
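The flag-then-raise pattern described above can be condensed into a self-contained sketch. All names here are illustrative stand-ins, not confluent-kafka API: the callback stashes the exception instead of raising it, and the caller re-raises it from ordinary Python code, outside the C callback frame.

```python
# Self-contained sketch of the flag-then-raise workaround (illustrative names).
class PollerException(Exception):
    pass

class Poller:
    def __init__(self):
        self.callback_state = None  # deferred exception, if any

    def on_assign(self, consumer, partitions):
        # Never raise inside the callback; stash the exception instead.
        self.callback_state = PollerException(f"assigned {len(partitions)} partitions")

    def check_callback_state(self):
        # Re-raise the deferred exception outside the callback's stack.
        if self.callback_state is not None:
            state, self.callback_state = self.callback_state, None
            raise state

poller = Poller()
poller.on_assign(consumer=None, partitions=["p0", "p1"])
try:
    poller.check_callback_state()
except PollerException as exc:
    print(exc)  # prints "assigned 2 partitions"
```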
| Hi, @cnweaver -- is @austenwho's proposed workaround sufficient? @confluentinc, is there any way to help test or review this PR so it could potentially get merged? |
| @austenwho's solution is a viable alternative, I think. If it is to be adopted, though,
| Huhm, this shouldn't be happening as all the callback handlers will call |
Currently `error_cb` attempts to handle the case of the callback raising a Python error, but this handling breaks down when the callback is invoked multiple times. The problem is that, since raising an error simply consists of setting the error state in the interpreter, if this state is not cleared the interpreter will treat every subsequent return from C code back to Python as having raised an error, which is itself an error if that code (not having actually raised an error) attempts to return a value (including `None` to indicate successful completion).

The error callback may be invoked multiple times by functions like `Consumer_consume` when retries are performed due to network issues. Given a callback of the form:
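The callback example itself appears to have been elided above; judging from the identifiers referenced below (`some_condition`, `SomeError`, `logger.warning`), it was presumably something of this shape. The body here is an assumption for illustration, not the original code:

```python
import logging

logger = logging.getLogger(__name__)

class SomeError(Exception):
    pass

def some_condition(err):
    # Hypothetical predicate; the real condition is application-specific.
    return str(err) == "fatal"

def error_cb(err):
    # Log every error, and escalate certain ones as a Python exception.
    logger.warning("librdkafka error: %s", err)
    if some_condition(err):
        raise SomeError(err)
```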
Once `some_condition` has caused `SomeError` to be raised, subsequent invocations of the callback will fail in ways likely to surprise the user: a `SystemError` complaining that `logger.warning` "returned a result with an error set". (`logger.warning` could be any code which ends up calling into a C implementation, including `print`.) Although this `SystemError` will chain back to the original `SomeError` as its cause, it alters the control flow of the subsequent callback invocations, potentially bypassing logic that was important to the user. Additionally, the `SystemError`s obscure the original error type, so that attempting to handle it in the obvious way (`try ... except SomeError`) will not work.

The proposed solution is that confluent-kafka, instead of leaving the error state set after the callback to propagate when
`Consumer_consume`, et al. return, should record any error directly after the callback completes, unset the error state, and then restore the recorded error state before returning to Python. `CallState_crash` and `CallState_end` seem to be well positioned to do this. One possible source of complexity is that repeated invocations of the callback may raise multiple errors. The approach taken in this patch is simply to preserve and propagate the last error raised. It would certainly be possible to preserve all errors, but it does not seem obvious how to usefully communicate multiple errors back to the user.
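The last-error-wins strategy can be modelled at the Python level. This is a hedged sketch only: `run_callbacks`, `error_cb`, and `SomeError` are illustrative names rather than part of this patch, and the real fix manipulates the interpreter's error indicator from C (where `PyErr_Fetch`/`PyErr_Restore` serve to unset and restore the error state).

```python
class SomeError(Exception):
    pass

def run_callbacks(callback, events):
    """Invoke `callback` for each event, deferring any exception.

    Errors are recorded and cleared as they occur, so each invocation
    starts with a clean slate; only the last error is re-raised at the
    end, mirroring the patch's record/unset/restore approach.
    """
    pending = None
    for ev in events:
        try:
            callback(ev)
        except Exception as exc:
            pending = exc  # later errors replace earlier ones
    if pending is not None:
        raise pending

def error_cb(event):
    if event == "fatal":
        raise SomeError(event)

try:
    run_callbacks(error_cb, ["ok", "fatal", "ok", "fatal"])
except SomeError as exc:
    # The original exception type is preserved, so the obvious
    # `except SomeError` handler works as the user expects.
    print(type(exc).__name__)  # prints "SomeError"
```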