Skip to content

Commit 98f5b33

Browse files
authored
Add mutable change stream in IntegrateionTestEnv.java for postgresql (#38057)
* Add mutable change stream in IntegrateionTestEnv.java for postgresql * Retriggering checks * Fix tests
1 parent f3bd329 commit 98f5b33

4 files changed

Lines changed: 35 additions & 12 deletions

File tree

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public class IntegrationTestEnv extends ExternalResource {
6464
private DatabaseAdminClient databaseAdminClient;
6565
private DatabaseClient databaseClient;
6666
private boolean isPostgres;
67-
private boolean isPlacementTableBasedChangeStream;
67+
private boolean isMutableChangeStream;
68+
private boolean isPlacementTable;
6869
public boolean useSeparateMetadataDb;
6970

7071
@Override
@@ -100,13 +101,18 @@ protected void before() throws Throwable {
100101

101102
IntegrationTestEnv() {
102103
this.isPostgres = false;
103-
this.isPlacementTableBasedChangeStream = false;
104+
this.isMutableChangeStream = false;
105+
this.isPlacementTable = false;
104106
}
105107

106108
IntegrationTestEnv(
107-
boolean isPostgres, boolean isPlacementTableBasedChangeStream, Optional<String> host) {
109+
boolean isPostgres,
110+
boolean isMutableChangeStream,
111+
boolean isPlacementTable,
112+
Optional<String> host) {
108113
this.isPostgres = isPostgres;
109-
this.isPlacementTableBasedChangeStream = isPlacementTableBasedChangeStream;
114+
this.isMutableChangeStream = isMutableChangeStream;
115+
this.isPlacementTable = isPlacementTable;
110116
if (host.isPresent()) {
111117
this.host = host.get();
112118
}
@@ -155,7 +161,7 @@ protected void after() {
155161
.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
156162
}
157163
} catch (Exception e) {
158-
if (isPlacementTableBasedChangeStream) {
164+
if (isPlacementTable) {
159165
// Drop placement table requires all rows deleted and garbage collected.
160166
LOG.info("Failed to drop table {}. Skipping...", table, e);
161167
} else {
@@ -214,7 +220,7 @@ String createSingersTable() throws InterruptedException, ExecutionException, Tim
214220
}
215221

216222
String createGSQLTableDDL(String tableName) {
217-
if (this.isPlacementTableBasedChangeStream) {
223+
if (this.isPlacementTable) {
218224
// create a placement table.
219225
return "CREATE TABLE "
220226
+ tableName
@@ -245,8 +251,7 @@ String createChangeStreamFor(String tableName)
245251
.updateDatabaseDdl(
246252
instanceId,
247253
databaseId,
248-
Collections.singletonList(
249-
"CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\""),
254+
Collections.singletonList(createPostgresChangeStreamDDL(changeStreamName, tableName)),
250255
null)
251256
.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
252257
} else {
@@ -263,7 +268,7 @@ String createChangeStreamFor(String tableName)
263268
}
264269

265270
String createGSQLChangeStreamDDL(String changeStreamName, String tableName) {
266-
if (this.isPlacementTableBasedChangeStream) {
271+
if (this.isMutableChangeStream) {
267272
// Create a MUTABLE_KEY_RANGE change stream.
268273
String statement =
269274
"CREATE CHANGE STREAM "
@@ -276,6 +281,21 @@ String createGSQLChangeStreamDDL(String changeStreamName, String tableName) {
276281
return "CREATE CHANGE STREAM " + changeStreamName + " FOR " + tableName;
277282
}
278283

284+
String createPostgresChangeStreamDDL(String changeStreamName, String tableName) {
285+
if (this.isMutableChangeStream) {
286+
// Create a MUTABLE_KEY_RANGE change stream.
287+
String statement =
288+
"CREATE CHANGE STREAM \""
289+
+ changeStreamName
290+
+ "\" FOR \""
291+
+ tableName
292+
+ "\""
293+
+ " WITH (partition_mode = 'MUTABLE_KEY_RANGE')";
294+
return statement;
295+
}
296+
return "CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\"";
297+
}
298+
279299
void createRoleAndGrantPrivileges(String table, String changeStream)
280300
throws InterruptedException, ExecutionException, TimeoutException {
281301
if (this.isPostgres) {

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTableIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public class SpannerChangeStreamPlacementTableIT {
7171
public static final IntegrationTestEnv ENV =
7272
new IntegrationTestEnv(
7373
/*isPostgres=*/ false,
74-
/*isPlacementTableBasedChangeStream=*/ true,
74+
/*isMutableChangeStream=*/ true,
75+
/*isPlacementTable=*/ true,
7576
/*host=*/ Optional.empty());
7677

7778
@Rule public final transient TestPipeline pipeline = TestPipeline.create();

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public class SpannerChangeStreamPlacementTablePostgresIT {
6565
public static final IntegrationTestEnv ENV =
6666
new IntegrationTestEnv(
6767
/*isPostgres=*/ true,
68-
/*isPlacementTableBasedChangeStream=*/ true,
68+
/*isMutableChangeStream=*/ true,
69+
/*isPlacementTable=*/ true,
6970
/*host=*/ Optional.empty());
7071

7172
@Rule public final transient TestPipeline pipeline = TestPipeline.create();

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public class SpannerChangeStreamPostgresIT {
6565
public static final IntegrationTestEnv ENV =
6666
new IntegrationTestEnv(
6767
/*isPostgres=*/ true,
68-
/*isPlacementTableBasedChangeStream=*/ false,
68+
/*isMutableChangeStream=*/ false,
69+
/*isPlacementTable=*/ false,
6970
/*host=*/ Optional.empty());
7071

7172
@Rule public final transient TestPipeline pipeline = TestPipeline.create();

0 commit comments

Comments
 (0)