Skip to content

Commit 7146c61

Browse files
committed
improving network module
1 parent fc1e6d9 commit 7146c61

20 files changed

+189
-57
lines changed

rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ void testServerWithMultiplyClients() {
122122

123123
var serverConfig = SimpleServerNetworkConfig
124124
.builder()
125-
.threadGroupSize(10)
125+
.threadGroupMaxSize(10)
126126
.writeBufferSize(1024)
127127
.readBufferSize(1024)
128128
.pendingBufferSize(2048)

rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void testServerWithMultiplyClients() {
129129

130130
var serverConfig = SimpleServerNetworkConfig
131131
.builder()
132-
.threadGroupSize(10)
132+
.threadGroupMaxSize(10)
133133
.writeBufferSize(1024)
134134
.readBufferSize(1024)
135135
.pendingBufferSize(2048)

rlib-network/src/main/java/javasabr/rlib/network/Network.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package javasabr.rlib.network;
22

3+
import java.util.concurrent.ScheduledExecutorService;
4+
35
/**
46
* The interface to implement an asynchronous network.
57
*
68
* @author JavaSaBr
79
*/
810
public interface Network<C extends Connection<C>> {
911

12+
ScheduledExecutorService scheduledExecutor();
13+
14+
NetworkConfig config();
15+
1016
/**
1117
* Shutdown this network.
1218
*/

rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class SimpleNetworkConfig implements NetworkConfig {
2828
private int pendingBufferSize = 4096;
2929
@Builder.Default
3030
private int writeBufferSize = 2048;
31+
@Builder.Default
32+
private int retryDelayInMs = 1000;
3133
}
3234

3335
NetworkConfig DEFAULT_CLIENT = new NetworkConfig() {
@@ -46,34 +48,46 @@ default String threadGroupName() {
4648
}
4749

4850
/**
49-
* Get size of buffer with used to collect received data from network.
51+
* Get a group name of scheduling network threads.
52+
*/
53+
default String scheduledThreadGroupName() {
54+
return "ScheduledNetworkThread";
55+
}
56+
57+
/**
58+
* Get size of buffer with will be used to collect received data from network.
5059
*/
5160
default int readBufferSize() {
5261
return 2048;
5362
}
5463

5564
/**
56-
* Get size of buffer with pending data. Pending buffer allows to construct a packet with bigger data than
65+
* Gets size of buffer with pending reading data. Pending buffer allows to construct a packet with bigger data part than
5766
* {@link #readBufferSize()}. It should be at least 2x of {@link #readBufferSize()}
58-
*
59-
* @return the pending buffer's size.
6067
*/
6168
default int pendingBufferSize() {
6269
return readBufferSize() * 2;
6370
}
6471

6572
/**
66-
* Get size of buffer which used to serialize packets to bytes.
73+
* Gets a size of buffer which will be used for packet serialization.
6774
*/
6875
default int writeBufferSize() {
6976
return 2048;
7077
}
7178

79+
/**
80+
* Gets a timeout for retry read/write operation.
81+
*/
82+
default int retryDelayInMs() {
83+
return 1000;
84+
}
85+
7286
default ByteOrder byteOrder() {
7387
return ByteOrder.BIG_ENDIAN;
7488
}
7589

76-
default boolean isDirectByteBuffer() {
90+
default boolean useDirectByteBuffer() {
7791
return false;
7892
}
7993
}

rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,27 @@ class SimpleServerNetworkConfig implements ServerNetworkConfig {
3232
@Builder.Default
3333
private int writeBufferSize = 2048;
3434
@Builder.Default
35-
private int threadGroupSize = 1;
35+
private int retryDelayInMs = 1000;
36+
@Builder.Default
37+
private int threadGroupMinSize = 1;
38+
@Builder.Default
39+
private int threadGroupMaxSize = 1;
40+
@Builder.Default
41+
private int scheduledThreadGroupSize = 1;
3642
@Builder.Default
3743
private int threadPriority = Thread.NORM_PRIORITY;
3844
}
3945

4046
ServerNetworkConfig DEFAULT_SERVER = new ServerNetworkConfig() {
4147

4248
@Override
43-
public int threadGroupMinSize() {
44-
return 1;
49+
public String threadGroupName() {
50+
return "ServerNetworkThread";
4551
}
4652

4753
@Override
48-
public String threadGroupName() {
49-
return "ServerNetworkThread";
54+
public String scheduledThreadGroupName() {
55+
return "ServerScheduledNetworkThread";
5056
}
5157
};
5258

@@ -64,6 +70,13 @@ default int threadGroupMaxSize() {
6470
return threadGroupMinSize();
6571
}
6672

73+
/**
74+
* Get a size of network scheduled thread executor.
75+
*/
76+
default int scheduledThreadGroupSize() {
77+
return 1;
78+
}
79+
6780
/**
6881
* Get a thread constructor which should be used to create network threads.
6982
*/

rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,7 @@
22

33
public interface UnsafeConnection<C extends UnsafeConnection<C>> extends Connection<C> {
44

5+
Network<?> network();
6+
57
void onConnected();
68
}

rlib-network/src/main/java/javasabr/rlib/network/client/impl/DefaultClientNetwork.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
import java.nio.channels.CompletionHandler;
77
import java.util.Optional;
88
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.ScheduledExecutorService;
911
import java.util.concurrent.atomic.AtomicBoolean;
1012
import java.util.function.BiFunction;
1113
import javasabr.rlib.common.util.AsyncUtils;
14+
import javasabr.rlib.common.util.GroupThreadFactory;
1215
import javasabr.rlib.common.util.ThreadUtils;
1316
import javasabr.rlib.common.util.Utils;
1417
import javasabr.rlib.network.Connection;
@@ -38,6 +41,9 @@ public class DefaultClientNetwork<C extends Connection<C>> extends AbstractNetwo
3841

3942
final AtomicBoolean connecting;
4043

44+
@Getter
45+
final ScheduledExecutorService scheduledExecutor;
46+
4147
@Nullable
4248
@Getter(AccessLevel.PROTECTED)
4349
volatile CompletableFuture<C> pendingConnection;
@@ -51,6 +57,8 @@ public DefaultClientNetwork(
5157
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection) {
5258
super(config, channelToConnection);
5359
this.connecting = new AtomicBoolean(false);
60+
this.scheduledExecutor = Executors
61+
.newSingleThreadScheduledExecutor(new GroupThreadFactory(config.scheduledThreadGroupName()));
5462
log.info(config, DefaultClientNetwork::buildConfigDescription);
5563
}
5664

@@ -85,7 +93,7 @@ public CompletableFuture<C> connectAsync(InetSocketAddress serverAddress) {
8593
@Override
8694
public void completed(@Nullable Void result, DefaultClientNetwork<C> network) {
8795
SocketAddress remoteAddress = NetworkUtils.getRemoteAddress(channel);
88-
log.info(remoteAddress, "Connected to server:[%s]"::formatted);
96+
log.info(remoteAddress, "[%s] Connected to server."::formatted);
8997
asyncResult.complete(channelToConnection.apply(network, channel));
9098
}
9199

rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ public WritablePacketWithFeedback(CompletableFuture<Boolean> attachment, Writabl
5151

5252
@Getter
5353
final String remoteAddress;
54-
54+
@Getter
5555
final Network<C> network;
56+
5657
final BufferAllocator bufferAllocator;
5758
final AsynchronousSocketChannel channel;
5859
final Deque<WritableNetworkPacket<C>> pendingPackets;

rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractNetwork.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import javasabr.rlib.network.NetworkConfig;
88
import lombok.AccessLevel;
99
import lombok.CustomLog;
10+
import lombok.Getter;
1011
import lombok.RequiredArgsConstructor;
12+
import lombok.experimental.Accessors;
1113
import lombok.experimental.FieldDefaults;
1214

1315
/**
@@ -17,8 +19,11 @@
1719
*/
1820
@CustomLog
1921
@RequiredArgsConstructor
22+
@Accessors(fluent = true, chain = false)
2023
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
2124
public abstract class AbstractNetwork<C extends Connection<C>> implements Network<C> {
25+
26+
@Getter
2227
NetworkConfig config;
2328
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection;
2429
}

rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultBufferAllocator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public DefaultBufferAllocator(NetworkConfig config) {
2828
public ByteBuffer takeReadBuffer() {
2929
int bufferSize = config.readBufferSize();
3030
log.debug(bufferSize, "Allocate new read buffer with size:[%s]"::formatted);
31-
return config.isDirectByteBuffer()
31+
return config.useDirectByteBuffer()
3232
? ByteBuffer.allocateDirect(bufferSize)
3333
: ByteBuffer
3434
.allocate(bufferSize)
@@ -40,7 +40,7 @@ public ByteBuffer takeReadBuffer() {
4040
public ByteBuffer takePendingBuffer() {
4141
int bufferSize = config.pendingBufferSize();
4242
log.debug(bufferSize, "Allocate new pending buffer with size:[%s]"::formatted);
43-
return config.isDirectByteBuffer()
43+
return config.useDirectByteBuffer()
4444
? ByteBuffer.allocateDirect(bufferSize)
4545
: ByteBuffer
4646
.allocate(bufferSize)
@@ -52,7 +52,7 @@ public ByteBuffer takePendingBuffer() {
5252
public ByteBuffer takeWriteBuffer() {
5353
int bufferSize = config.writeBufferSize();
5454
log.debug(bufferSize, "Allocate new write buffer with size:[%s]"::formatted);
55-
return config.isDirectByteBuffer()
55+
return config.useDirectByteBuffer()
5656
? ByteBuffer.allocateDirect(bufferSize)
5757
: ByteBuffer
5858
.allocate(bufferSize)
@@ -63,7 +63,7 @@ public ByteBuffer takeWriteBuffer() {
6363
@Override
6464
public ByteBuffer takeBuffer(int bufferSize) {
6565
log.debug(bufferSize, "Allocate new buffer with size:[%s]"::formatted);
66-
return config.isDirectByteBuffer()
66+
return config.useDirectByteBuffer()
6767
? ByteBuffer.allocateDirect(bufferSize)
6868
: ByteBuffer
6969
.allocate(bufferSize)

0 commit comments

Comments
 (0)