Skip to content

Commit b976e85

Browse files
rvansaSanne
authored andcommitted
HHH-11267 Fix SessionRefreshTest
* Refactor common code from NonTx- and TxInvalidationInterceptor to common base
1 parent 6aaaf37 commit b976e85

File tree

5 files changed

+135
-134
lines changed

5 files changed

+135
-134
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
5+
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
6+
*/
7+
package org.hibernate.cache.infinispan.access;
8+
9+
import org.infinispan.commands.CommandsFactory;
10+
import org.infinispan.factories.annotations.Inject;
11+
import org.infinispan.factories.annotations.Start;
12+
import org.infinispan.interceptors.base.BaseRpcInterceptor;
13+
import org.infinispan.jmx.JmxStatisticsExposer;
14+
import org.infinispan.jmx.annotations.DataType;
15+
import org.infinispan.jmx.annotations.ManagedAttribute;
16+
import org.infinispan.jmx.annotations.ManagedOperation;
17+
import org.infinispan.jmx.annotations.MeasurementType;
18+
import org.infinispan.jmx.annotations.Parameter;
19+
import org.infinispan.remoting.inboundhandler.DeliverOrder;
20+
import org.infinispan.remoting.rpc.ResponseMode;
21+
import org.infinispan.remoting.rpc.RpcOptions;
22+
import org.infinispan.remoting.transport.Address;
23+
import org.infinispan.statetransfer.StateTransferManager;
24+
25+
import java.util.List;
26+
import java.util.concurrent.atomic.AtomicLong;
27+
28+
public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor implements JmxStatisticsExposer {
29+
private final AtomicLong invalidations = new AtomicLong(0);
30+
protected CommandsFactory commandsFactory;
31+
protected StateTransferManager stateTransferManager;
32+
protected boolean statisticsEnabled;
33+
protected RpcOptions syncRpcOptions;
34+
protected RpcOptions asyncRpcOptions;
35+
36+
@Inject
37+
public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager) {
38+
this.commandsFactory = commandsFactory;
39+
this.stateTransferManager = stateTransferManager;
40+
}
41+
42+
@Start
43+
private void start() {
44+
this.setStatisticsEnabled(cacheConfiguration.jmxStatistics().enabled());
45+
syncRpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
46+
asyncRpcOptions = rpcManager.getDefaultRpcOptions(false);
47+
}
48+
49+
@ManagedOperation(
50+
description = "Resets statistics gathered by this component",
51+
displayName = "Reset statistics"
52+
)
53+
public void resetStatistics() {
54+
invalidations.set(0);
55+
}
56+
57+
@ManagedAttribute(
58+
displayName = "Statistics enabled",
59+
description = "Enables or disables the gathering of statistics by this component",
60+
dataType = DataType.TRAIT,
61+
writable = true
62+
)
63+
public boolean getStatisticsEnabled() {
64+
return this.statisticsEnabled;
65+
}
66+
67+
public void setStatisticsEnabled(@Parameter(name = "enabled", description = "Whether statistics should be enabled or disabled (true/false)") boolean enabled) {
68+
this.statisticsEnabled = enabled;
69+
}
70+
71+
@ManagedAttribute(
72+
description = "Number of invalidations",
73+
displayName = "Number of invalidations",
74+
measurementType = MeasurementType.TRENDSUP
75+
)
76+
public long getInvalidations() {
77+
return invalidations.get();
78+
}
79+
80+
protected void incrementInvalidations() {
81+
if (statisticsEnabled) {
82+
invalidations.incrementAndGet();
83+
}
84+
}
85+
86+
protected List<Address> getMembers() {
87+
return stateTransferManager.getCacheTopology().getMembers();
88+
}
89+
}

hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java

Lines changed: 4 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
1010
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
11-
import org.infinispan.commands.CommandsFactory;
1211
import org.infinispan.commands.FlagAffectedCommand;
1312
import org.infinispan.commands.write.ClearCommand;
1413
import org.infinispan.commands.write.InvalidateCommand;
@@ -21,18 +20,8 @@
2120
import org.infinispan.context.Flag;
2221
import org.infinispan.context.InvocationContext;
2322
import org.infinispan.factories.annotations.Inject;
24-
import org.infinispan.factories.annotations.Start;
2523
import org.infinispan.interceptors.InvalidationInterceptor;
26-
import org.infinispan.interceptors.base.BaseRpcInterceptor;
27-
import org.infinispan.jmx.JmxStatisticsExposer;
28-
import org.infinispan.jmx.annotations.DataType;
2924
import org.infinispan.jmx.annotations.MBean;
30-
import org.infinispan.jmx.annotations.ManagedAttribute;
31-
import org.infinispan.jmx.annotations.ManagedOperation;
32-
import org.infinispan.jmx.annotations.MeasurementType;
33-
import org.infinispan.jmx.annotations.Parameter;
34-
35-
import java.util.concurrent.atomic.AtomicLong;
3625

3726
/**
3827
* This interceptor should completely replace default InvalidationInterceptor.
@@ -45,12 +34,9 @@
4534
* @author Galder Zamarreño
4635
*/
4736
@MBean(objectName = "Invalidation", description = "Component responsible for invalidating entries on remote caches when entries are written to locally.")
48-
public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements JmxStatisticsExposer {
49-
private final AtomicLong invalidations = new AtomicLong(0);
37+
public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
5038
private final PutFromLoadValidator putFromLoadValidator;
51-
private CommandsFactory commandsFactory;
5239
private CacheCommandInitializer commandInitializer;
53-
private boolean statisticsEnabled;
5440

5541
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(InvalidationInterceptor.class);
5642

@@ -59,16 +45,10 @@ public NonTxInvalidationInterceptor(PutFromLoadValidator putFromLoadValidator) {
5945
}
6046

6147
@Inject
62-
public void injectDependencies(CommandsFactory commandsFactory, CacheCommandInitializer commandInitializer) {
63-
this.commandsFactory = commandsFactory;
48+
public void injectDependencies(CacheCommandInitializer commandInitializer) {
6449
this.commandInitializer = commandInitializer;
6550
}
6651

67-
@Start
68-
private void start() {
69-
this.setStatisticsEnabled(cacheConfiguration.jmxStatistics().enabled());
70-
}
71-
7252
@Override
7353
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
7454
if (!isPutForExternalRead(command)) {
@@ -93,7 +73,7 @@ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) thr
9373
if (!isLocalModeForced(command)) {
9474
// just broadcast the clear command - this is simplest!
9575
if (ctx.isOriginLocal()) {
96-
rpcManager.invokeRemotely(null, command, rpcManager.getDefaultRpcOptions(defaultSynchronous));
76+
rpcManager.invokeRemotely(getMembers(), command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions);
9777
}
9878
}
9979
return retval;
@@ -132,13 +112,7 @@ private void invalidateAcrossCluster(FlagAffectedCommand command, Object[] keys)
132112
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
133113
}
134114

135-
rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(isSynchronous(command)));
136-
}
137-
}
138-
139-
private void incrementInvalidations() {
140-
if (statisticsEnabled) {
141-
invalidations.incrementAndGet();
115+
rpcManager.invokeRemotely(getMembers(), invalidateCommand, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions);
142116
}
143117
}
144118

@@ -150,35 +124,4 @@ private boolean isPutForExternalRead(FlagAffectedCommand command) {
150124
return false;
151125
}
152126

153-
@ManagedOperation(
154-
description = "Resets statistics gathered by this component",
155-
displayName = "Reset statistics"
156-
)
157-
public void resetStatistics() {
158-
invalidations.set(0);
159-
}
160-
161-
@ManagedAttribute(
162-
displayName = "Statistics enabled",
163-
description = "Enables or disables the gathering of statistics by this component",
164-
dataType = DataType.TRAIT,
165-
writable = true
166-
)
167-
public boolean getStatisticsEnabled() {
168-
return this.statisticsEnabled;
169-
}
170-
171-
public void setStatisticsEnabled(@Parameter(name = "enabled", description = "Whether statistics should be enabled or disabled (true/false)") boolean enabled) {
172-
this.statisticsEnabled = enabled;
173-
}
174-
175-
@ManagedAttribute(
176-
description = "Number of invalidations",
177-
displayName = "Number of invalidations",
178-
measurementType = MeasurementType.TRENDSUP
179-
)
180-
public long getInvalidations() {
181-
return invalidations.get();
182-
}
183-
184127
}

hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,16 @@
1313
import org.infinispan.commands.write.InvalidateCommand;
1414
import org.infinispan.context.InvocationContext;
1515
import org.infinispan.factories.annotations.Inject;
16+
import org.infinispan.factories.annotations.Start;
1617
import org.infinispan.interceptors.base.BaseCustomInterceptor;
1718
import org.infinispan.remoting.inboundhandler.DeliverOrder;
19+
import org.infinispan.remoting.rpc.ResponseMode;
1820
import org.infinispan.remoting.rpc.RpcManager;
21+
import org.infinispan.remoting.rpc.RpcOptions;
22+
import org.infinispan.remoting.transport.Address;
23+
import org.infinispan.statetransfer.StateTransferManager;
24+
25+
import java.util.List;
1926

2027
/**
2128
* Non-transactional counterpart of {@link TxPutFromLoadInterceptor}.
@@ -30,16 +37,24 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
3037
private final PutFromLoadValidator putFromLoadValidator;
3138
private CacheCommandInitializer commandInitializer;
3239
private RpcManager rpcManager;
40+
private StateTransferManager stateTransferManager;
41+
private RpcOptions asyncUnordered;
3342

3443
public NonTxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) {
3544
this.putFromLoadValidator = putFromLoadValidator;
3645
this.cacheName = cacheName;
3746
}
3847

3948
@Inject
40-
public void injectDependencies(CacheCommandInitializer commandInitializer, RpcManager rpcManager) {
49+
public void injectDependencies(CacheCommandInitializer commandInitializer, RpcManager rpcManager, StateTransferManager stateTransferManager) {
4150
this.commandInitializer = commandInitializer;
4251
this.rpcManager = rpcManager;
52+
this.stateTransferManager = stateTransferManager;
53+
}
54+
55+
@Start
56+
public void start() {
57+
asyncUnordered = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build();
4358
}
4459

4560
@Override
@@ -56,6 +71,7 @@ public void broadcastEndInvalidationCommand(Object[] keys, Object sessionTransac
5671
assert sessionTransactionId != null;
5772
EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand(
5873
cacheName, keys, sessionTransactionId);
59-
rpcManager.invokeRemotely(null, endInvalidationCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE));
74+
List<Address> members = stateTransferManager.getCacheTopology().getMembers();
75+
rpcManager.invokeRemotely(members, endInvalidationCommand, asyncUnordered);
6076
}
6177
}

hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java

Lines changed: 7 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@
1111
import java.util.HashSet;
1212
import java.util.List;
1313
import java.util.Set;
14-
import java.util.concurrent.atomic.AtomicLong;
1514

1615
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
1716

1817
import org.infinispan.commands.AbstractVisitor;
19-
import org.infinispan.commands.CommandsFactory;
2018
import org.infinispan.commands.FlagAffectedCommand;
2119
import org.infinispan.commands.ReplicableCommand;
2220
import org.infinispan.commands.control.LockControlCommand;
@@ -33,16 +31,8 @@
3331
import org.infinispan.context.InvocationContext;
3432
import org.infinispan.context.impl.LocalTxInvocationContext;
3533
import org.infinispan.context.impl.TxInvocationContext;
36-
import org.infinispan.factories.annotations.Inject;
37-
import org.infinispan.factories.annotations.Start;
38-
import org.infinispan.interceptors.base.BaseRpcInterceptor;
39-
import org.infinispan.jmx.JmxStatisticsExposer;
40-
import org.infinispan.jmx.annotations.DataType;
4134
import org.infinispan.jmx.annotations.MBean;
42-
import org.infinispan.jmx.annotations.ManagedAttribute;
43-
import org.infinispan.jmx.annotations.ManagedOperation;
44-
import org.infinispan.jmx.annotations.MeasurementType;
45-
import org.infinispan.jmx.annotations.Parameter;
35+
import org.infinispan.remoting.transport.Address;
4636

4737
/**
4838
* This interceptor acts as a replacement to the replication interceptor when the CacheImpl is configured with
@@ -58,24 +48,9 @@
5848
* @since 4.0
5949
*/
6050
@MBean(objectName = "Invalidation", description = "Component responsible for invalidating entries on remote caches when entries are written to locally.")
61-
public class TxInvalidationInterceptor extends BaseRpcInterceptor implements JmxStatisticsExposer {
62-
63-
private final AtomicLong invalidations = new AtomicLong( 0 );
64-
private CommandsFactory commandsFactory;
65-
private boolean statisticsEnabled;
66-
51+
public class TxInvalidationInterceptor extends BaseInvalidationInterceptor {
6752
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog( TxInvalidationInterceptor.class );
6853

69-
@Inject
70-
public void injectDependencies(CommandsFactory commandsFactory) {
71-
this.commandsFactory = commandsFactory;
72-
}
73-
74-
@Start
75-
private void start() {
76-
this.setStatisticsEnabled( cacheConfiguration.jmxStatistics().enabled() );
77-
}
78-
7954
@Override
8055
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
8156
if ( !isPutForExternalRead( command ) ) {
@@ -100,7 +75,7 @@ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) thr
10075
if ( !isLocalModeForced( command ) ) {
10176
// just broadcast the clear command - this is simplest!
10277
if ( ctx.isOriginLocal() ) {
103-
rpcManager.invokeRemotely( null, command, rpcManager.getDefaultRpcOptions( defaultSynchronous ) );
78+
rpcManager.invokeRemotely( getMembers(), command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions );
10479
}
10580
}
10681
return retval;
@@ -137,8 +112,9 @@ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlComman
137112
if ( ctx.isOriginLocal() ) {
138113
//unlock will happen async as it is a best effort
139114
boolean sync = !command.isUnlock();
140-
( (LocalTxInvocationContext) ctx ).remoteLocksAcquired( rpcManager.getTransport().getMembers() );
141-
rpcManager.invokeRemotely( null, command, rpcManager.getDefaultRpcOptions( sync ) );
115+
List<Address> members = getMembers();
116+
( (LocalTxInvocationContext) ctx ).remoteLocksAcquired(members);
117+
rpcManager.invokeRemotely(members, command, sync ? syncRpcOptions : asyncRpcOptions );
142118
}
143119
return retVal;
144120
}
@@ -244,13 +220,7 @@ private void invalidateAcrossCluster(boolean synchronous, Object[] keys, Invocat
244220
// but this does not impact consistency and the speed benefit is worth it.
245221
command = commandsFactory.buildPrepareCommand( txCtx.getGlobalTransaction(), Collections.<WriteCommand>singletonList( invalidateCommand ), true );
246222
}
247-
rpcManager.invokeRemotely( null, command, rpcManager.getDefaultRpcOptions( synchronous ) );
248-
}
249-
250-
private void incrementInvalidations() {
251-
if ( statisticsEnabled ) {
252-
invalidations.incrementAndGet();
253-
}
223+
rpcManager.invokeRemotely( getMembers(), command, synchronous ? syncRpcOptions : asyncRpcOptions );
254224
}
255225

256226
private boolean isPutForExternalRead(FlagAffectedCommand command) {
@@ -260,36 +230,4 @@ private boolean isPutForExternalRead(FlagAffectedCommand command) {
260230
}
261231
return false;
262232
}
263-
264-
@ManagedOperation(
265-
description = "Resets statistics gathered by this component",
266-
displayName = "Reset statistics"
267-
)
268-
public void resetStatistics() {
269-
invalidations.set( 0 );
270-
}
271-
272-
@ManagedAttribute(
273-
displayName = "Statistics enabled",
274-
description = "Enables or disables the gathering of statistics by this component",
275-
dataType = DataType.TRAIT,
276-
writable = true
277-
)
278-
public boolean getStatisticsEnabled() {
279-
return this.statisticsEnabled;
280-
}
281-
282-
public void setStatisticsEnabled(
283-
@Parameter(name = "enabled", description = "Whether statistics should be enabled or disabled (true/false)") boolean enabled) {
284-
this.statisticsEnabled = enabled;
285-
}
286-
287-
@ManagedAttribute(
288-
description = "Number of invalidations",
289-
displayName = "Number of invalidations",
290-
measurementType = MeasurementType.TRENDSUP
291-
)
292-
public long getInvalidations() {
293-
return invalidations.get();
294-
}
295233
}

0 commit comments

Comments
 (0)