- Notifications
You must be signed in to change notification settings - Fork 89
fix: remove unrecoverable connection from connection pool during multiplexing #1967
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5a63d95 5a13302 0297204 8a81ad3 68fd040 3106dae 5bf04e5 2fc7551 7a6d919 3ba7659 f379a78 9307776 de73013 19005a1 c5d14ba 644360a 3099d82 e707dd6 9e7a8fa 31f1755 44c36fc 87a4036 c92ea1b 019520c 47893df 8bd4e6a 83409b0 f7dd72d a48399f 6789bc9 46b4e6c dfd4dd9 d68ae70 2983fe9 d406256 22e9e07 fdb4e1c 0469474 d1b7740 e4cd529 74ff1c4 762f49e de456c2 c2f6edc 2487227 084d6d1 89c9701 8441518 d249add 83aa7ff 92a9c36 a713a52 a042d5c 53f4ec8 c494d8b 14b0c12 0da0e4b 33d23ac d2ee46e be6646e 62d8c41 adf5f3f c1970ff 3488df8 6a512e8 05edc2f 7d3da74 ecf6807 057dab9 5db46a2 32e9d33 183941a f93f89e e2ec7e7 1be6ab4 544f063 3cf5482 d9da007 4626857 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -234,9 +234,17 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, | |
| streamWriter, | ||
| (key, existingStream) -> { | ||
| // Stick to the existing stream if it's not overwhelmed. | ||
| if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) { | ||
| if (existingStream != null | ||
| && !existingStream.getLoad().isOverwhelmed() | ||
| && !existingStream.isConnectionInUnrecoverableState()) { | ||
| return existingStream; | ||
| } | ||
| if (existingStream != null && existingStream.isConnectionInUnrecoverableState()) { | ||
| existingStream = null; | ||
| } | ||
| // Before search for the next connection to attach, clear the finalized connections | ||
| // first so that they will not be selected. | ||
| clearFinalizedConnectionWorker(); | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need a global remove, is it OK just to have a local remove? Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason is we want to prevent the search algorithm to hit the connection with unrecoverable state? Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The global remove is super quick as it won't need close | ||
| // Try to create or find another existing stream to reuse. | ||
| ConnectionWorker createdOrExistingConnection = null; | ||
| try { | ||
| | @@ -299,7 +307,6 @@ private ConnectionWorker createOrReuseConnectionWorker( | |
| } | ||
| return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); | ||
| } else { | ||
| | ||
| // Stick to the original connection if all the connections are overwhelmed. | ||
| if (existingConnectionWorker != null) { | ||
| return existingConnectionWorker; | ||
| | @@ -310,6 +317,18 @@ private ConnectionWorker createOrReuseConnectionWorker( | |
| } | ||
| } | ||
| | ||
| private void clearFinalizedConnectionWorker() { | ||
| Set<ConnectionWorker> connectionWorkerSet = new HashSet<>(); | ||
| for (ConnectionWorker existingWorker : connectionWorkerPool) { | ||
| if (existingWorker.isConnectionInUnrecoverableState()) { | ||
| connectionWorkerSet.add(existingWorker); | ||
| } | ||
| } | ||
| for (ConnectionWorker workerToRemove : connectionWorkerSet) { | ||
| connectionWorkerPool.remove(workerToRemove); | ||
| } | ||
| } | ||
| | ||
| /** Select out the best connection worker among the given connection workers. */ | ||
| static ConnectionWorker pickBestLoadConnection( | ||
| Comparator<Load> comparator, List<ConnectionWorker> connectionWorkerList) { | ||
| | ||
Uh oh!
There was an error while loading. Please reload this page.