66 */
77package org .hibernate .test .cache .infinispan .functional .cluster ;
88
9+ import java .util .ArrayList ;
910import java .util .HashSet ;
1011import java .util .Iterator ;
1112import java .util .List ;
1213import java .util .Map ;
1314import java .util .Set ;
15+ import java .util .concurrent .CountDownLatch ;
1416import java .util .concurrent .Phaser ;
1517import java .util .concurrent .TimeUnit ;
1618import java .util .concurrent .TimeoutException ;
1719import java .util .concurrent .atomic .AtomicReference ;
20+ import java .util .function .BiPredicate ;
21+ import java .util .stream .Stream ;
1822
1923import org .hibernate .Session ;
2024import org .hibernate .SessionFactory ;
2125import org .hibernate .cache .infinispan .InfinispanRegionFactory ;
26+ import org .hibernate .cache .infinispan .access .PutFromLoadValidator ;
27+ import org .hibernate .cache .infinispan .util .FutureUpdate ;
2228import org .hibernate .cache .infinispan .util .InfinispanMessageLogger ;
29+ import org .hibernate .cache .spi .access .AccessType ;
2330import org .hibernate .test .cache .infinispan .functional .entities .Contact ;
2431import org .hibernate .test .cache .infinispan .functional .entities .Customer ;
32+ import org .hibernate .test .cache .infinispan .util .ExpectingInterceptor ;
2533import org .hibernate .test .cache .infinispan .util .TestInfinispanRegionFactory ;
2634import org .hibernate .testing .TestForIssue ;
2735import org .infinispan .AdvancedCache ;
2836import org .infinispan .Cache ;
37+ import org .infinispan .commands .VisitableCommand ;
2938import org .infinispan .commands .read .GetKeyValueCommand ;
39+ import org .infinispan .commands .write .PutKeyValueCommand ;
3040import org .infinispan .commons .util .Util ;
3141import org .infinispan .context .InvocationContext ;
3242import org .infinispan .interceptors .base .BaseCustomInterceptor ;
4050import static org .junit .Assert .assertEquals ;
4151import static org .junit .Assert .assertNull ;
4252import static org .junit .Assert .assertTrue ;
53+ import static org .mockito .Matchers .any ;
54+ import static org .mockito .Mockito .doAnswer ;
55+ import static org .mockito .Mockito .spy ;
4356
4457/**
4558 * EntityCollectionInvalidationTestCase.
@@ -54,9 +67,9 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
5467private static final Integer CUSTOMER_ID = new Integer ( 1 );
5568
5669private EmbeddedCacheManager localManager , remoteManager ;
57- private Cache localCustomerCache , remoteCustomerCache ;
58- private Cache localContactCache , remoteContactCache ;
59- private Cache localCollectionCache , remoteCollectionCache ;
70+ private AdvancedCache localCustomerCache , remoteCustomerCache ;
71+ private AdvancedCache localContactCache , remoteContactCache ;
72+ private AdvancedCache localCollectionCache , remoteCollectionCache ;
6073private MyListener localListener , remoteListener ;
6174private SessionFactory localFactory , remoteFactory ;
6275
@@ -72,19 +85,19 @@ public void startUp() {
7285// Our region factory makes its CacheManager available to us
7386localManager = ClusterAwareRegionFactory .getCacheManager ( DualNodeTest .LOCAL );
7487// Cache localCache = localManager.getCache("entity");
75- localCustomerCache = localManager .getCache ( Customer .class .getName () );
76- localContactCache = localManager .getCache ( Contact .class .getName () );
77- localCollectionCache = localManager .getCache ( Customer .class .getName () + ".contacts" );
88+ localCustomerCache = localManager .getCache ( Customer .class .getName () ). getAdvancedCache () ;
89+ localContactCache = localManager .getCache ( Contact .class .getName () ). getAdvancedCache () ;
90+ localCollectionCache = localManager .getCache ( Customer .class .getName () + ".contacts" ). getAdvancedCache () ;
7891localListener = new MyListener ( "local" );
7992localCustomerCache .addListener ( localListener );
8093localContactCache .addListener ( localListener );
8194localCollectionCache .addListener ( localListener );
8295
8396// Bind a listener to the "remote" cache
8497remoteManager = ClusterAwareRegionFactory .getCacheManager ( DualNodeTest .REMOTE );
85- remoteCustomerCache = remoteManager .getCache ( Customer .class .getName () );
86- remoteContactCache = remoteManager .getCache ( Contact .class .getName () );
87- remoteCollectionCache = remoteManager .getCache ( Customer .class .getName () + ".contacts" );
98+ remoteCustomerCache = remoteManager .getCache ( Customer .class .getName () ). getAdvancedCache () ;
99+ remoteContactCache = remoteManager .getCache ( Contact .class .getName () ). getAdvancedCache () ;
100+ remoteCollectionCache = remoteManager .getCache ( Customer .class .getName () + ".contacts" ). getAdvancedCache () ;
88101remoteListener = new MyListener ( "remote" );
89102remoteCustomerCache .addListener ( remoteListener );
90103remoteContactCache .addListener ( remoteListener );
@@ -161,12 +174,24 @@ public void testAll() throws Exception {
161174
162175// Modify customer in remote
163176remoteListener .clear ();
177+
178+ CountDownLatch modifyLatch = null ;
179+ if (!cacheMode .isInvalidation () && accessType != AccessType .NONSTRICT_READ_WRITE ) {
180+ modifyLatch = new CountDownLatch (1 );
181+ ExpectingInterceptor .get (localCustomerCache ).when (this ::isFutureUpdate ).countDown (modifyLatch );
182+ }
183+
164184ids = modifyCustomer ( ids .customerId , remoteFactory );
165185sleep ( 250 );
166186assertLoadedFromCache ( remoteListener , ids .customerId , ids .contactIds );
167187
188+ if (modifyLatch != null ) {
189+ assertTrue (modifyLatch .await (2 , TimeUnit .SECONDS ));
190+ ExpectingInterceptor .cleanup (localCustomerCache );
191+ }
192+
168193assertEquals ( 0 , localCollectionCache .size () );
169- if (localCustomerCache . getCacheConfiguration (). clustering (). cacheMode () .isInvalidation ()) {
194+ if (cacheMode .isInvalidation ()) {
170195// After modification, local cache should have been invalidated and hence should be empty
171196assertEquals (0 , localCustomerCache .size ());
172197} else {
@@ -277,8 +302,32 @@ private IdContainer createCustomer(SessionFactory sessionFactory)
277302
278303customer .setContacts (contacts );
279304
305+ ArrayList <Runnable > cleanup = new ArrayList <>();
306+ CountDownLatch customerLatch = new CountDownLatch (1 );
307+ CountDownLatch collectionLatch = new CountDownLatch (1 );
308+ CountDownLatch contactsLatch = new CountDownLatch (2 );
309+
310+ if (cacheMode .isInvalidation ()) {
311+ cleanup .add (mockValidator (remoteCustomerCache , customerLatch ));
312+ cleanup .add (mockValidator (remoteCollectionCache , collectionLatch ));
313+ cleanup .add (mockValidator (remoteContactCache , contactsLatch ));
314+ } else if (accessType == AccessType .NONSTRICT_READ_WRITE ) {
315+ // ATM nonstrict mode has sync after-invalidation update
316+ Stream .of (customerLatch , collectionLatch , contactsLatch , contactsLatch ).forEach (l -> l .countDown ());
317+ } else {
318+ ExpectingInterceptor .get (remoteCustomerCache ).when (this ::isFutureUpdate ).countDown (collectionLatch );
319+ ExpectingInterceptor .get (remoteCollectionCache ).when (this ::isFutureUpdate ).countDown (customerLatch );
320+ ExpectingInterceptor .get (remoteContactCache ).when (this ::isFutureUpdate ).countDown (contactsLatch );
321+ cleanup .add (() -> ExpectingInterceptor .cleanup (remoteCustomerCache , remoteCollectionCache , remoteContactCache ));
322+ }
323+
280324withTxSession (sessionFactory , session -> session .save (customer ));
281325
326+ assertTrue (customerLatch .await (2 , TimeUnit .SECONDS ));
327+ assertTrue (collectionLatch .await (2 , TimeUnit .SECONDS ));
328+ assertTrue (contactsLatch .await (2 , TimeUnit .SECONDS ));
329+ cleanup .forEach (Runnable ::run );
330+
282331IdContainer ids = new IdContainer ();
283332ids .customerId = customer .getId ();
284333Set contactIds = new HashSet ();
@@ -290,6 +339,27 @@ private IdContainer createCustomer(SessionFactory sessionFactory)
290339return ids ;
291340}
292341
342+ private boolean isFutureUpdate (InvocationContext ctx , VisitableCommand cmd ) {
343+ return cmd instanceof PutKeyValueCommand && ((PutKeyValueCommand ) cmd ).getValue () instanceof FutureUpdate ;
344+ }
345+
346+ private Runnable mockValidator (AdvancedCache cache , CountDownLatch latch ) {
347+ PutFromLoadValidator originalValidator = PutFromLoadValidator .removeFromCache (cache );
348+ PutFromLoadValidator mockValidator = spy (originalValidator );
349+ doAnswer (invocation -> {
350+ try {
351+ return invocation .callRealMethod ();
352+ } finally {
353+ latch .countDown ();
354+ }
355+ }).when (mockValidator ).endInvalidatingKey (any (), any ());
356+ PutFromLoadValidator .addToCache (cache , mockValidator );
357+ return () -> {
358+ PutFromLoadValidator .removeFromCache (cache );
359+ PutFromLoadValidator .addToCache (cache , originalValidator );
360+ };
361+ }
362+
293363private Customer getCustomer (Integer id , SessionFactory sessionFactory ) throws Exception {
294364log .debug ( "Find customer with id=" + id );
295365return withTxSessionApply (sessionFactory , session -> doGetCustomer (id , session ));
0 commit comments