Skip to content

Commit 4b0b500

Browse files
authored
fix: close buffers used by Converter after use (#434)
* fix: close buffers used by Converter after use * test: add more tests * test: remove buggy test
1 parent cf6d007 commit 4b0b500

File tree

2 files changed

+58
-46
lines changed

2 files changed

+58
-46
lines changed

src/main/java/com/google/cloud/spanner/pgadapter/utils/Converter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import java.io.IOException;
3838

3939
/** Utility class for converting between generic PostgreSQL conversions. */
40-
public class Converter {
40+
public class Converter implements AutoCloseable {
4141
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
4242
private final DataOutputStream outputStream = new DataOutputStream(buffer);
4343
private final IntermediateStatement statement;
@@ -56,6 +56,12 @@ public Converter(
5656
this.resultSet = resultSet;
5757
}
5858

59+
@Override
60+
public void close() throws Exception {
61+
buffer.close();
62+
outputStream.close();
63+
}
64+
5965
public ResultSet getResultSet() {
6066
return resultSet;
6167
}

src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -465,59 +465,65 @@ private SendResultSetRunnable(
465465

466466
@Override
467467
public Long call() throws Exception {
468-
if (resultSet == null && batchReadOnlyTransaction != null && partition != null) {
469-
// Note: It is OK not to close this result set, as the underlying transaction and session
470-
// will be cleaned up at a later moment.
471-
try {
472-
resultSet = batchReadOnlyTransaction.execute(partition);
473-
converter =
474-
new Converter(
475-
describedResult,
476-
mode,
477-
describedResult.getConnectionHandler().getServer().getOptions(),
478-
resultSet);
479-
hasData = resultSet.next();
480-
} catch (Throwable t) {
481-
if (includePrefix) {
482-
synchronized (describedResult) {
483-
prefixSent.setException(t);
468+
try {
469+
if (resultSet == null && batchReadOnlyTransaction != null && partition != null) {
470+
// Note: It is OK not to close this result set, as the underlying transaction and session
471+
// will be cleaned up at a later moment.
472+
try {
473+
resultSet = batchReadOnlyTransaction.execute(partition);
474+
converter =
475+
new Converter(
476+
describedResult,
477+
mode,
478+
describedResult.getConnectionHandler().getServer().getOptions(),
479+
resultSet);
480+
hasData = resultSet.next();
481+
} catch (Throwable t) {
482+
if (includePrefix) {
483+
synchronized (describedResult) {
484+
prefixSent.setException(t);
485+
}
484486
}
487+
throw t;
485488
}
486-
throw t;
487489
}
488-
}
489-
if (includePrefix) {
490-
try {
491-
for (WireOutput prefix : describedResult.createResultPrefix(resultSet)) {
492-
prefix.send(false);
490+
if (includePrefix) {
491+
try {
492+
for (WireOutput prefix : describedResult.createResultPrefix(resultSet)) {
493+
prefix.send(false);
494+
}
495+
prefixSent.set(true);
496+
} catch (Throwable t) {
497+
prefixSent.setException(t);
498+
throw t;
493499
}
494-
prefixSent.set(true);
495-
} catch (Throwable t) {
496-
prefixSent.setException(t);
497-
throw t;
498-
}
499-
}
500-
// Wait until the prefix (if any) has been sent.
501-
prefixSent.get();
502-
long rows = 0L;
503-
while (hasData) {
504-
if (Thread.interrupted()) {
505-
throw PGExceptionFactory.newQueryCancelledException();
506-
}
507-
WireOutput wireOutput = describedResult.createDataRowResponse(converter);
508-
synchronized (describedResult) {
509-
wireOutput.send(false);
510500
}
511-
rows++;
512-
hasData = resultSet.next();
513-
if (rows % 1000 == 0) {
514-
logger.log(Level.INFO, String.format("Sent %d rows", rows));
501+
// Wait until the prefix (if any) has been sent.
502+
prefixSent.get();
503+
long rows = 0L;
504+
while (hasData) {
505+
if (Thread.interrupted()) {
506+
throw PGExceptionFactory.newQueryCancelledException();
507+
}
508+
WireOutput wireOutput = describedResult.createDataRowResponse(converter);
509+
synchronized (describedResult) {
510+
wireOutput.send(false);
511+
}
512+
rows++;
513+
hasData = resultSet.next();
514+
if (rows % 1000 == 0) {
515+
logger.log(Level.INFO, String.format("Sent %d rows", rows));
516+
}
517+
if (rows == maxRows) {
518+
break;
519+
}
515520
}
516-
if (rows == maxRows) {
517-
break;
521+
return rows;
522+
} finally {
523+
if (converter != null) {
524+
converter.close();
518525
}
519526
}
520-
return rows;
521527
}
522528
}
523529
}

0 commit comments

Comments
 (0)