Skip to content

Commit 94b3fab

Browse files
committed
reduce fields for packet reader/writer
1 parent d91547f commit 94b3fab

16 files changed

+95
-108
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package javasabr.rlib.network;
22

3+
import java.nio.channels.AsynchronousSocketChannel;
4+
35
public interface UnsafeConnection<C extends UnsafeConnection<C>> extends Connection<C> {
46

57
Network<?> network();
68

9+
BufferAllocator bufferAllocator();
10+
11+
AsynchronousSocketChannel channel();
12+
713
void onConnected();
814
}

rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,11 @@ public WritablePacketWithFeedback(CompletableFuture<Boolean> attachment, Writabl
5353
final String remoteAddress;
5454
@Getter
5555
final Network<C> network;
56-
56+
@Getter
5757
final BufferAllocator bufferAllocator;
58+
@Getter
5859
final AsynchronousSocketChannel channel;
60+
5961
final Deque<WritableNetworkPacket<C>> pendingPackets;
6062
final StampedLock lock;
6163

rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ public DefaultDataConnection(
4141
protected NetworkPacketReader createPacketReader() {
4242
return new DefaultNetworkPacketReader<>(
4343
(C) this,
44-
channel,
45-
bufferAllocator,
4644
this::updateLastActivity,
4745
this::handleReceivedPacket,
4846
value -> createReadablePacket(),
@@ -53,8 +51,6 @@ protected NetworkPacketReader createPacketReader() {
5351
protected NetworkPacketWriter createPacketWriter() {
5452
return new DefaultNetworkPacketWriter<>(
5553
(C) this,
56-
channel,
57-
bufferAllocator,
5854
this::updateLastActivity,
5955
this::nextPacketToWrite,
6056
this::serializedPacket,

rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ public DefaultDataSslConnection(
4444
protected NetworkPacketReader createPacketReader() {
4545
return new DefaultSslNetworkPacketReader<>(
4646
(C) this,
47-
channel,
48-
bufferAllocator,
4947
this::updateLastActivity,
5048
this::handleReceivedPacket,
5149
value -> createReadablePacket(),
@@ -58,8 +56,6 @@ protected NetworkPacketReader createPacketReader() {
5856
protected NetworkPacketWriter createPacketWriter() {
5957
return new DefaultSslNetworkPacketWriter<>(
6058
(C) this,
61-
channel,
62-
bufferAllocator,
6359
this::updateLastActivity,
6460
this::nextPacketToWrite,
6561
this::serializedPacket,

rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public IdBasedPacketConnection(
4848
protected NetworkPacketReader createPacketReader() {
4949
return new IdBasedNetworkPacketReader<>(
5050
(C) this,
51-
channel,
52-
bufferAllocator,
5351
this::updateLastActivity,
5452
this::handleReceivedPacket,
5553
packetLengthHeaderSize,
@@ -61,8 +59,6 @@ protected NetworkPacketReader createPacketReader() {
6159
protected NetworkPacketWriter createPacketWriter() {
6260
return new IdBasedNetworkPacketWriter<>(
6361
(C) this,
64-
channel,
65-
bufferAllocator,
6662
this::updateLastActivity,
6763
() -> nextPacketToWrite(),
6864
this::serializedPacket,

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import java.nio.ByteBuffer;
66
import java.nio.channels.AsynchronousCloseException;
7-
import java.nio.channels.AsynchronousSocketChannel;
87
import java.nio.channels.ClosedChannelException;
98
import java.nio.channels.CompletionHandler;
109
import java.nio.channels.InterruptedByTimeoutException;
@@ -13,7 +12,6 @@
1312
import java.util.concurrent.atomic.AtomicInteger;
1413
import java.util.function.Consumer;
1514
import javasabr.rlib.common.util.BufferUtils;
16-
import javasabr.rlib.network.BufferAllocator;
1715
import javasabr.rlib.network.Network;
1816
import javasabr.rlib.network.NetworkConfig;
1917
import javasabr.rlib.network.UnsafeConnection;
@@ -59,8 +57,6 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) {
5957
final AtomicInteger emptyReadsCounter = new AtomicInteger(0);
6058

6159
final C connection;
62-
final AsynchronousSocketChannel socketChannel;
63-
final BufferAllocator bufferAllocator;
6460

6561
final ByteBuffer readBuffer;
6662
final ByteBuffer pendingBuffer;
@@ -77,16 +73,12 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) {
7773

7874
protected AbstractNetworkPacketReader(
7975
C connection,
80-
AsynchronousSocketChannel socketChannel,
81-
BufferAllocator bufferAllocator,
8276
Runnable updateActivityFunction,
8377
Consumer<? super R> packetHandler,
8478
int maxPacketsByRead) {
8579
this.connection = connection;
86-
this.socketChannel = socketChannel;
87-
this.bufferAllocator = bufferAllocator;
88-
this.readBuffer = bufferAllocator.takeReadBuffer();
89-
this.pendingBuffer = bufferAllocator.takePendingBuffer();
80+
this.readBuffer = connection.bufferAllocator().takeReadBuffer();
81+
this.pendingBuffer = connection.bufferAllocator().takePendingBuffer();
9082
this.updateActivityFunction = updateActivityFunction;
9183
this.packetHandler = packetHandler;
9284
this.maxPacketsByRead = maxPacketsByRead;
@@ -116,7 +108,7 @@ protected void startReadImpl() {
116108
log.debug(remoteAddress(), "[%s] Start waiting for new data from channel..."::formatted);
117109
ByteBuffer buffer = bufferToReadFromChannel();
118110
try {
119-
socketChannel.read(buffer, buffer, readChannelHandler);
111+
connection.channel().read(buffer, buffer, readChannelHandler);
120112
} catch (RuntimeException ex) {
121113
log.error(ex);
122114
if (reading.compareAndSet(true, false)) {
@@ -367,9 +359,13 @@ protected void reAllocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLeng
367359
log.debug(remoteAddress(), sourceBuffer.capacity(), fullPacketLength,
368360
"[%s] Resize temp big buffer from:[%s] to:[%s]"::formatted);
369361

370-
var newTempBuffer = bufferAllocator.takeBuffer(fullPacketLength + readBuffer.capacity());
362+
ByteBuffer newTempBuffer = connection
363+
.bufferAllocator()
364+
.takeBuffer(fullPacketLength + readBuffer.capacity());
365+
371366
log.debug(remoteAddress(), sourceBuffer, newTempBuffer,
372367
"[%s] Moved data from old temp big buffer:[%s] to new:[%s]"::formatted);
368+
373369
newTempBuffer.put(sourceBuffer);
374370

375371
freeTempBigBuffers();
@@ -378,23 +374,29 @@ protected void reAllocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLeng
378374

379375
protected void allocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLength) {
380376
int notConsumeBytes = sourceBuffer.remaining();
377+
381378
log.debug(
382379
notConsumeBytes,
383380
fullPacketLength,
384381
"Request temp big buffer to store part:[%s] of big packet with length:[%s]"::formatted);
385382

386-
var tempBigBuffer = bufferAllocator.takeBuffer(fullPacketLength + readBuffer.capacity());
383+
ByteBuffer tempBigBuffer = connection
384+
.bufferAllocator()
385+
.takeBuffer(fullPacketLength + readBuffer.capacity());
386+
387387
log.debug(sourceBuffer, tempBigBuffer, "Put data from old temp big buffer:[%s] to new:[%s]"::formatted);
388-
tempBigBuffer.put(sourceBuffer);
389388

389+
tempBigBuffer.put(sourceBuffer);
390390
this.tempBigBuffer = tempBigBuffer;
391391
}
392392

393393
protected void freeTempBigBuffers() {
394394
ByteBuffer tempBuffer = tempBigBuffer();
395395
if (tempBuffer != null) {
396396
tempBigBuffer(null);
397-
bufferAllocator.putBuffer(tempBuffer);
397+
connection
398+
.bufferAllocator()
399+
.putBuffer(tempBuffer);
398400
}
399401
}
400402

@@ -425,7 +427,7 @@ protected void handleEmptyReadFromChannel() {
425427
if (connection.closed()) {
426428
reading.compareAndSet(true, false);
427429
return;
428-
} else if (!socketChannel.isOpen()) {
430+
} else if (!connection.channel().isOpen()) {
429431
connection.close();
430432
return;
431433
}
@@ -502,7 +504,8 @@ protected abstract R createPacketFor(
502504

503505
@Override
504506
public void close() {
505-
bufferAllocator
507+
connection
508+
.bufferAllocator()
506509
.putReadBuffer(readBuffer)
507510
.putPendingBuffer(pendingBuffer);
508511
freeTempBigBuffers();

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@
66
import java.io.IOException;
77
import java.nio.BufferOverflowException;
88
import java.nio.ByteBuffer;
9-
import java.nio.channels.AsynchronousSocketChannel;
109
import java.nio.channels.CompletionHandler;
1110
import java.util.concurrent.atomic.AtomicBoolean;
1211
import java.util.function.Consumer;
1312
import java.util.function.Supplier;
1413
import javasabr.rlib.functions.ObjBoolConsumer;
1514
import javasabr.rlib.network.BufferAllocator;
16-
import javasabr.rlib.network.Connection;
15+
import javasabr.rlib.network.NetworkConfig;
16+
import javasabr.rlib.network.UnsafeConnection;
17+
import javasabr.rlib.network.exception.MalformedProtocolException;
1718
import javasabr.rlib.network.packet.NetworkPacketWriter;
1819
import javasabr.rlib.network.packet.WritableNetworkPacket;
1920
import lombok.AccessLevel;
@@ -33,7 +34,7 @@
3334
@FieldDefaults(level = AccessLevel.PROTECTED)
3435
public abstract class AbstractNetworkPacketWriter<
3536
W extends WritableNetworkPacket<C>,
36-
C extends Connection<C>> implements NetworkPacketWriter {
37+
C extends UnsafeConnection<C>> implements NetworkPacketWriter {
3738

3839
final CompletionHandler<Integer, @Nullable WritableNetworkPacket<C>> writeHandler = new CompletionHandler<>() {
3940

@@ -51,8 +52,6 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket<C> packet) {
5152
final AtomicBoolean writing = new AtomicBoolean();
5253

5354
final C connection;
54-
final AsynchronousSocketChannel socketChannel;
55-
final BufferAllocator bufferAllocator;
5655
final ByteBuffer writeBuffer;
5756

5857
@Nullable
@@ -67,16 +66,12 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket<C> packet) {
6766

6867
public AbstractNetworkPacketWriter(
6968
C connection,
70-
AsynchronousSocketChannel socketChannel,
71-
BufferAllocator bufferAllocator,
7269
Runnable updateActivityFunction,
7370
Supplier<@Nullable WritableNetworkPacket<C>> packetProvider,
7471
Consumer<WritableNetworkPacket<C>> serializedToChannelPacketHandler,
7572
ObjBoolConsumer<WritableNetworkPacket<C>> sentPacketHandler) {
7673
this.connection = connection;
77-
this.socketChannel = socketChannel;
78-
this.bufferAllocator = bufferAllocator;
79-
this.writeBuffer = bufferAllocator.takeWriteBuffer();
74+
this.writeBuffer = connection.bufferAllocator().takeWriteBuffer();
8075
this.updateActivityFunction = updateActivityFunction;
8176
this.writablePacketProvider = packetProvider;
8277
this.serializedToChannelPacketHandler = serializedToChannelPacketHandler;
@@ -129,7 +124,7 @@ protected boolean writeBuffer(
129124
}
130125
writingBuffer = resultBuffer;
131126
log.debug(remoteAddress(), resultBuffer, (address, buff) -> "[%s] Write to channel data:\n" + hexDump(buff));
132-
socketChannel.write(resultBuffer, nextPacket, writeHandler);
127+
connection.channel().write(resultBuffer, nextPacket, writeHandler);
133128
return true;
134129
}
135130

@@ -148,10 +143,17 @@ protected ByteBuffer serialize(WritableNetworkPacket<C> packet) {
148143
W resultPacket = (W) packet;
149144

150145
int expectedLength = packet.expectedLength(connection);
151-
int totalSize = expectedLength == -1 ? -1 : totalSize(packet, expectedLength);
146+
int totalSize = expectedLength == WritableNetworkPacket.UNKNOWN_EXPECTED_BYTES
147+
? WritableNetworkPacket.UNKNOWN_EXPECTED_BYTES
148+
: totalSize(packet, expectedLength);
149+
150+
if (totalSize > 0) {
151+
validateMaxPacketSize(packet, totalSize);
152+
}
152153

153154
// if the packet is too big to use a write buffer
154155
if (expectedLength != -1 && totalSize > writeBuffer.capacity()) {
156+
BufferAllocator bufferAllocator = connection.bufferAllocator();
155157
ByteBuffer tempBuffer = bufferAllocator.takeBuffer(totalSize);
156158
try {
157159
ByteBuffer serialized = serialize(resultPacket, expectedLength, totalSize, tempBuffer);
@@ -173,6 +175,17 @@ protected ByteBuffer serialize(WritableNetworkPacket<C> packet) {
173175
}
174176
}
175177

178+
protected void validateMaxPacketSize(WritableNetworkPacket<C> packet, int totalSize) {
179+
NetworkConfig networkConfig = connection
180+
.network()
181+
.config();
182+
183+
if (totalSize > networkConfig.maxPacketSize()) {
184+
throw new MalformedProtocolException(
185+
"Writable packet:[" + packet + "] is too big:[" + totalSize + ">" + networkConfig.maxPacketSize() + "]");
186+
}
187+
}
188+
176189
/**
177190
* Gets total size of the packet if it's possible.
178191
*
@@ -341,7 +354,7 @@ protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable Writabl
341354
log.debug(remoteAddress(), writingBuffer,
342355
"[%s] Buffer was not consumed fully, try to write else [%s] bytes to channel"::formatted);
343356
try {
344-
socketChannel.write(writingBuffer, packet, writeHandler);
357+
connection.channel().write(writingBuffer, packet, writeHandler);
345358
} catch (RuntimeException ex) {
346359
log.error(ex);
347360
if (writing.compareAndSet(true, false)) {
@@ -385,7 +398,9 @@ protected void handleFailedWritingData(Throwable exception, @Nullable WritableNe
385398

386399
@Override
387400
public void close() {
388-
bufferAllocator.putWriteBuffer(writeBuffer);
401+
connection
402+
.bufferAllocator()
403+
.putWriteBuffer(writeBuffer);
389404
clearTempBuffers();
390405
writingBuffer = EMPTY_BUFFER;
391406
writing.compareAndSet(true, false);
@@ -396,7 +411,9 @@ protected void clearTempBuffers() {
396411
var writeTempBuffer = this.writeTempBuffer;
397412
if (writeTempBuffer != null) {
398413
this.writeTempBuffer = null;
399-
bufferAllocator.putBuffer(writeTempBuffer);
414+
connection
415+
.bufferAllocator()
416+
.putBuffer(writeTempBuffer);
400417
}
401418
}
402419
}

0 commit comments

Comments
 (0)