Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-bigtable'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigtable:2.27.0'
implementation 'com.google.cloud:google-cloud-bigtable:2.27.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.27.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.27.1"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -609,7 +609,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigtable/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigtable.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.27.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.27.1
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.rpc.Code;
import java.util.List;
Expand Down Expand Up @@ -263,9 +264,12 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

Builder builder = lastRequest.toBuilder().clearEntries();
List<Integer> newOriginalIndexes = Lists.newArrayList();
boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()];

for (MutateRowsResponse response : responses) {
for (Entry entry : response.getEntriesList()) {
seenIndices[Ints.checkedCast(entry.getIndex())] = true;

if (entry.getStatus().getCode() == Code.OK_VALUE) {
continue;
}
Expand All @@ -288,6 +292,26 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
}
}

// Handle missing mutations
for (int i = 0; i < seenIndices.length; i++) {
if (seenIndices[i]) {
continue;
}

int origIndex = getOriginalIndex(i);
FailedMutation failedMutation =
FailedMutation.create(
origIndex,
ApiExceptionFactory.createException(
"Missing entry response for entry " + origIndex,
null,
GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL),
false));

allFailures.add(failedMutation);
permanentFailures.add(failedMutation);
}

currentRequest = builder.build();
originalIndexes = newOriginalIndexes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,11 @@ public void mutateRow(MutateRowRequest request, StreamObserver<MutateRowResponse

@Override
public void mutateRows(MutateRowsRequest request, StreamObserver<MutateRowsResponse> observer) {
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i));
}
observer.onNext(builder.build());
observer.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,11 @@ public void mutateRows(
Thread.sleep(SERVER_LATENCY);
} catch (InterruptedException e) {
}
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,15 @@ public void testBatchMutateRowsThrottledTime() throws Exception {
new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
StreamObserver<MutateRowsResponse> observer =
(StreamObserver<MutateRowsResponse>) invocation.getArguments()[1];
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
observer.onNext(builder.build());
observer.onCompleted();
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@ public void mutateRows(MutateRowsRequest request, StreamObserver<MutateRowsRespo
observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
return;
}
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
observer.onNext(builder.build());
observer.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -92,6 +94,37 @@ public void singleEntrySuccessTest() throws Exception {
assertThat(innerCallable.lastRequest).isEqualTo(request);
}

@Test
public void missingEntry() {
MutateRowsRequest request =
MutateRowsRequest.newBuilder()
.addEntries(Entry.getDefaultInstance())
.addEntries(Entry.getDefaultInstance())
.build();
innerCallable.response.add(
MutateRowsResponse.newBuilder()
.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(0))
.build());

MutateRowsAttemptCallable attemptCallable =
new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes);
attemptCallable.setExternalFuture(parentFuture);
attemptCallable.call();

ExecutionException executionException =
Assert.assertThrows(ExecutionException.class, () -> parentFuture.attemptFuture.get());
assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class);
MutateRowsException e = (MutateRowsException) executionException.getCause();

assertThat(e).hasMessageThat().contains("Some mutations failed to apply");
assertThat(e.getFailedMutations()).hasSize(1);
FailedMutation failedMutation = e.getFailedMutations().get(0);
assertThat(failedMutation.getIndex()).isEqualTo(1);
assertThat(failedMutation.getError())
.hasMessageThat()
.contains("Missing entry response for entry 1");
}

@Test
public void testNoRpcTimeout() {
parentFuture.timedAttemptSettings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ public void mutateRows(
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
attemptCounter.incrementAndGet();
if (expectations.isEmpty()) {
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} else {
Exception expectedRpc = expectations.poll();
Expand Down