Skip to content

Commit c416ce2

Browse files
committed
drop using double write buffer
1 parent be37ac0 commit c416ce2

File tree

10 files changed

+62
-91
lines changed

10 files changed

+62
-91
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ repositories {
1414
}
1515
1616
ext {
17-
rlibVersion = "10.0.alpha3"
17+
rlibVersion = "10.0.alpha4"
1818
}
1919
2020
dependencies {

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
rootProject.version = "10.0.alpha3"
1+
rootProject.version = "10.0.alpha4"
22
group = 'javasabr.rlib'
33

44
allprojects {

rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ void testServerWithMultiplyClients() {
130130
var serverAllocator = new DefaultBufferAllocator(serverConfig);
131131
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
132132

133-
int clientCount = 200;
133+
int clientCount = 500;
134134
int messagesPerIteration = 3_000;
135135
int expectedMessages = clientCount * messagesPerIteration * MAX_ITERATIONS;
136136

rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ default int scheduledThreadGroupSize() {
7777
return 1;
7878
}
7979

80+
8081
/**
8182
* Get a thread constructor which should be used to create network threads.
8283
*/

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ protected void startReadImpl() {
121121
if (reading.compareAndSet(true, false)) {
122122
retryReadLater();
123123
}
124+
} catch (Error error) {
125+
throw error;
124126
}
125127
}
126128

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java

Lines changed: 41 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,10 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket<C> packet) {
5353
final C connection;
5454
final AsynchronousSocketChannel socketChannel;
5555
final BufferAllocator bufferAllocator;
56-
final ByteBuffer firstWriteBuffer;
57-
final ByteBuffer secondWriteBuffer;
56+
final ByteBuffer writeBuffer;
5857

5958
@Nullable
60-
volatile ByteBuffer firstWriteTempBuffer;
61-
@Nullable
62-
volatile ByteBuffer secondWriteTempBuffer;
63-
59+
volatile ByteBuffer writeTempBuffer;
6460
@Getter(AccessLevel.PROTECTED)
6561
volatile ByteBuffer writingBuffer = EMPTY_BUFFER;
6662

@@ -80,8 +76,7 @@ public AbstractNetworkPacketWriter(
8076
this.connection = connection;
8177
this.socketChannel = socketChannel;
8278
this.bufferAllocator = bufferAllocator;
83-
this.firstWriteBuffer = bufferAllocator.takeWriteBuffer();
84-
this.secondWriteBuffer = bufferAllocator.takeWriteBuffer();
79+
this.writeBuffer = bufferAllocator.takeWriteBuffer();
8580
this.updateActivityFunction = updateActivityFunction;
8681
this.writablePacketProvider = packetProvider;
8782
this.serializedToChannelPacketHandler = serializedToChannelPacketHandler;
@@ -156,28 +151,23 @@ protected ByteBuffer serialize(WritableNetworkPacket<C> packet) {
156151
int totalSize = expectedLength == -1 ? -1 : totalSize(packet, expectedLength);
157152

158153
// if the packet is too big to use a write buffer
159-
if (expectedLength != -1 && totalSize > firstWriteBuffer.capacity()) {
160-
ByteBuffer first = bufferAllocator.takeBuffer(totalSize);
161-
ByteBuffer second = bufferAllocator.takeBuffer(totalSize);
162-
firstWriteTempBuffer = first;
163-
secondWriteTempBuffer = second;
154+
if (expectedLength != -1 && totalSize > writeBuffer.capacity()) {
155+
ByteBuffer tempBuffer = bufferAllocator.takeBuffer(totalSize);
164156
try {
165-
return serialize(resultPacket, expectedLength, totalSize, first, second);
157+
ByteBuffer serialized = serialize(resultPacket, expectedLength, totalSize, tempBuffer);
158+
writeTempBuffer = tempBuffer;
159+
return serialized;
166160
} catch (BufferOverflowException ex) {
167161
log.error(ex);
168-
bufferAllocator.putBuffer(first);
169-
bufferAllocator.putBuffer(second);
170-
firstWriteTempBuffer = null;
171-
secondWriteTempBuffer = null;
162+
bufferAllocator.putBuffer(tempBuffer);
163+
writeTempBuffer = null;
172164
throw new RuntimeException(ex);
173165
}
174166
} else {
175167
try {
176-
return serialize(resultPacket, expectedLength, totalSize, firstWriteBuffer, secondWriteBuffer);
168+
return serialize(resultPacket, expectedLength, totalSize, writeBuffer);
177169
} catch (BufferOverflowException ex) {
178170
log.error(ex);
179-
firstWriteBuffer.clear();
180-
secondWriteBuffer.clear();
181171
throw new RuntimeException(ex);
182172
}
183173
}
@@ -196,84 +186,76 @@ protected ByteBuffer serialize(WritableNetworkPacket<C> packet) {
196186
* @param packet the network packet to serialize.
197187
* @param expectedLength the packet's expected size.
198188
* @param totalSize the packet's total size.
199-
* @param firstBuffer the first buffer.
200-
* @param secondBuffer the second buffer.
189+
* @param writeBuffer the write buffer.
201190
* @return the final buffer to write to channel.
202191
*/
203192
protected ByteBuffer serialize(
204193
W packet,
205194
int expectedLength,
206195
int totalSize,
207-
ByteBuffer firstBuffer,
208-
ByteBuffer secondBuffer) {
209-
210-
if (!onBeforeSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer)) {
211-
return firstBuffer.clear().limit(0);
212-
} else if (!doSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer)) {
213-
return firstBuffer.clear().limit(0);
214-
} else if (!onAfterSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer)) {
215-
return firstBuffer.clear().limit(0);
196+
ByteBuffer writeBuffer) {
197+
198+
if (!onBeforeSerialize(packet, expectedLength, totalSize, writeBuffer)) {
199+
return writeBuffer.clear().limit(0);
200+
} else if (!doSerialize(packet, expectedLength, totalSize, writeBuffer)) {
201+
return writeBuffer.clear().limit(0);
202+
} else if (!onAfterSerialize(packet, expectedLength, totalSize, writeBuffer)) {
203+
return writeBuffer.clear().limit(0);
216204
}
217205

218-
return onSerializeResult(packet, expectedLength, totalSize, firstBuffer, secondBuffer);
206+
return onSerializeResult(packet, expectedLength, totalSize, writeBuffer);
219207
}
220208

221209
/**
222-
* Handles the buffers before serializing packet's data.
210+
* Handles the buffer before serializing packet's data.
223211
*
224212
* @param packet the network packet.
225213
* @param expectedLength the packet's expected size.
226214
* @param totalSize the packet's total size.
227-
* @param firstBuffer the first buffer.
228-
* @param secondBuffer the second buffer.
215+
* @param writeBuffer the write buffer.
229216
* @return true if handling was successful.
230217
*/
231218
protected boolean onBeforeSerialize(
232219
W packet,
233220
int expectedLength,
234221
int totalSize,
235-
ByteBuffer firstBuffer,
236-
ByteBuffer secondBuffer) {
237-
firstBuffer.clear();
222+
ByteBuffer writeBuffer) {
223+
writeBuffer.clear();
238224
return true;
239225
}
240226

241227
/**
242-
* Serializes the network packet data to the buffers.
228+
* Serializes the network packet data to the buffer.
243229
*
244230
* @param packet the network packet.
245231
* @param expectedLength the packet's expected size.
246232
* @param totalSize the packet's total size.
247-
* @param firstBuffer the first buffer.
248-
* @param secondBuffer the second buffer.
233+
* @param writeBuffer the first buffer.
249234
* @return true if writing was successful.
250235
*/
251236
protected boolean doSerialize(
252237
W packet,
253238
int expectedLength,
254239
int totalSize,
255-
ByteBuffer firstBuffer,
256-
ByteBuffer secondBuffer) {
257-
return packet.write(connection, firstBuffer);
240+
ByteBuffer writeBuffer) {
241+
return packet.write(connection, writeBuffer);
258242
}
259243

260244
/**
261-
* Handles the buffers after serializing packet's data.
245+
* Handles the buffer after serializing packet's data.
262246
*
263247
* @param packet the network packet.
264248
* @param expectedLength the packet's expected size.
265249
* @param totalSize the packet's total size.
266-
* @param firstBuffer the first buffer.
267-
* @param secondBuffer the second buffer.
250+
* @param writeBuffer the write buffer.
268251
* @return true if handling was successful.
269252
*/
270253
protected boolean onAfterSerialize(
271254
W packet,
272255
int expectedLength,
273256
int totalSize,
274-
ByteBuffer firstBuffer,
275-
ByteBuffer secondBuffer) {
276-
firstBuffer.flip();
257+
ByteBuffer writeBuffer) {
258+
writeBuffer.flip();
277259
return true;
278260
}
279261

@@ -283,17 +265,15 @@ protected boolean onAfterSerialize(
283265
* @param packet the network packet.
284266
* @param expectedLength the packet's expected size.
285267
* @param totalSize the packet's total size.
286-
* @param firstBuffer the first buffer.
287-
* @param secondBuffer the second buffer.
268+
* @param writeBuffer the write buffer.
288269
* @return the result buffer.
289270
*/
290271
protected ByteBuffer onSerializeResult(
291272
W packet,
292273
int expectedLength,
293274
int totalSize,
294-
ByteBuffer firstBuffer,
295-
ByteBuffer secondBuffer) {
296-
return firstBuffer.position(0);
275+
ByteBuffer writeBuffer) {
276+
return writeBuffer;
297277
}
298278

299279
protected ByteBuffer writeHeader(ByteBuffer buffer, int position, int value, int headerSize) {
@@ -405,27 +385,18 @@ protected void handleFailedWritingData(Throwable exception, @Nullable WritableNe
405385

406386
@Override
407387
public void close() {
408-
bufferAllocator
409-
.putWriteBuffer(firstWriteBuffer)
410-
.putWriteBuffer(secondWriteBuffer);
388+
bufferAllocator.putWriteBuffer(writeBuffer);
411389
clearTempBuffers();
412390
writingBuffer = EMPTY_BUFFER;
413391
writing.compareAndSet(true, false);
414392
}
415393

416394
protected void clearTempBuffers() {
417395
this.writingBuffer = EMPTY_BUFFER;
418-
419-
var firstWriteTempBuffer = this.firstWriteTempBuffer;
420-
if (firstWriteTempBuffer != null) {
421-
this.firstWriteTempBuffer = null;
422-
bufferAllocator.putBuffer(firstWriteTempBuffer);
423-
}
424-
425-
var secondWriteTempBuffer = this.secondWriteTempBuffer;
426-
if (secondWriteTempBuffer != null) {
427-
this.secondWriteTempBuffer = null;
428-
bufferAllocator.putBuffer(secondWriteTempBuffer);
396+
var writeTempBuffer = this.writeTempBuffer;
397+
if (writeTempBuffer != null) {
398+
this.writeTempBuffer = null;
399+
bufferAllocator.putBuffer(writeTempBuffer);
429400
}
430401
}
431402
}

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ protected boolean onBeforeSerialize(
5252
W packet,
5353
int expectedLength,
5454
int totalSize,
55-
ByteBuffer firstBuffer,
56-
ByteBuffer secondBuffer) {
57-
firstBuffer
55+
ByteBuffer writeBuffer) {
56+
writeBuffer
5857
.clear()
5958
.position(packetLengthHeaderSize);
6059
return true;
@@ -65,9 +64,8 @@ protected ByteBuffer onSerializeResult(
6564
W packet,
6665
int expectedLength,
6766
int totalSize,
68-
ByteBuffer firstBuffer,
69-
ByteBuffer secondBuffer) {
70-
return writePacketLength(firstBuffer, firstBuffer.limit())
67+
ByteBuffer writeBuffer) {
68+
return writePacketLength(writeBuffer, writeBuffer.limit())
7169
.position(0);
7270
}
7371

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,8 @@ protected boolean onBeforeSerialize(
5757
W packet,
5858
int expectedLength,
5959
int totalSize,
60-
ByteBuffer firstBuffer,
61-
ByteBuffer secondBuffer) {
62-
firstBuffer
60+
ByteBuffer writeBuffer) {
61+
writeBuffer
6362
.clear()
6463
.position(packetLengthHeaderSize);
6564
return true;
@@ -70,9 +69,9 @@ protected ByteBuffer onSerializeResult(
7069
W packet,
7170
int expectedLength,
7271
int totalSize,
73-
ByteBuffer firstBuffer,
74-
ByteBuffer secondBuffer) {
75-
return writePacketLength(firstBuffer, firstBuffer.limit()).position(0);
72+
ByteBuffer writeBuffer) {
73+
return writePacketLength(writeBuffer, writeBuffer.limit())
74+
.position(0);
7675
}
7776

7877
protected ByteBuffer writePacketLength(ByteBuffer buffer, int packetLength) {

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,8 @@ protected boolean doSerialize(
5050
W packet,
5151
int expectedLength,
5252
int totalSize,
53-
ByteBuffer firstBuffer,
54-
ByteBuffer secondBuffer) {
55-
writeHeader(firstBuffer, packet.packetId(), packetIdHeaderSize);
56-
return super.doSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer);
53+
ByteBuffer writeBuffer) {
54+
writeHeader(writeBuffer, packet.packetId(), packetIdHeaderSize);
55+
return super.doSerialize(packet, expectedLength, totalSize, writeBuffer);
5756
}
5857
}

rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.nio.channels.CompletionHandler;
1212
import java.util.concurrent.ExecutorService;
1313
import java.util.concurrent.Executors;
14+
import java.util.concurrent.LinkedBlockingQueue;
1415
import java.util.concurrent.ScheduledExecutorService;
1516
import java.util.concurrent.ScheduledThreadPoolExecutor;
1617
import java.util.concurrent.SynchronousQueue;
@@ -180,9 +181,9 @@ protected ExecutorService buildExecutor(ServerNetworkConfig config) {
180181
config.threadGroupMaxSize(),
181182
120,
182183
TimeUnit.SECONDS,
183-
new SynchronousQueue<>(),
184+
new LinkedBlockingQueue<>(),
184185
threadFactory,
185-
new ThreadPoolExecutor.CallerRunsPolicy());
186+
new ThreadPoolExecutor.AbortPolicy());
186187
} else {
187188
executorService = Executors.newFixedThreadPool(config.threadGroupMinSize(), threadFactory);
188189
}

0 commit comments

Comments
 (0)