Skip to content

Commit 1d869c8

Browse files
feat: add logical termination for RunQueryResponse (#956)
* feat: add logical termination for RunQueryResponse * Add one test and comments * Change code based on feedback * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * add comments * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent b496376 commit 1d869c8

File tree

3 files changed

+136
-0
lines changed

3 files changed

+136
-0
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,6 +1330,8 @@ public void stream(@Nonnull final ApiStreamObserver<DocumentSnapshot> responseOb
13301330

13311331
internalStream(
13321332
new QuerySnapshotObserver() {
1333+
boolean hasCompleted = false;
1334+
13331335
@Override
13341336
public void onNext(QueryDocumentSnapshot documentSnapshot) {
13351337
responseObserver.onNext(documentSnapshot);
@@ -1342,6 +1344,8 @@ public void onError(Throwable throwable) {
13421344

13431345
@Override
13441346
public void onCompleted() {
1347+
if (hasCompleted) return;
1348+
hasCompleted = true;
13451349
responseObserver.onCompleted();
13461350
}
13471351
},
@@ -1537,6 +1541,16 @@ public void onResponse(RunQueryResponse response) {
15371541
if (readTime == null) {
15381542
readTime = Timestamp.fromProto(response.getReadTime());
15391543
}
1544+
1545+
if (response.hasDone() && response.getDone()) {
1546+
Tracing.getTracer()
1547+
.getCurrentSpan()
1548+
.addAnnotation(
1549+
"Firestore.Query: Completed",
1550+
ImmutableMap.of(
1551+
"numDocuments", AttributeValue.longAttributeValue(numDocuments)));
1552+
documentObserver.onCompleted(readTime);
1553+
}
15401554
}
15411555

15421556
@Override
@@ -1640,6 +1654,9 @@ ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId) {
16401654
internalStream(
16411655
new QuerySnapshotObserver() {
16421656
final List<QueryDocumentSnapshot> documentSnapshots = new ArrayList<>();
1657+
// The stream's onCompleted could be called more than once,
1658+
// this flag makes sure only the first one is actually processed.
1659+
boolean hasCompleted = false;
16431660

16441661
@Override
16451662
public void onNext(QueryDocumentSnapshot documentSnapshot) {
@@ -1653,6 +1670,9 @@ public void onError(Throwable throwable) {
16531670

16541671
@Override
16551672
public void onCompleted() {
1673+
if (hasCompleted) return;
1674+
hasCompleted = true;
1675+
16561676
// The results for limitToLast queries need to be flipped since we reversed the
16571677
// ordering constraints before sending the query to the backend.
16581678
List<QueryDocumentSnapshot> resultView =

google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,29 @@ public static Answer<RunQueryResponse> queryResponse(
311311
return streamingResponse(responses, throwable);
312312
}
313313

314+
/** Returns a stream of responses when RunQueryResponse.done set to true */
315+
public static Answer<RunQueryResponse> queryResponseWithDone(
316+
boolean callWithoutOnComplete, String... documentNames) {
317+
RunQueryResponse[] responses = new RunQueryResponse[documentNames.length];
318+
319+
for (int i = 0; i < documentNames.length; ++i) {
320+
final RunQueryResponse.Builder runQueryResponse = RunQueryResponse.newBuilder();
321+
runQueryResponse.setDocument(
322+
Document.newBuilder().setName(documentNames[i]).putAllFields(SINGLE_FIELD_PROTO));
323+
runQueryResponse.setReadTime(
324+
com.google.protobuf.Timestamp.newBuilder().setSeconds(1).setNanos(2));
325+
if (i == (documentNames.length - 1)) {
326+
runQueryResponse.setDone(true);
327+
}
328+
responses[i] = runQueryResponse.build();
329+
}
330+
if (callWithoutOnComplete) {
331+
return streamingResponseWithoutOnComplete(responses);
332+
} else {
333+
return streamingResponse(responses, null);
334+
}
335+
}
336+
314337
/** Returns a stream of responses followed by an optional exception. */
315338
public static <T> Answer<T> streamingResponse(
316339
final T[] response, @Nullable final Throwable throwable) {
@@ -328,6 +351,18 @@ public static <T> Answer<T> streamingResponse(
328351
};
329352
}
330353

354+
/** Returns a stream of responses even though onComplete() wasn't triggered */
355+
public static <T> Answer<T> streamingResponseWithoutOnComplete(final T[] response) {
356+
return invocation -> {
357+
Object[] args = invocation.getArguments();
358+
ResponseObserver<T> observer = (ResponseObserver<T>) args[1];
359+
for (T resp : response) {
360+
observer.onResponse(resp);
361+
}
362+
return null;
363+
};
364+
}
365+
331366
public static ApiFuture<CommitResponse> commitResponse(int adds, int deletes) {
332367
CommitResponse.Builder commitResponse = CommitResponse.newBuilder();
333368
commitResponse.getCommitTimeBuilder().setSeconds(0).setNanos(0);

google-cloud-firestore/src/test/java/com/google/cloud/firestore/QueryTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static com.google.cloud.firestore.LocalFirestoreHelper.order;
2828
import static com.google.cloud.firestore.LocalFirestoreHelper.query;
2929
import static com.google.cloud.firestore.LocalFirestoreHelper.queryResponse;
30+
import static com.google.cloud.firestore.LocalFirestoreHelper.queryResponseWithDone;
3031
import static com.google.cloud.firestore.LocalFirestoreHelper.reference;
3132
import static com.google.cloud.firestore.LocalFirestoreHelper.select;
3233
import static com.google.cloud.firestore.LocalFirestoreHelper.startAt;
@@ -952,6 +953,86 @@ public void onCompleted() {
952953
semaphore.acquire();
953954
}
954955

956+
@Test
957+
public void successfulReturnWithoutOnComplete() throws Exception {
958+
doAnswer(
959+
queryResponseWithDone(
960+
/* callWithoutOnComplete */ true, DOCUMENT_NAME + "1", DOCUMENT_NAME + "2"))
961+
.when(firestoreMock)
962+
.streamRequest(
963+
runQuery.capture(),
964+
streamObserverCapture.capture(),
965+
Matchers.<ServerStreamingCallable>any());
966+
967+
final Semaphore semaphore = new Semaphore(0);
968+
final Iterator<String> iterator = Arrays.asList("doc1", "doc2").iterator();
969+
970+
query.stream(
971+
new ApiStreamObserver<DocumentSnapshot>() {
972+
@Override
973+
public void onNext(DocumentSnapshot documentSnapshot) {
974+
assertEquals(iterator.next(), documentSnapshot.getId());
975+
}
976+
977+
@Override
978+
public void onError(Throwable throwable) {
979+
fail();
980+
}
981+
982+
@Override
983+
public void onCompleted() {
984+
semaphore.release();
985+
}
986+
});
987+
988+
semaphore.acquire();
989+
}
990+
991+
@Test
992+
/**
993+
* onComplete() will be called twice. The first time is when it detects RunQueryResponse.done set
994+
* to true. The second time is when it receives half close
995+
*/
996+
public void successfulReturnCallsOnCompleteTwice() throws Exception {
997+
doAnswer(
998+
queryResponseWithDone(
999+
/* callWithoutOnComplete */ false, DOCUMENT_NAME + "1", DOCUMENT_NAME + "2"))
1000+
.when(firestoreMock)
1001+
.streamRequest(
1002+
runQuery.capture(),
1003+
streamObserverCapture.capture(),
1004+
Matchers.<ServerStreamingCallable>any());
1005+
1006+
final Semaphore semaphore = new Semaphore(0);
1007+
final Iterator<String> iterator = Arrays.asList("doc1", "doc2").iterator();
1008+
final int[] counter = {0};
1009+
1010+
query.stream(
1011+
new ApiStreamObserver<DocumentSnapshot>() {
1012+
@Override
1013+
public void onNext(DocumentSnapshot documentSnapshot) {
1014+
assertEquals(iterator.next(), documentSnapshot.getId());
1015+
}
1016+
1017+
@Override
1018+
public void onError(Throwable throwable) {
1019+
fail();
1020+
}
1021+
1022+
@Override
1023+
public void onCompleted() {
1024+
counter[0]++;
1025+
semaphore.release();
1026+
}
1027+
});
1028+
1029+
semaphore.acquire();
1030+
1031+
// Wait for some time to see whether onCompleted() has been called more than once
1032+
Thread.sleep(200);
1033+
assertEquals(1, counter[0]);
1034+
}
1035+
9551036
@Test
9561037
public void retriesAfterRetryableError() throws Exception {
9571038
final boolean[] returnError = new boolean[] {true};

0 commit comments

Comments
 (0)