Merged
Changes from 1 commit
49 changes: 0 additions & 49 deletions driver-core/src/main/com/mongodb/internal/Locks.java
@@ -20,7 +20,6 @@
import com.mongodb.internal.function.CheckedSupplier;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Supplier;

@@ -94,54 +93,6 @@ public static void lockInterruptibly(final Lock lock) throws MongoInterruptedExc
}
}

/**
* See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
*/
public static void withUnfairLock(final ReentrantLock lock, final Runnable action) {
withUnfairLock(lock, () -> {
action.run();
return null;
});
}

/**
* See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
*/
public static <V> V withUnfairLock(final ReentrantLock lock, final Supplier<V> supplier) {
lockUnfair(lock);
try {
return supplier.get();
} finally {
lock.unlock();
}
}

private static void lockUnfair(
// The type must be `ReentrantLock`, not `Lock`,
// because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior.
final ReentrantLock lock) {
if (!lock.tryLock()) {
lock.lock();
}
}

/**
* This method allows a thread to attempt acquiring the {@code lock} unfairly despite the {@code lock}
* being {@linkplain ReentrantLock#ReentrantLock(boolean) fair}. In most cases you should create an unfair lock,
* instead of using this method.
*/
public static void lockInterruptiblyUnfair(
// The type must be `ReentrantLock`, not `Lock`,
// because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior.
final ReentrantLock lock) throws MongoInterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw interruptAndCreateMongoInterruptedException(null, null);
}
if (!lock.tryLock()) {
lockInterruptibly(lock);
}
}

private Locks() {
}
}
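Note: the removed helpers above relied on the documented barging behavior of ReentrantLock.tryLock(), which may acquire the lock ahead of queued threads even when the lock uses the fair ordering policy. A minimal standalone sketch of that pattern (illustrative only, not driver code; the BargingExample name is made up):

import java.util.concurrent.locks.ReentrantLock;

// Sketch of the "unfair acquisition of a fair lock" pattern used by the removed
// lockUnfair/lockInterruptiblyUnfair: tryLock() barges even on a fair lock,
// and we only fall back to the (fair) queued acquisition if barging fails.
public final class BargingExample {

    static void lockUnfairly(final ReentrantLock lock) {
        if (!lock.tryLock()) { // barges past the wait queue if the lock is free
            lock.lock();       // otherwise queue up as usual
        }
    }

    public static void main(final String[] args) {
        ReentrantLock fairLock = new ReentrantLock(true); // fair ordering policy
        lockUnfairly(fairLock);
        try {
            System.out.println("held by current thread: " + fairLock.isHeldByCurrentThread());
        } finally {
            fairLock.unlock();
        }
    }
}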
@@ -30,19 +30,16 @@
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.lockInterruptiblyUnfair;
import static com.mongodb.internal.Locks.withUnfairLock;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;

@@ -321,48 +318,17 @@ private static final class StateAndPermits {
private volatile boolean closed;
private final int maxPermits;
private volatile int permits;
/** When there are not enough available permits to serve all threads requesting a permit, threads are queued and wait on
* {@link #permitAvailableOrClosedOrPausedCondition}. Because of this waiting, we want threads to acquire the lock fairly,
* to avoid a situation when some threads are sitting in the queue for a long time while others barge in and acquire
* the lock without waiting in the queue. Fair locking reduces high percentiles of {@link #acquirePermit(long, TimeUnit)} latencies
* but reduces its throughput: it makes latencies roughly equally high for everyone, while keeping them lower than the highest
* latencies with unfair locking. The fair approach is in accordance with the
* <a href="https://github.com/mongodb/specifications/blob/568093ce7f0e1394cf4952c417e1e7dacc5fef53/source/connection-monitoring-and-pooling/connection-monitoring-and-pooling.rst#waitqueue">
* connection pool specification</a>.
* <p>
* When there are enough available permits to serve all threads requesting a permit, threads still have to acquire the lock,
* and still are queued, but since they are not waiting on {@link #permitAvailableOrClosedOrPausedCondition},
* threads spend less time in the queue. This results in having smaller high percentiles
* of {@link #acquirePermit(long, TimeUnit)} latencies, and we do not want to sacrifice the throughput
* to further reduce the high percentiles by acquiring the lock fairly.</p>
* <p>
* While there is a chance that the expressed reasoning is flawed, it is supported by the results of experiments reported in
* comments in <a href="https://jira.mongodb.org/browse/JAVA-4452">JAVA-4452</a>.</p>
* <p>
* {@link ReentrantReadWriteLock#hasWaiters(Condition)} requires holding the lock to be called, therefore we cannot use it
* to discriminate between the two cases described above, and we use {@link #waitersEstimate} instead.
* This approach results in sometimes acquiring a lock unfairly when it should have been acquired fairly, and vice versa.
* But it appears to be a good enough compromise, that results in having enough throughput when there are enough
* available permits and tolerable high percentiles of latencies when there are not enough available permits.</p>
* <p>
* It may seem viable to use {@link #permits} > 0 as a way to decide that there are likely no waiters,
* but benchmarking shows that with this approach high percentiles of contended {@link #acquirePermit(long, TimeUnit)} latencies
* (when the number of threads that use the pool is higher than the maximum pool size) become similar to a situation when no
* fair locking is used. That is, this approach does not result in the behavior we want.</p>
*/
private final AtomicInteger waitersEstimate;
@Nullable
private Supplier<MongoException> causeSupplier;

StateAndPermits(final int maxPermits, final Supplier<MongoServerUnavailableException> poolClosedExceptionSupplier) {
this.poolClosedExceptionSupplier = poolClosedExceptionSupplier;
lock = new ReentrantLock(true);
lock = new ReentrantLock();
permitAvailableOrClosedOrPausedCondition = lock.newCondition();
paused = false;
closed = false;
this.maxPermits = maxPermits;
permits = maxPermits;
waitersEstimate = new AtomicInteger();
causeSupplier = null;
}

@@ -371,7 +337,7 @@ int permits() {
}

boolean acquirePermitImmediateUnfair() {
return withUnfairLock(lock, () -> {
return withLock(lock, () -> {
throwIfClosedOrPaused();
if (permits > 0) {
//noinspection NonAtomicOperationOnVolatileField
@@ -391,17 +357,12 @@ boolean acquirePermitImmediateUnfair() {
*/
boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInterruptedException {
long remainingNanos = unit.toNanos(timeout);
if (waitersEstimate.get() == 0) {
lockInterruptiblyUnfair(lock);
} else {
lockInterruptibly(lock);
}
lockInterruptibly(lock);
try {
while (permits == 0
// the absence of short-circuiting is of importance
& !throwIfClosedOrPaused()) {
try {
waitersEstimate.incrementAndGet();
if (timeout < 0 || remainingNanos == Long.MAX_VALUE) {
permitAvailableOrClosedOrPausedCondition.await();
} else if (remainingNanos >= 0) {
@@ -411,8 +372,6 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
}
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException(null, e);
} finally {
waitersEstimate.decrementAndGet();
}
}
assertTrue(permits > 0);
@@ -425,7 +384,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
}

void releasePermit() {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
assertTrue(permits < maxPermits);
//noinspection NonAtomicOperationOnVolatileField
permits++;
@@ -434,7 +393,7 @@ void pause(final Supplier<MongoException> causeSupplier) {
}

void pause(final Supplier<MongoException> causeSupplier) {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
if (!paused) {
this.paused = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
@@ -445,7 +404,7 @@

void ready() {
if (paused) {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
this.paused = false;
this.causeSupplier = null;
});
@@ -457,7 +416,7 @@ void ready() {
*/
boolean close() {
if (!closed) {
return withUnfairLock(lock, () -> {
return withLock(lock, () -> {
if (!closed) {
closed = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
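Note: the removed javadoc above documents why StateAndPermits previously mixed fair and unfair lock acquisition based on waitersEstimate. A condensed, illustrative sketch of that removed heuristic (the WaiterAwarePermits name and the simplified permit logic are made up, not driver code):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the removed heuristic: barge on the fair lock only when no other
// thread is estimated to be waiting for a permit; otherwise respect fair ordering.
final class WaiterAwarePermits {
    private final ReentrantLock fairLock = new ReentrantLock(true);
    private final Condition permitAvailable = fairLock.newCondition();
    private final AtomicInteger waitersEstimate = new AtomicInteger();
    private int permits;

    WaiterAwarePermits(final int maxPermits) {
        this.permits = maxPermits;
    }

    void acquirePermit() throws InterruptedException {
        if (waitersEstimate.get() == 0) {
            if (!fairLock.tryLock()) {      // nobody seems queued: barging is acceptable
                fairLock.lockInterruptibly();
            }
        } else {
            fairLock.lockInterruptibly();   // likely contended: wait our turn fairly
        }
        try {
            while (permits == 0) {
                waitersEstimate.incrementAndGet();
                try {
                    permitAvailable.await(); // counted as a waiter only while parked
                } finally {
                    waitersEstimate.decrementAndGet();
                }
            }
            permits--;
        } finally {
            fairLock.unlock();
        }
    }

    void releasePermit() {
        fairLock.lock();
        try {
            permits++;
            permitAvailable.signal();
        } finally {
            fairLock.unlock();
        }
    }
}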
@@ -98,7 +98,6 @@
import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.Locks.withUnfairLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
@@ -896,7 +895,7 @@ private final class OpenConcurrencyLimiter {
private final Deque<MutableReference<PooledConnection>> desiredConnectionSlots;

OpenConcurrencyLimiter(final int maxConnecting) {
lock = new ReentrantLock(true);
lock = new ReentrantLock(false);
permitAvailableOrHandedOverOrClosedOrPausedCondition = lock.newCondition();
maxPermits = maxConnecting;
permits = maxPermits;
@@ -1054,10 +1053,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
* 2. Thread#2 checks in a connection. Tries to hand it over, but there are no threads desiring to get one.
* 3. Thread#1 executes the current code. Expresses the desire to get a connection via the hand-over mechanism,
* but thread#2 has already tried handing over and released its connection to the pool.
* As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool.
*
* This attempt should be unfair because the current thread (Thread#1) has already waited for its turn fairly.
* Waiting fairly again puts the current thread behind other threads, which is unfair to the current thread. */
* As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool. */
availableConnection = getPooledConnectionImmediateUnfair();
if (availableConnection != null) {
return availableConnection;
Expand Down Expand Up @@ -1093,7 +1089,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
}

private void releasePermit() {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
assertTrue(permits < maxPermits);
permits++;
permitAvailableOrHandedOverOrClosedOrPausedCondition.signal();
@@ -1128,7 +1124,7 @@ private void giveUpOnTryingToGetAvailableConnection() {
* from threads that are waiting for a permit to open a connection.
*/
void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection) {
boolean handedOver = withUnfairLock(lock, () -> {
boolean handedOver = withLock(lock, () -> {
for (//iterate from first (head) to last (tail)
MutableReference<PooledConnection> desiredConnectionSlot : desiredConnectionSlots) {
if (desiredConnectionSlot.reference == null) {
@@ -1145,7 +1141,7 @@ void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection)
}

void signalClosedOrPaused() {
withUnfairLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll);
withLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll);
}
}

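Note: for context on tryHandOverOrRelease above, here is a condensed, illustrative sketch of the hand-over pattern (the HandOverSketch and Slot names are made up, not driver code). A thread returning a resource fills the oldest empty "desired" slot so the longest-waiting thread is served first; if no thread is waiting, the caller releases the resource back to the pool instead.

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the hand-over pattern: waiters register empty slots at the tail,
// returning threads fill the oldest empty slot (head side) under the lock.
final class HandOverSketch<T> {
    private static final class Slot<R> {
        volatile R reference;
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition handedOver = lock.newCondition();
    private final Deque<Slot<T>> desiredSlots = new ConcurrentLinkedDeque<>();

    /** Returns true if the resource was handed to a waiter, false if the caller should release it to the pool. */
    boolean tryHandOver(final T resource) {
        lock.lock();
        try {
            for (Slot<T> slot : desiredSlots) { // head (longest waiter) to tail
                if (slot.reference == null) {
                    slot.reference = resource;
                    handedOver.signalAll();
                    return true;
                }
            }
            return false; // nobody is waiting for a hand-over
        } finally {
            lock.unlock();
        }
    }

    /** Registers a slot and waits until a returning thread fills it. */
    T awaitHandOver() throws InterruptedException {
        Slot<T> mySlot = new Slot<>();
        lock.lock();
        try {
            desiredSlots.addLast(mySlot);
            while (mySlot.reference == null) {
                handedOver.await();
            }
            return mySlot.reference;
        } finally {
            desiredSlots.remove(mySlot);
            lock.unlock();
        }
    }
}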