From 8743f0315d1435781732797ef28d4670a7be98f8 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Mon, 11 May 2026 15:47:05 +0200 Subject: [PATCH 1/9] feat: SQL stores --- .../port/exception/PersistenceException.java | 16 +++ .../port/store/AbstractSqlStore.java | 33 +++++ .../port/store/SqlControlPlaneStore.java | 95 +++++++++++++++ .../port/store/SqlDataFlowStore.java | 114 ++++++++++++++++++ .../resources/sql/controlplane_schema.sql | 8 ++ src/main/resources/sql/data_flow_schema.sql | 23 ++++ 6 files changed, 289 insertions(+) create mode 100644 src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java create mode 100644 src/main/java/org/eclipse/dataplane/port/store/AbstractSqlStore.java create mode 100644 src/main/java/org/eclipse/dataplane/port/store/SqlControlPlaneStore.java create mode 100644 src/main/java/org/eclipse/dataplane/port/store/SqlDataFlowStore.java create mode 100644 src/main/resources/sql/controlplane_schema.sql create mode 100644 src/main/resources/sql/data_flow_schema.sql diff --git a/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java b/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java new file mode 100644 index 0000000..34e80a4 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java @@ -0,0 +1,16 @@ +package org.eclipse.dataplane.port.exception; + +/** + * Indicates an error during database interactions, i.e. an error occurred persisting, reading or + * deleting an entry. + */ +public class PersistenceException extends RuntimeException { + + public PersistenceException(String message) { + super(message); + } + + public PersistenceException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/AbstractSqlStore.java b/src/main/java/org/eclipse/dataplane/port/store/AbstractSqlStore.java new file mode 100644 index 0000000..d2dff6b --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/AbstractSqlStore.java @@ -0,0 +1,33 @@ +package org.eclipse.dataplane.port.store; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.exception.PersistenceException; + +import java.sql.Connection; +import java.sql.DriverManager; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +public abstract class AbstractSqlStore { + + protected final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + private final String databaseUrl; + private final String databaseUsername; + private final String databasePassword; + + public AbstractSqlStore(String databaseUrl, String databaseUsername, String databasePassword) { + this.databaseUrl = databaseUrl; + this.databaseUsername = databaseUsername; + this.databasePassword = databasePassword; + } + + protected Connection getConnection() { + try { + return DriverManager.getConnection(databaseUrl, databaseUsername, databasePassword); + } catch (Exception e) { + throw new PersistenceException("Failed to connect to database.", e); + } + } + +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/SqlControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/SqlControlPlaneStore.java new file mode 100644 index 0000000..0e52d72 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/SqlControlPlaneStore.java @@ -0,0 +1,95 @@ +package org.eclipse.dataplane.port.store; + +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.controlplane.ControlPlane; +import org.eclipse.dataplane.domain.registration.AuthorizationProfile; +import org.eclipse.dataplane.port.exception.PersistenceException; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; + +import java.net.URI; + +import static java.lang.String.format; + +public class SqlControlPlaneStore extends AbstractSqlStore implements ControlPlaneStore { + + public SqlControlPlaneStore(String databaseUrl, String databaseUsername, String databasePassword) { + super(databaseUrl, databaseUsername, databasePassword); + } + + @Override + public Result save(ControlPlane controlPlane) { + var connection = getConnection(); + + var sql = "INSERT INTO control_planes (id, endpoint, authorization) VALUES (?, ?, ?::json)" + + " ON CONFLICT (id) DO UPDATE SET" + + " endpoint = EXCLUDED.endpoint," + + " authorization = EXCLUDED.authorization"; + + try (var statement = connection.prepareStatement(sql)) { + statement.setString(1, controlPlane.getId()); + statement.setString(2, controlPlane.getEndpoint().toString()); + statement.setString(3, objectMapper.writeValueAsString(controlPlane.getAuthorization())); + + statement.executeUpdate(); + return Result.success(); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to persist ControlPlane with ID %s.", controlPlane.getId()), e)); + } + } + + @Override + public Result findById(String controlplaneId) { + var connection = getConnection(); + + var sql = "SELECT * FROM control_planes WHERE id = ?"; + + try (var statement = connection.prepareStatement(sql)) { + statement.setString(1, controlplaneId); + var resultSet = statement.executeQuery(); + + if (!resultSet.next()) { + return Result.failure(new ResourceNotFoundException(format("ControlPlane with id %s not found.", controlplaneId))); + } + + var controlplane = ControlPlane.newInstance() + .id(controlplaneId) + .endpoint(URI.create(resultSet.getString("endpoint"))) + .authorization(objectMapper.readValue(resultSet.getString("authorization"), AuthorizationProfile.class)) + .build(); + return Result.success(controlplane); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to read ControlPlane with ID %s.", controlplaneId), e)); + } + } + + @Override + public Result delete(String id) { + var connection = getConnection(); + + var sql = "DELETE FROM control_planes WHERE id = ?"; + + try (var statement = connection.prepareStatement(sql)) { + statement.setString(1, id); + statement.executeUpdate(); + return Result.success(); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to delete ControlPlane with ID %s.", id), e)); + } + } + + @Override + public boolean exists(String controlplaneId) { + var connection = getConnection(); + + var sql = "SELECT COUNT(*) FROM control_planes WHERE id = ?"; + + try (var statement = connection.prepareStatement(sql)) { + statement.setString(1, controlplaneId); + var resultSet = statement.executeQuery(); + return resultSet.getInt(1) > 0; + } catch (Exception e) { + throw new PersistenceException(format("Failed to check for existence of ControlPlane with ID %s.", controlplaneId), e); + } + } + +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/SqlDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/SqlDataFlowStore.java new file mode 100644 index 0000000..29beb46 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/SqlDataFlowStore.java @@ -0,0 +1,114 @@ +package org.eclipse.dataplane.port.store; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.eclipse.dataplane.domain.DataAddress; +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.dataflow.DataFlow; +import org.eclipse.dataplane.port.exception.PersistenceException; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; + +import java.net.URI; + +import static java.lang.String.format; + +public class SqlDataFlowStore extends AbstractSqlStore implements DataFlowStore { + + public SqlDataFlowStore(String databaseUrl, String databaseUsername, String databasePassword) { + super(databaseUrl, databaseUsername, databasePassword); + } + + @Override + public Result save(DataFlow dataFlow) { + var connection = getConnection(); + + var sql = "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," + + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," + + " labels, metadata, data_address, controlplane_id) VALUES" + + " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" + + " ON CONFLICT (id) DO UPDATE SET" + + " transfer_type = EXCLUDED.transfer_type," + + " type = EXCLUDED.type," + + " state = EXCLUDED.state," + + " dataset_id = EXCLUDED.dataset_id," + + " agreement_id = EXCLUDED.agreement_id," + + " participant_id = EXCLUDED.participant_id," + + " counter_party_id = EXCLUDED.counter_party_id," + + " dataspace_context = EXCLUDED.dataspace_context," + + " callback_address = EXCLUDED.callback_address," + + " suspension_reason = EXCLUDED.suspension_reason," + + " termination_reason = EXCLUDED.termination_reason," + + " labels = EXCLUDED.labels," + + " metadata = EXCLUDED.metadata," + + " data_address = EXCLUDED.data_address," + + " controlplane_id = EXCLUDED.controlplane_id"; + + try (var statement = connection.prepareStatement(sql)) { + statement.setString(1, dataFlow.getId()); + statement.setString(2, dataFlow.getState().name()); + statement.setString(3, dataFlow.getTransferType()); + statement.setString(4, dataFlow.getDatasetId()); + statement.setString(5, dataFlow.getAgreementId()); + statement.setString(6, dataFlow.getParticipantId()); + statement.setString(7, dataFlow.getCounterPartyId()); + statement.setString(8, dataFlow.getDataspaceContext()); + statement.setString(9, dataFlow.getCallbackAddress().toString()); + statement.setString(10, dataFlow.getSuspensionReason()); + statement.setString(11, dataFlow.getTerminationReason()); + statement.setString(12, objectMapper.writeValueAsString(dataFlow.getLabels())); + statement.setString(13, objectMapper.writeValueAsString(dataFlow.getMetadata())); + statement.setString(14, objectMapper.writeValueAsString(dataFlow.getDataAddress())); + statement.setString(15, dataFlow.getControlplaneId()); + statement.setString(16, dataFlow.getType().name()); + + statement.executeUpdate(); + return Result.success(); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to persist DataFlow with ID %s.", dataFlow.getId()), e)); + } + } + + @Override + public Result findById(String flowId) { + var connection = getConnection(); + + var sql = "SELECT * FROM data_flows WHERE id = ?"; + + try (var statement = connection.prepareStatement(sql)) { + statement.setString(1, flowId); + var resultSet = statement.executeQuery(); + + if (!resultSet.next()) { + return Result.failure(new ResourceNotFoundException(format("DataFlow with id %s not found.", flowId))); + } + + var dataFlow = DataFlow.newInstance() + .id(flowId) + .state(DataFlow.State.valueOf(resultSet.getString("state"))) + .transferType(resultSet.getString("transfer_type")) + .datasetId(resultSet.getString("dataset_id")) + .agreementId(resultSet.getString("agreement_id")) + .participantId(resultSet.getString("participant_id")) + .counterPartyId(resultSet.getString("counter_party_id")) + .dataspaceContext(resultSet.getString("dataspace_context")) + .callbackAddress(URI.create(resultSet.getString("callback_address"))) + .labels(objectMapper.readValue(resultSet.getString("labels"), new TypeReference<>() { + })) + .metadata(objectMapper.readValue(resultSet.getString("metadata"), new TypeReference<>() { + })) + .dataAddress(objectMapper.readValue(resultSet.getString("data_address"), DataAddress.class)) + .controlplaneId(resultSet.getString("controlplane_id")) + .type(DataFlow.Type.valueOf(resultSet.getString("type"))) + .build(); + + if (dataFlow.getState() == DataFlow.State.SUSPENDED) { + dataFlow.transitionToSuspended(resultSet.getString("suspension_reason")); + } else if (dataFlow.getState() == DataFlow.State.TERMINATED) { + dataFlow.transitionToTerminated(resultSet.getString("termination_reason")); + } + + return Result.success(dataFlow); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to read DataFlow with ID %s.", flowId), e)); + } + } +} diff --git a/src/main/resources/sql/controlplane_schema.sql b/src/main/resources/sql/controlplane_schema.sql new file mode 100644 index 0000000..b2beabb --- /dev/null +++ b/src/main/resources/sql/controlplane_schema.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS control_planes +( + id VARCHAR PRIMARY KEY, + endpoint VARCHAR, + authorization JSON +); + +COMMENT ON COLUMN control_planes.authorization IS 'Authorization profile serialized as JSON'; diff --git a/src/main/resources/sql/data_flow_schema.sql b/src/main/resources/sql/data_flow_schema.sql new file mode 100644 index 0000000..66c8b48 --- /dev/null +++ b/src/main/resources/sql/data_flow_schema.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS data_flows +( + id VARCHAR PRIMARY KEY, + transfer_type VARCHAR, + type VARCHAR, + state VARCHAR NOT NULL, + dataset_id VARCHAR, + agreement_id VARCHAR, + participant_id VARCHAR, + counter_party_id VARCHAR, + dataspace_context VARCHAR, + callback_address VARCHAR, + suspension_reason VARCHAR, + termination_reason VARCHAR, + labels JSON, + metadata JSON, + data_address JSON, + controlplane_id VARCHAR +); + +COMMENT ON COLUMN data_flows.labels IS 'List of labels serialized as JSON'; +COMMENT ON COLUMN data_flows.metadata IS 'Metadata serialized as JSON'; +COMMENT ON COLUMN data_flows.data_address IS 'Data address serialized as JSON'; From 3749a996a91becb0fc32bc9fc1a7741b67fbd242 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 13 May 2026 10:12:31 +0200 Subject: [PATCH 2/9] chore: move statements to interface --- .../store/{ => sql}/AbstractSqlStore.java | 2 +- .../store/sql/ControlPlaneStatements.java | 13 +++++++ .../port/store/sql/DataFlowStatements.java | 9 +++++ .../sql/PostgresqlControlPlaneStatements.java | 26 ++++++++++++++ .../sql/PostgresqlDataFlowStatements.java | 32 +++++++++++++++++ .../store/{ => sql}/SqlControlPlaneStore.java | 27 ++++++-------- .../store/{ => sql}/SqlDataFlowStore.java | 35 +++++-------------- 7 files changed, 99 insertions(+), 45 deletions(-) rename src/main/java/org/eclipse/dataplane/port/store/{ => sql}/AbstractSqlStore.java (95%) create mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java create mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java create mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java create mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java rename src/main/java/org/eclipse/dataplane/port/store/{ => sql}/SqlControlPlaneStore.java (79%) rename src/main/java/org/eclipse/dataplane/port/store/{ => sql}/SqlDataFlowStore.java (72%) diff --git a/src/main/java/org/eclipse/dataplane/port/store/AbstractSqlStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java similarity index 95% rename from src/main/java/org/eclipse/dataplane/port/store/AbstractSqlStore.java rename to src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java index d2dff6b..2592570 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/AbstractSqlStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java @@ -1,4 +1,4 @@ -package org.eclipse.dataplane.port.store; +package org.eclipse.dataplane.port.store.sql; import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.dataplane.port.exception.PersistenceException; diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java new file mode 100644 index 0000000..f16d89a --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java @@ -0,0 +1,13 @@ +package org.eclipse.dataplane.port.store.sql; + +public interface ControlPlaneStatements { + + String upsertTemplate(); + + String findByIdTemplate(); + + String deleteByIdTemplate(); + + String countByIdTemplate(); + +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java new file mode 100644 index 0000000..ced8df1 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java @@ -0,0 +1,9 @@ +package org.eclipse.dataplane.port.store.sql; + +public interface DataFlowStatements { + + String upsertTemplate(); + + String findByIdTemplate(); + +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java new file mode 100644 index 0000000..4f55b3b --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java @@ -0,0 +1,26 @@ +package org.eclipse.dataplane.port.store.sql; + +public class PostgresqlControlPlaneStatements implements ControlPlaneStatements { + @Override + public String upsertTemplate() { + return "INSERT INTO control_planes (id, endpoint, authorization) VALUES (?, ?, ?::json)" + + " ON CONFLICT (id) DO UPDATE SET" + + " endpoint = EXCLUDED.endpoint," + + " authorization = EXCLUDED.authorization"; + } + + @Override + public String findByIdTemplate() { + return "SELECT * FROM control_planes WHERE id = ?"; + } + + @Override + public String deleteByIdTemplate() { + return "DELETE FROM control_planes WHERE id = ?"; + } + + @Override + public String countByIdTemplate() { + return "SELECT COUNT(*) FROM control_planes WHERE id = ?"; + } +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java new file mode 100644 index 0000000..0091a75 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java @@ -0,0 +1,32 @@ +package org.eclipse.dataplane.port.store.sql; + +public class PostgresqlDataFlowStatements implements DataFlowStatements { + @Override + public String upsertTemplate() { + return "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," + + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," + + " labels, metadata, data_address, controlplane_id) VALUES" + + " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" + + " ON CONFLICT (id) DO UPDATE SET" + + " transfer_type = EXCLUDED.transfer_type," + + " type = EXCLUDED.type," + + " state = EXCLUDED.state," + + " dataset_id = EXCLUDED.dataset_id," + + " agreement_id = EXCLUDED.agreement_id," + + " participant_id = EXCLUDED.participant_id," + + " counter_party_id = EXCLUDED.counter_party_id," + + " dataspace_context = EXCLUDED.dataspace_context," + + " callback_address = EXCLUDED.callback_address," + + " suspension_reason = EXCLUDED.suspension_reason," + + " termination_reason = EXCLUDED.termination_reason," + + " labels = EXCLUDED.labels," + + " metadata = EXCLUDED.metadata," + + " data_address = EXCLUDED.data_address," + + " controlplane_id = EXCLUDED.controlplane_id"; + } + + @Override + public String findByIdTemplate() { + return "SELECT * FROM data_flows WHERE id = ?"; + } +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/SqlControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java similarity index 79% rename from src/main/java/org/eclipse/dataplane/port/store/SqlControlPlaneStore.java rename to src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java index 0e52d72..9226f37 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/SqlControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java @@ -1,10 +1,11 @@ -package org.eclipse.dataplane.port.store; +package org.eclipse.dataplane.port.store.sql; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.domain.registration.AuthorizationProfile; import org.eclipse.dataplane.port.exception.PersistenceException; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.ControlPlaneStore; import java.net.URI; @@ -12,20 +13,18 @@ public class SqlControlPlaneStore extends AbstractSqlStore implements ControlPlaneStore { - public SqlControlPlaneStore(String databaseUrl, String databaseUsername, String databasePassword) { + private final ControlPlaneStatements statements; + + public SqlControlPlaneStore(String databaseUrl, String databaseUsername, String databasePassword, ControlPlaneStatements statements) { super(databaseUrl, databaseUsername, databasePassword); + this.statements = statements; } @Override public Result save(ControlPlane controlPlane) { var connection = getConnection(); - var sql = "INSERT INTO control_planes (id, endpoint, authorization) VALUES (?, ?, ?::json)" - + " ON CONFLICT (id) DO UPDATE SET" - + " endpoint = EXCLUDED.endpoint," - + " authorization = EXCLUDED.authorization"; - - try (var statement = connection.prepareStatement(sql)) { + try (var statement = connection.prepareStatement(statements.upsertTemplate())) { statement.setString(1, controlPlane.getId()); statement.setString(2, controlPlane.getEndpoint().toString()); statement.setString(3, objectMapper.writeValueAsString(controlPlane.getAuthorization())); @@ -41,9 +40,7 @@ public Result save(ControlPlane controlPlane) { public Result findById(String controlplaneId) { var connection = getConnection(); - var sql = "SELECT * FROM control_planes WHERE id = ?"; - - try (var statement = connection.prepareStatement(sql)) { + try (var statement = connection.prepareStatement(statements.findByIdTemplate())) { statement.setString(1, controlplaneId); var resultSet = statement.executeQuery(); @@ -66,9 +63,7 @@ public Result findById(String controlplaneId) { public Result delete(String id) { var connection = getConnection(); - var sql = "DELETE FROM control_planes WHERE id = ?"; - - try (var statement = connection.prepareStatement(sql)) { + try (var statement = connection.prepareStatement(statements.deleteByIdTemplate())) { statement.setString(1, id); statement.executeUpdate(); return Result.success(); @@ -81,9 +76,7 @@ public Result delete(String id) { public boolean exists(String controlplaneId) { var connection = getConnection(); - var sql = "SELECT COUNT(*) FROM control_planes WHERE id = ?"; - - try (var statement = connection.prepareStatement(sql)) { + try (var statement = connection.prepareStatement(statements.countByIdTemplate())) { statement.setString(1, controlplaneId); var resultSet = statement.executeQuery(); return resultSet.getInt(1) > 0; diff --git a/src/main/java/org/eclipse/dataplane/port/store/SqlDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java similarity index 72% rename from src/main/java/org/eclipse/dataplane/port/store/SqlDataFlowStore.java rename to src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java index 29beb46..8ec7769 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/SqlDataFlowStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java @@ -1,4 +1,4 @@ -package org.eclipse.dataplane.port.store; +package org.eclipse.dataplane.port.store.sql; import com.fasterxml.jackson.core.type.TypeReference; import org.eclipse.dataplane.domain.DataAddress; @@ -6,6 +6,7 @@ import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.port.exception.PersistenceException; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.DataFlowStore; import java.net.URI; @@ -13,36 +14,18 @@ public class SqlDataFlowStore extends AbstractSqlStore implements DataFlowStore { - public SqlDataFlowStore(String databaseUrl, String databaseUsername, String databasePassword) { + private final DataFlowStatements statements; + + public SqlDataFlowStore(String databaseUrl, String databaseUsername, String databasePassword, DataFlowStatements statements) { super(databaseUrl, databaseUsername, databasePassword); + this.statements = statements; } @Override public Result save(DataFlow dataFlow) { var connection = getConnection(); - var sql = "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," - + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," - + " labels, metadata, data_address, controlplane_id) VALUES" - + " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" - + " ON CONFLICT (id) DO UPDATE SET" - + " transfer_type = EXCLUDED.transfer_type," - + " type = EXCLUDED.type," - + " state = EXCLUDED.state," - + " dataset_id = EXCLUDED.dataset_id," - + " agreement_id = EXCLUDED.agreement_id," - + " participant_id = EXCLUDED.participant_id," - + " counter_party_id = EXCLUDED.counter_party_id," - + " dataspace_context = EXCLUDED.dataspace_context," - + " callback_address = EXCLUDED.callback_address," - + " suspension_reason = EXCLUDED.suspension_reason," - + " termination_reason = EXCLUDED.termination_reason," - + " labels = EXCLUDED.labels," - + " metadata = EXCLUDED.metadata," - + " data_address = EXCLUDED.data_address," - + " controlplane_id = EXCLUDED.controlplane_id"; - - try (var statement = connection.prepareStatement(sql)) { + try (var statement = connection.prepareStatement(statements.upsertTemplate())) { statement.setString(1, dataFlow.getId()); statement.setString(2, dataFlow.getState().name()); statement.setString(3, dataFlow.getTransferType()); @@ -71,9 +54,7 @@ public Result save(DataFlow dataFlow) { public Result findById(String flowId) { var connection = getConnection(); - var sql = "SELECT * FROM data_flows WHERE id = ?"; - - try (var statement = connection.prepareStatement(sql)) { + try (var statement = connection.prepareStatement(statements.findByIdTemplate())) { statement.setString(1, flowId); var resultSet = statement.executeQuery(); From 8ceccbe91c8ccbe88eabd009af93d9aa36965e38 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 13 May 2026 15:21:05 +0200 Subject: [PATCH 3/9] test: add store tests --- build.gradle.kts | 3 + .../port/store/sql/AbstractSqlStore.java | 14 +- .../sql/PostgresqlControlPlaneStatements.java | 4 +- .../port/store/sql/SqlControlPlaneStore.java | 29 ++-- .../port/store/sql/SqlDataFlowStore.java | 44 +++--- ...ne_schema.sql => control_plane_schema.sql} | 4 +- .../store/ControlPlaneStoreTestBase.java | 133 ++++++++++++++++++ .../store/DataFlowStoreTestBase.java | 95 +++++++++++++ .../store/InMemoryControlPlaneStoreTest.java | 17 +++ .../store/InMemoryDataFlowStoreTest.java | 17 +++ .../sql/PostgresControlPlaneStoreTest.java | 55 ++++++++ .../store/sql/PostgresDataFlowStoreTest.java | 55 ++++++++ 12 files changed, 435 insertions(+), 35 deletions(-) rename src/main/resources/sql/{controlplane_schema.sql => control_plane_schema.sql} (50%) create mode 100644 src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java create mode 100644 src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java create mode 100644 src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java create mode 100644 src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java create mode 100644 src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java create mode 100644 src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java diff --git a/build.gradle.kts b/build.gradle.kts index 15b9940..dab8889 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -34,6 +34,9 @@ dependencies { testImplementation("org.mockito:mockito-core:5.23.0") testImplementation("org.slf4j:slf4j-simple:2.0.17") testImplementation("org.wiremock:wiremock-jetty12:3.13.2") + testImplementation("org.testcontainers:testcontainers-junit-jupiter:2.0.5") + testImplementation("org.testcontainers:testcontainers-postgresql:2.0.5") + testImplementation("org.postgresql:postgresql:42.7.11") } tasks.test { diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java index 2592570..c7aaa34 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java @@ -6,17 +6,16 @@ import java.sql.Connection; import java.sql.DriverManager; -import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; - public abstract class AbstractSqlStore { - protected final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + protected ObjectMapper objectMapper; private final String databaseUrl; private final String databaseUsername; private final String databasePassword; - public AbstractSqlStore(String databaseUrl, String databaseUsername, String databasePassword) { + public AbstractSqlStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) { + this.objectMapper = objectMapper; this.databaseUrl = databaseUrl; this.databaseUsername = databaseUsername; this.databasePassword = databasePassword; @@ -30,4 +29,11 @@ protected Connection getConnection() { } } + protected void closeConnection(Connection connection) { + try { + connection.close(); + } catch (Exception e) { + throw new PersistenceException("Failed to commit transaction.", e); + } + } } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java index 4f55b3b..6464f38 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java @@ -3,10 +3,10 @@ public class PostgresqlControlPlaneStatements implements ControlPlaneStatements { @Override public String upsertTemplate() { - return "INSERT INTO control_planes (id, endpoint, authorization) VALUES (?, ?, ?::json)" + return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" + " ON CONFLICT (id) DO UPDATE SET" + " endpoint = EXCLUDED.endpoint," - + " authorization = EXCLUDED.authorization"; + + " auth = EXCLUDED.auth"; } @Override diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java index 9226f37..8172211 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java @@ -1,5 +1,6 @@ package org.eclipse.dataplane.port.store.sql; +import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.domain.registration.AuthorizationProfile; @@ -15,8 +16,8 @@ public class SqlControlPlaneStore extends AbstractSqlStore implements ControlPla private final ControlPlaneStatements statements; - public SqlControlPlaneStore(String databaseUrl, String databaseUsername, String databasePassword, ControlPlaneStatements statements) { - super(databaseUrl, databaseUsername, databasePassword); + public SqlControlPlaneStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword, ControlPlaneStatements statements) { + super(objectMapper, databaseUrl, databaseUsername, databasePassword); this.statements = statements; } @@ -32,7 +33,9 @@ public Result save(ControlPlane controlPlane) { statement.executeUpdate(); return Result.success(); } catch (Exception e) { - return Result.failure(new PersistenceException(format("Failed to persist ControlPlane with ID %s.", controlPlane.getId()), e)); + return Result.failure(new PersistenceException(format("Failed to persist ControlPlane with id %s.", controlPlane.getId()), e)); + } finally { + closeConnection(connection); } } @@ -51,11 +54,13 @@ public Result findById(String controlplaneId) { var controlplane = ControlPlane.newInstance() .id(controlplaneId) .endpoint(URI.create(resultSet.getString("endpoint"))) - .authorization(objectMapper.readValue(resultSet.getString("authorization"), AuthorizationProfile.class)) + .authorization(objectMapper.readValue(resultSet.getString("auth"), AuthorizationProfile.class)) .build(); return Result.success(controlplane); } catch (Exception e) { - return Result.failure(new PersistenceException(format("Failed to read ControlPlane with ID %s.", controlplaneId), e)); + return Result.failure(new PersistenceException(format("Failed to read ControlPlane with id %s.", controlplaneId), e)); + } finally { + closeConnection(connection); } } @@ -65,10 +70,15 @@ public Result delete(String id) { try (var statement = connection.prepareStatement(statements.deleteByIdTemplate())) { statement.setString(1, id); - statement.executeUpdate(); + var rows = statement.executeUpdate(); + if (rows < 1) { + return Result.failure(new ResourceNotFoundException(format("ControlPlane with id %s not found.", id))); + } return Result.success(); } catch (Exception e) { - return Result.failure(new PersistenceException(format("Failed to delete ControlPlane with ID %s.", id), e)); + return Result.failure(new PersistenceException(format("Failed to delete ControlPlane with id %s.", id), e)); + } finally { + closeConnection(connection); } } @@ -79,9 +89,12 @@ public boolean exists(String controlplaneId) { try (var statement = connection.prepareStatement(statements.countByIdTemplate())) { statement.setString(1, controlplaneId); var resultSet = statement.executeQuery(); + resultSet.next(); return resultSet.getInt(1) > 0; } catch (Exception e) { - throw new PersistenceException(format("Failed to check for existence of ControlPlane with ID %s.", controlplaneId), e); + throw new PersistenceException(format("Failed to check for existence of ControlPlane with id %s.", controlplaneId), e); + } finally { + closeConnection(connection); } } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java index 8ec7769..481b90a 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java @@ -1,6 +1,7 @@ package org.eclipse.dataplane.port.store.sql; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; @@ -16,8 +17,8 @@ public class SqlDataFlowStore extends AbstractSqlStore implements DataFlowStore private final DataFlowStatements statements; - public SqlDataFlowStore(String databaseUrl, String databaseUsername, String databasePassword, DataFlowStatements statements) { - super(databaseUrl, databaseUsername, databasePassword); + public SqlDataFlowStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword, DataFlowStatements statements) { + super(objectMapper, databaseUrl, databaseUsername, databasePassword); this.statements = statements; } @@ -27,26 +28,29 @@ public Result save(DataFlow dataFlow) { try (var statement = connection.prepareStatement(statements.upsertTemplate())) { statement.setString(1, dataFlow.getId()); - statement.setString(2, dataFlow.getState().name()); - statement.setString(3, dataFlow.getTransferType()); - statement.setString(4, dataFlow.getDatasetId()); - statement.setString(5, dataFlow.getAgreementId()); - statement.setString(6, dataFlow.getParticipantId()); - statement.setString(7, dataFlow.getCounterPartyId()); - statement.setString(8, dataFlow.getDataspaceContext()); - statement.setString(9, dataFlow.getCallbackAddress().toString()); - statement.setString(10, dataFlow.getSuspensionReason()); - statement.setString(11, dataFlow.getTerminationReason()); - statement.setString(12, objectMapper.writeValueAsString(dataFlow.getLabels())); - statement.setString(13, objectMapper.writeValueAsString(dataFlow.getMetadata())); - statement.setString(14, objectMapper.writeValueAsString(dataFlow.getDataAddress())); - statement.setString(15, dataFlow.getControlplaneId()); - statement.setString(16, dataFlow.getType().name()); + statement.setString(2, dataFlow.getTransferType()); + statement.setString(3, dataFlow.getType().name()); + statement.setString(4, dataFlow.getState().name()); + statement.setString(5, dataFlow.getDatasetId()); + statement.setString(6, dataFlow.getAgreementId()); + statement.setString(7, dataFlow.getParticipantId()); + statement.setString(8, dataFlow.getCounterPartyId()); + statement.setString(9, dataFlow.getDataspaceContext()); + statement.setString(10, dataFlow.getCallbackAddress().toString()); + statement.setString(11, dataFlow.getSuspensionReason()); + statement.setString(12, dataFlow.getTerminationReason()); + statement.setString(13, objectMapper.writeValueAsString(dataFlow.getLabels())); + statement.setString(14, objectMapper.writeValueAsString(dataFlow.getMetadata())); + statement.setString(15, objectMapper.writeValueAsString(dataFlow.getDataAddress())); + statement.setString(16, dataFlow.getControlplaneId()); + statement.executeUpdate(); return Result.success(); } catch (Exception e) { - return Result.failure(new PersistenceException(format("Failed to persist DataFlow with ID %s.", dataFlow.getId()), e)); + return Result.failure(new PersistenceException(format("Failed to persist DataFlow with id %s.", dataFlow.getId()), e)); + } finally { + closeConnection(connection); } } @@ -89,7 +93,9 @@ public Result findById(String flowId) { return Result.success(dataFlow); } catch (Exception e) { - return Result.failure(new PersistenceException(format("Failed to read DataFlow with ID %s.", flowId), e)); + return Result.failure(new PersistenceException(format("Failed to read DataFlow with id %s.", flowId), e)); + } finally { + closeConnection(connection); } } } diff --git a/src/main/resources/sql/controlplane_schema.sql b/src/main/resources/sql/control_plane_schema.sql similarity index 50% rename from src/main/resources/sql/controlplane_schema.sql rename to src/main/resources/sql/control_plane_schema.sql index b2beabb..1c5c9b3 100644 --- a/src/main/resources/sql/controlplane_schema.sql +++ b/src/main/resources/sql/control_plane_schema.sql @@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS control_planes ( id VARCHAR PRIMARY KEY, endpoint VARCHAR, - authorization JSON + auth JSON ); -COMMENT ON COLUMN control_planes.authorization IS 'Authorization profile serialized as JSON'; +COMMENT ON COLUMN control_planes.auth IS 'Authorization profile serialized as JSON'; diff --git a/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java new file mode 100644 index 0000000..eb69c12 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java @@ -0,0 +1,133 @@ +package org.eclipse.dataplane.store; + +import org.eclipse.dataplane.domain.controlplane.ControlPlane; +import org.eclipse.dataplane.domain.registration.AuthorizationProfile; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.ControlPlaneStore; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.net.URI; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class ControlPlaneStoreTestBase { + + @Nested + class Save { + @Test + void save_newControlPlane_shouldCreate() { + var id = "id"; + var controlPlane = controlPlane(id); + + var result = store().save(controlPlane); + assertThat(result.succeeded()).isTrue(); + + var persisted = store().findById(id).getContent(); + assertThat(persisted).isNotNull(); + assertThat(persisted).usingRecursiveComparison().isEqualTo(controlPlane); + } + + @Test + void save_existingControlPlane_shouldUpdate() { + var id = "toUpdate"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var newControlPlane = ControlPlane.newInstance() + .id(id) + .endpoint(URI.create("http://some-other-endpoint")) + .authorization(controlPlane.getAuthorization()) + .build(); + store().save(newControlPlane); + + var updated = store().findById(id).getContent(); + assertThat(updated.getEndpoint()) + .isEqualTo(newControlPlane.getEndpoint()) + .isNotEqualTo(controlPlane.getEndpoint()); + assertThat(updated.getAuthorization()).usingRecursiveComparison() + .isEqualTo(controlPlane.getAuthorization()) + .isEqualTo(newControlPlane.getAuthorization()); + } + } + + @Nested + class FindById { + @Test + void findById_exists_shouldReturnControlPlane() { + var id = "id"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var result = store().findById(id); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent()).isNotNull(); + } + + @Test + void findById_doesNotExist_shouldReturnNotFound() { + var result = store().findById("nonExistent"); + + assertThat(result.failed()).isTrue(); + assertThat(result.getException()).isInstanceOf(ResourceNotFoundException.class); + } + } + + @Nested + class Delete { + @Test + void delete_exists_shouldDelete() { + var id = "id"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var deleteResult = store().delete(id); + assertThat(deleteResult.succeeded()).isTrue(); + + var findResult = store().findById(id); + assertThat(findResult.failed()).isTrue(); + assertThat(findResult.getException()).isInstanceOf(ResourceNotFoundException.class); + } + + @Test + void delete_doesNotExist_shouldReturnNotFound() { + var deleteResult = store().delete("nonExistent"); + + assertThat(deleteResult.failed()).isTrue(); + assertThat(deleteResult.getException()).isInstanceOf(ResourceNotFoundException.class); + } + } + + @Nested + class Exists { + @Test + void exists_exists_shouldReturnTrue() { + var id = "id"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var exists = store().exists(id); + + assertThat(exists).isTrue(); + } + + @Test + void exists_doesNotExists_shouldReturnFalse() { + var exists = store().exists("nonExistent"); + + assertThat(exists).isFalse(); + } + } + + protected abstract ControlPlaneStore store(); + + private ControlPlane controlPlane(String id) { + return ControlPlane.newInstance() + .id(id) + .endpoint(URI.create("https://controlplane")) + .authorization(new AuthorizationProfile("token") + .withAttribute("Authorization", "authToken")) + .build(); + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java b/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java new file mode 100644 index 0000000..0c57cd1 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java @@ -0,0 +1,95 @@ +package org.eclipse.dataplane.store; + +import org.eclipse.dataplane.domain.DataAddress; +import org.eclipse.dataplane.domain.dataflow.DataFlow; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.DataFlowStore; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class DataFlowStoreTestBase { + + @Nested + class Save { + @Test + void save_newDataFlow_shouldCreate() { + var id = "id"; + var dataFlow = dataFlow(id); + + var result = store().save(dataFlow); + assertThat(result.succeeded()).isTrue(); + + var persisted = store().findById(id).getContent(); + assertThat(persisted).isNotNull(); + assertThat(persisted).usingRecursiveComparison().isEqualTo(dataFlow); + } + + @Test + void save_existingDataFlow_shouldUpdate() { + var id = "toUpdate"; + var dataFlow = dataFlow(id); + store().save(dataFlow); + var persisted = store().findById(id).getContent(); + assertThat(persisted.getState()).isEqualTo(DataFlow.State.INITIATING); + assertThat(persisted.getSuspensionReason()).isNull(); + + var suspensionReason = "suspend"; + dataFlow.transitionToSuspended(suspensionReason); + store().save(dataFlow); + + var updated = store().findById(id).getContent(); + assertThat(updated.getState()).isEqualTo(DataFlow.State.SUSPENDED); + assertThat(updated.getSuspensionReason()).isNotNull().isEqualTo(suspensionReason); + } + } + + @Nested + class FindById { + @Test + void findById_exists_shouldReturnDataFlow() { + var id = "id"; + var dataFlow = dataFlow(id); + store().save(dataFlow); + + var result = store().findById(id); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent()).isNotNull(); + } + + @Test + void findById_doesNotExist_shouldReturnNotFound() { + var result = store().findById("nonExistent"); + + assertThat(result.failed()).isTrue(); + assertThat(result.getException()).isInstanceOf(ResourceNotFoundException.class); + } + } + + protected abstract DataFlowStore store(); + + private DataFlow dataFlow(String id) { + return DataFlow.newInstance() + .id(id) + .state(DataFlow.State.INITIATING) + .transferType("HTTP-PUSH") + .datasetId("dataset") + .agreementId("agreement") + .participantId("participant") + .counterPartyId("counterParty") + .dataspaceContext("dataspaceContext") + .callbackAddress(URI.create("https://callbackAddress")) + .labels(List.of("label1", "label2")) + .metadata(Map.of("key1", "value1", "key2", "value2")) + .dataAddress(new DataAddress("http", "https://endpoint", List.of())) + .controlplaneId("controlPlane") + .type(DataFlow.Type.PROVIDER) + .build(); + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java b/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java new file mode 100644 index 0000000..f3fd6a3 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java @@ -0,0 +1,17 @@ +package org.eclipse.dataplane.store; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.ControlPlaneStore; +import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +class InMemoryControlPlaneStoreTest extends ControlPlaneStoreTestBase { + + private InMemoryControlPlaneStore store = new InMemoryControlPlaneStore(new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)); + + @Override + protected ControlPlaneStore store() { + return store; + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java b/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java new file mode 100644 index 0000000..fddbdd9 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java @@ -0,0 +1,17 @@ +package org.eclipse.dataplane.store; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.DataFlowStore; +import org.eclipse.dataplane.port.store.InMemoryDataFlowStore; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +class InMemoryDataFlowStoreTest extends DataFlowStoreTestBase { + + private InMemoryDataFlowStore store = new InMemoryDataFlowStore(new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)); + + @Override + protected DataFlowStore store() { + return store; + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java new file mode 100644 index 0000000..168a329 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java @@ -0,0 +1,55 @@ +package org.eclipse.dataplane.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.ControlPlaneStore; +import org.eclipse.dataplane.port.store.sql.PostgresqlControlPlaneStatements; +import org.eclipse.dataplane.port.store.sql.SqlControlPlaneStore; +import org.eclipse.dataplane.store.ControlPlaneStoreTestBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.postgresql.PostgreSQLContainer; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +@Testcontainers +class PostgresControlPlaneStoreTest extends ControlPlaneStoreTestBase { + + private static final String POSTGRES_IMAGE = "postgres:18.3"; + private static final String DATABASE = "dataplane"; + private static final String USERNAME = "user"; + private static final String PASSWORD = "password"; + + private final ObjectMapper mapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + private SqlControlPlaneStore store; + + @Container + static PostgreSQLContainer postgres = new PostgreSQLContainer(POSTGRES_IMAGE) + .withDatabaseName(DATABASE) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withInitScript("sql/control_plane_schema.sql"); + + @BeforeAll + static void init() { + postgres.start(); + } + + @AfterAll + static void cleanUp() { + postgres.stop(); + postgres.close(); + } + + @BeforeEach + void initStore() { + store = new SqlControlPlaneStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword(), new PostgresqlControlPlaneStatements()); + } + + @Override + protected ControlPlaneStore store() { + return store; + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java new file mode 100644 index 0000000..1268746 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java @@ -0,0 +1,55 @@ +package org.eclipse.dataplane.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.DataFlowStore; +import org.eclipse.dataplane.port.store.sql.PostgresqlDataFlowStatements; +import org.eclipse.dataplane.port.store.sql.SqlDataFlowStore; +import org.eclipse.dataplane.store.DataFlowStoreTestBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.postgresql.PostgreSQLContainer; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +@Testcontainers +class PostgresDataFlowStoreTest extends DataFlowStoreTestBase { + + private static final String POSTGRES_IMAGE = "postgres:18.3"; + private static final String DATABASE = "dataplane"; + private static final String USERNAME = "user"; + private static final String PASSWORD = "password"; + + private final ObjectMapper mapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + private SqlDataFlowStore store; + + @Container + static PostgreSQLContainer postgres = new PostgreSQLContainer(POSTGRES_IMAGE) + .withDatabaseName(DATABASE) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withInitScript("sql/data_flow_schema.sql"); + + @BeforeAll + static void init() { + postgres.start(); + } + + @AfterAll + static void cleanUp() { + postgres.stop(); + postgres.close(); + } + + @BeforeEach + void initStore() { + store = new SqlDataFlowStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword(), new PostgresqlDataFlowStatements()); + } + + @Override + protected DataFlowStore store() { + return store; + } +} From 9800343c0b98d963e7917ccf1a14024798403106 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 13 May 2026 16:04:40 +0200 Subject: [PATCH 4/9] chore: license headers --- .../port/store/sql/AbstractSqlStore.java | 14 ++++++++++++++ .../port/store/sql/ControlPlaneStatements.java | 14 ++++++++++++++ .../port/store/sql/DataFlowStatements.java | 14 ++++++++++++++ .../sql/PostgresqlControlPlaneStatements.java | 14 ++++++++++++++ .../store/sql/PostgresqlDataFlowStatements.java | 14 ++++++++++++++ .../port/store/sql/SqlControlPlaneStore.java | 14 ++++++++++++++ .../port/store/sql/SqlDataFlowStore.java | 14 ++++++++++++++ src/main/resources/sql/control_plane_schema.sql | 15 +++++++++++++++ src/main/resources/sql/data_flow_schema.sql | 15 +++++++++++++++ .../store/ControlPlaneStoreTestBase.java | 14 ++++++++++++++ .../dataplane/store/DataFlowStoreTestBase.java | 14 ++++++++++++++ .../store/InMemoryControlPlaneStoreTest.java | 14 ++++++++++++++ .../store/InMemoryDataFlowStoreTest.java | 14 ++++++++++++++ .../store/sql/PostgresControlPlaneStoreTest.java | 14 ++++++++++++++ .../store/sql/PostgresDataFlowStoreTest.java | 14 ++++++++++++++ 15 files changed, 212 insertions(+) diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java index c7aaa34..c351d5d 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.store.sql; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java index f16d89a..8d368d1 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.store.sql; public interface ControlPlaneStatements { diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java index ced8df1..08edcc5 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.store.sql; public interface DataFlowStatements { diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java index 6464f38..fd78b3a 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.store.sql; public class PostgresqlControlPlaneStatements implements ControlPlaneStatements { diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java index 0091a75..b2bee0c 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.store.sql; public class PostgresqlDataFlowStatements implements DataFlowStatements { diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java index 8172211..ab183a8 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.store.sql; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java index 481b90a..2662456 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.store.sql; import com.fasterxml.jackson.core.type.TypeReference; diff --git a/src/main/resources/sql/control_plane_schema.sql b/src/main/resources/sql/control_plane_schema.sql index 1c5c9b3..60ef4d1 100644 --- a/src/main/resources/sql/control_plane_schema.sql +++ b/src/main/resources/sql/control_plane_schema.sql @@ -1,3 +1,18 @@ +-- +-- Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial script +-- + +-- THIS SCHEMA HAS BEEN WRITTEN AND TESTED ONLY FOR POSTGRES + CREATE TABLE IF NOT EXISTS control_planes ( id VARCHAR PRIMARY KEY, diff --git a/src/main/resources/sql/data_flow_schema.sql b/src/main/resources/sql/data_flow_schema.sql index 66c8b48..30469bb 100644 --- a/src/main/resources/sql/data_flow_schema.sql +++ b/src/main/resources/sql/data_flow_schema.sql @@ -1,3 +1,18 @@ +-- +-- Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial script +-- + +-- THIS SCHEMA HAS BEEN WRITTEN AND TESTED ONLY FOR POSTGRES + CREATE TABLE IF NOT EXISTS data_flows ( id VARCHAR PRIMARY KEY, diff --git a/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java index eb69c12..01c1449 100644 --- a/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java +++ b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.store; import org.eclipse.dataplane.domain.controlplane.ControlPlane; diff --git a/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java b/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java index 0c57cd1..99650ec 100644 --- a/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java +++ b/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.store; import org.eclipse.dataplane.domain.DataAddress; diff --git a/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java b/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java index f3fd6a3..3809c56 100644 --- a/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java +++ b/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.store; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java b/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java index fddbdd9..e88c103 100644 --- a/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java +++ b/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.store; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java index 168a329..8840be0 100644 --- a/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.store.sql; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java index 1268746..445a69d 100644 --- a/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.store.sql; import com.fasterxml.jackson.databind.ObjectMapper; From 10f4a6c11aa24918fa8f752e0c4f1a6d725e5c3b Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 20 May 2026 13:23:24 +0200 Subject: [PATCH 5/9] docs: Javadoc --- .../store/sql/ControlPlaneStatements.java | 40 +++++++++++++++++-- .../port/store/sql/DataFlowStatements.java | 24 ++++++++++- .../sql/PostgresqlControlPlaneStatements.java | 8 ++-- .../sql/PostgresqlDataFlowStatements.java | 4 +- .../port/store/sql/SqlControlPlaneStore.java | 8 ++-- .../port/store/sql/SqlDataFlowStore.java | 5 +-- 6 files changed, 70 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java index 8d368d1..58d91b2 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java @@ -14,14 +14,46 @@ package org.eclipse.dataplane.port.store.sql; +import org.eclipse.dataplane.domain.controlplane.ControlPlane; + +/** + * Provides templates for SQL statements for managing control planes. Used by the + * {@link SqlControlPlaneStore} within PreparedStatements. Can be implemented for different SQL + * dialects. + */ public interface ControlPlaneStatements { - String upsertTemplate(); + /** + * Provides the template for an upsert statement for control planes. The returned statement must + * contain placeholders for all properties of a control plane. + * + * @return the upsert statement template including the placeholders + */ + String upsertControlPlaneTemplate(); - String findByIdTemplate(); + /** + * Provides the template for a find-by-id statement for control planes.The returned statement + * must contain a placeholder for the control plane id. + * + * @return the find-by-id statement template including the placeholder + */ + String findControlPlaneByIdTemplate(); - String deleteByIdTemplate(); + /** + * Provides the template for a delete-by-id statement for control planes. The returned statement + * must contain a placeholder for the control plane id in the order shown in + * {@link SqlControlPlaneStore#save(ControlPlane)}. + * + * @return the delete-by-id statement template including the placeholder + */ + String deleteControlPlaneByIdTemplate(); - String countByIdTemplate(); + /** + * Provides the template for a count-by-id statement for control planes. The returned statement + * must contain a placeholder for the control plane id. + * + * @return the count-by-id statement template including the placeholder + */ + String countControlPlaneByIdTemplate(); } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java index 08edcc5..acfad03 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java @@ -14,10 +14,30 @@ package org.eclipse.dataplane.port.store.sql; +import org.eclipse.dataplane.domain.dataflow.DataFlow; + +/** + * Provides templates for SQL statements for managing data flows. Used by the + * {@link SqlDataFlowStore} within PreparedStatements. Can be implemented for different SQL + * dialects. + */ public interface DataFlowStatements { - String upsertTemplate(); + /** + * Provides the template for an upsert statement for data flows. The returned statement must + * contain placeholders for all properties of a data flow in the order shown in + * {@link SqlDataFlowStore#save(DataFlow)}. + * + * @return the upsert statement template including the placeholders + */ + String upsertDataFlowTemplate(); - String findByIdTemplate(); + /** + * Provides the template for a find-by-id statement for data flows.The returned statement must + * contain a placeholder for the data flow id. + * + * @return the find-by-id statement template including the placeholder + */ + String findDataFlowByIdTemplate(); } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java index fd78b3a..70b69a6 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java @@ -16,7 +16,7 @@ public class PostgresqlControlPlaneStatements implements ControlPlaneStatements { @Override - public String upsertTemplate() { + public String upsertControlPlaneTemplate() { return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" + " ON CONFLICT (id) DO UPDATE SET" + " endpoint = EXCLUDED.endpoint," @@ -24,17 +24,17 @@ public String upsertTemplate() { } @Override - public String findByIdTemplate() { + public String findControlPlaneByIdTemplate() { return "SELECT * FROM control_planes WHERE id = ?"; } @Override - public String deleteByIdTemplate() { + public String deleteControlPlaneByIdTemplate() { return "DELETE FROM control_planes WHERE id = ?"; } @Override - public String countByIdTemplate() { + public String countControlPlaneByIdTemplate() { return "SELECT COUNT(*) FROM control_planes WHERE id = ?"; } } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java index b2bee0c..0602b55 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java @@ -16,7 +16,7 @@ public class PostgresqlDataFlowStatements implements DataFlowStatements { @Override - public String upsertTemplate() { + public String upsertDataFlowTemplate() { return "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," + " labels, metadata, data_address, controlplane_id) VALUES" @@ -40,7 +40,7 @@ public String upsertTemplate() { } @Override - public String findByIdTemplate() { + public String findDataFlowByIdTemplate() { return "SELECT * FROM data_flows WHERE id = ?"; } } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java index ab183a8..ac111da 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java @@ -39,7 +39,7 @@ public SqlControlPlaneStore(ObjectMapper objectMapper, String databaseUrl, Strin public Result save(ControlPlane controlPlane) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.upsertTemplate())) { + try (var statement = connection.prepareStatement(statements.upsertControlPlaneTemplate())) { statement.setString(1, controlPlane.getId()); statement.setString(2, controlPlane.getEndpoint().toString()); statement.setString(3, objectMapper.writeValueAsString(controlPlane.getAuthorization())); @@ -57,7 +57,7 @@ public Result save(ControlPlane controlPlane) { public Result findById(String controlplaneId) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.findByIdTemplate())) { + try (var statement = connection.prepareStatement(statements.findControlPlaneByIdTemplate())) { statement.setString(1, controlplaneId); var resultSet = statement.executeQuery(); @@ -82,7 +82,7 @@ public Result findById(String controlplaneId) { public Result delete(String id) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.deleteByIdTemplate())) { + try (var statement = connection.prepareStatement(statements.deleteControlPlaneByIdTemplate())) { statement.setString(1, id); var rows = statement.executeUpdate(); if (rows < 1) { @@ -100,7 +100,7 @@ public Result delete(String id) { public boolean exists(String controlplaneId) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.countByIdTemplate())) { + try (var statement = connection.prepareStatement(statements.countControlPlaneByIdTemplate())) { statement.setString(1, controlplaneId); var resultSet = statement.executeQuery(); resultSet.next(); diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java index 2662456..a31c363 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java @@ -40,7 +40,7 @@ public SqlDataFlowStore(ObjectMapper objectMapper, String databaseUrl, String da public Result save(DataFlow dataFlow) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.upsertTemplate())) { + try (var statement = connection.prepareStatement(statements.upsertDataFlowTemplate())) { statement.setString(1, dataFlow.getId()); statement.setString(2, dataFlow.getTransferType()); statement.setString(3, dataFlow.getType().name()); @@ -58,7 +58,6 @@ public Result save(DataFlow dataFlow) { statement.setString(15, objectMapper.writeValueAsString(dataFlow.getDataAddress())); statement.setString(16, dataFlow.getControlplaneId()); - statement.executeUpdate(); return Result.success(); } catch (Exception e) { @@ -72,7 +71,7 @@ public Result save(DataFlow dataFlow) { public Result findById(String flowId) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.findByIdTemplate())) { + try (var statement = connection.prepareStatement(statements.findDataFlowByIdTemplate())) { statement.setString(1, flowId); var resultSet = statement.executeQuery(); From c9d18329496c8d5e78ddc30a7dfa4a1efc0c5e5f Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 20 May 2026 13:24:00 +0200 Subject: [PATCH 6/9] chore: add builder methods for stores on Dataplane --- src/main/java/org/eclipse/dataplane/Dataplane.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index a027ed8..05ec012 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -69,8 +69,8 @@ public class Dataplane { private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); - private final DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper); - private final ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper); + private DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper); + private ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper); private String id; private URI endpoint; private final Set transferTypes = new HashSet<>(); @@ -454,6 +454,16 @@ public Builder label(String label) { return this; } + public Builder dataFlowStore(DataFlowStore store) { + dataplane.dataFlowStore = store; + return this; + } + + public Builder controlPlaneStore(ControlPlaneStore store) { + dataplane.controlPlaneStore = store; + return this; + } + public Builder onPrepare(OnPrepare onPrepare) { dataplane.onPrepare = onPrepare; return this; From 3ae0c538cde8254b4deec29ce267e530990e92e2 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Fri, 22 May 2026 10:00:48 +0200 Subject: [PATCH 7/9] refactor: SQL stores --- .../port/exception/PersistenceException.java | 14 +++++++ .../port/store/sql/AbstractSqlStore.java | 39 +++++++++++++++++++ .../port/store/sql/DataFlowStatements.java | 3 +- .../port/store/sql/SqlControlPlaneStore.java | 4 +- .../port/store/sql/SqlDataFlowStore.java | 14 +++---- 5 files changed, 62 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java b/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java index 34e80a4..64156b0 100644 --- a/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java +++ b/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + package org.eclipse.dataplane.port.exception; /** diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java index c351d5d..dfb5e76 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java @@ -14,12 +14,19 @@ package org.eclipse.dataplane.port.store.sql; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.dataplane.port.exception.PersistenceException; import java.sql.Connection; import java.sql.DriverManager; +/** + * Base class for SQL-based store implementations that provides methods for common functionality + * like connection handling and JSON parsing. + */ public abstract class AbstractSqlStore { protected ObjectMapper objectMapper; @@ -50,4 +57,36 @@ protected void closeConnection(Connection connection) { throw new PersistenceException("Failed to commit transaction.", e); } } + + protected String toJson(Object object) { + if (object == null) { + return null; + } + + try { + return object instanceof String? object.toString() : objectMapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new PersistenceException("Failed to convert object to JSON.", e); + } + } + + protected T fromJson(String json, Class type) { + return fromJson(json, objectMapper.getTypeFactory().constructType(type)); + } + + protected T fromJson(String json, TypeReference type) { + return fromJson(json, objectMapper.getTypeFactory().constructType(type)); + } + + protected T fromJson(String json, JavaType type) { + if (json == null) { + return null; + } + + try { + return objectMapper.readValue(json, type); + } catch (JsonProcessingException e) { + throw new PersistenceException("Failed to convert JSON to object.", e); + } + } } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java index acfad03..33d7744 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java @@ -25,8 +25,7 @@ public interface DataFlowStatements { /** * Provides the template for an upsert statement for data flows. The returned statement must - * contain placeholders for all properties of a data flow in the order shown in - * {@link SqlDataFlowStore#save(DataFlow)}. + * contain placeholders for all properties of a data flow. * * @return the upsert statement template including the placeholders */ diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java index ac111da..d79a8b9 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java @@ -42,7 +42,7 @@ public Result save(ControlPlane controlPlane) { try (var statement = connection.prepareStatement(statements.upsertControlPlaneTemplate())) { statement.setString(1, controlPlane.getId()); statement.setString(2, controlPlane.getEndpoint().toString()); - statement.setString(3, objectMapper.writeValueAsString(controlPlane.getAuthorization())); + statement.setString(3, toJson(controlPlane.getAuthorization())); statement.executeUpdate(); return Result.success(); @@ -68,7 +68,7 @@ public Result findById(String controlplaneId) { var controlplane = ControlPlane.newInstance() .id(controlplaneId) .endpoint(URI.create(resultSet.getString("endpoint"))) - .authorization(objectMapper.readValue(resultSet.getString("auth"), AuthorizationProfile.class)) + .authorization(fromJson(resultSet.getString("auth"), AuthorizationProfile.class)) .build(); return Result.success(controlplane); } catch (Exception e) { diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java index a31c363..b5f89fe 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java @@ -53,9 +53,9 @@ public Result save(DataFlow dataFlow) { statement.setString(10, dataFlow.getCallbackAddress().toString()); statement.setString(11, dataFlow.getSuspensionReason()); statement.setString(12, dataFlow.getTerminationReason()); - statement.setString(13, objectMapper.writeValueAsString(dataFlow.getLabels())); - statement.setString(14, objectMapper.writeValueAsString(dataFlow.getMetadata())); - statement.setString(15, objectMapper.writeValueAsString(dataFlow.getDataAddress())); + statement.setString(13, toJson(dataFlow.getLabels())); + statement.setString(14, toJson(dataFlow.getMetadata())); + statement.setString(15, toJson(dataFlow.getDataAddress())); statement.setString(16, dataFlow.getControlplaneId()); statement.executeUpdate(); @@ -89,11 +89,9 @@ public Result findById(String flowId) { .counterPartyId(resultSet.getString("counter_party_id")) .dataspaceContext(resultSet.getString("dataspace_context")) .callbackAddress(URI.create(resultSet.getString("callback_address"))) - .labels(objectMapper.readValue(resultSet.getString("labels"), new TypeReference<>() { - })) - .metadata(objectMapper.readValue(resultSet.getString("metadata"), new TypeReference<>() { - })) - .dataAddress(objectMapper.readValue(resultSet.getString("data_address"), DataAddress.class)) + .labels(fromJson(resultSet.getString("labels"), new TypeReference<>() {})) + .metadata(fromJson(resultSet.getString("metadata"), new TypeReference<>() {})) + .dataAddress(fromJson(resultSet.getString("data_address"), DataAddress.class)) .controlplaneId(resultSet.getString("controlplane_id")) .type(DataFlow.Type.valueOf(resultSet.getString("type"))) .build(); From 91d041974ebef61eb73f23ec13854114f684fcfc Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Fri, 22 May 2026 11:21:34 +0200 Subject: [PATCH 8/9] chore: checkstyle --- .../port/store/sql/AbstractSqlStore.java | 2 +- .../port/store/sql/DataFlowStatements.java | 2 - .../sql/PostgresqlControlPlaneStatements.java | 8 ++-- .../sql/PostgresqlDataFlowStatements.java | 40 +++++++++---------- .../store/ControlPlaneStoreTestBase.java | 2 +- 5 files changed, 26 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java index dfb5e76..58c8bbc 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java @@ -64,7 +64,7 @@ protected String toJson(Object object) { } try { - return object instanceof String? object.toString() : objectMapper.writeValueAsString(object); + return object instanceof String ? object.toString() : objectMapper.writeValueAsString(object); } catch (JsonProcessingException e) { throw new PersistenceException("Failed to convert object to JSON.", e); } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java index 33d7744..9ba5b98 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java @@ -14,8 +14,6 @@ package org.eclipse.dataplane.port.store.sql; -import org.eclipse.dataplane.domain.dataflow.DataFlow; - /** * Provides templates for SQL statements for managing data flows. Used by the * {@link SqlDataFlowStore} within PreparedStatements. Can be implemented for different SQL diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java index 70b69a6..c53aae3 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java @@ -17,10 +17,10 @@ public class PostgresqlControlPlaneStatements implements ControlPlaneStatements { @Override public String upsertControlPlaneTemplate() { - return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" - + " ON CONFLICT (id) DO UPDATE SET" - + " endpoint = EXCLUDED.endpoint," - + " auth = EXCLUDED.auth"; + return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" + + " ON CONFLICT (id) DO UPDATE SET" + + " endpoint = EXCLUDED.endpoint," + + " auth = EXCLUDED.auth"; } @Override diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java index 0602b55..e342816 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java @@ -17,26 +17,26 @@ public class PostgresqlDataFlowStatements implements DataFlowStatements { @Override public String upsertDataFlowTemplate() { - return "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," - + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," - + " labels, metadata, data_address, controlplane_id) VALUES" - + " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" - + " ON CONFLICT (id) DO UPDATE SET" - + " transfer_type = EXCLUDED.transfer_type," - + " type = EXCLUDED.type," - + " state = EXCLUDED.state," - + " dataset_id = EXCLUDED.dataset_id," - + " agreement_id = EXCLUDED.agreement_id," - + " participant_id = EXCLUDED.participant_id," - + " counter_party_id = EXCLUDED.counter_party_id," - + " dataspace_context = EXCLUDED.dataspace_context," - + " callback_address = EXCLUDED.callback_address," - + " suspension_reason = EXCLUDED.suspension_reason," - + " termination_reason = EXCLUDED.termination_reason," - + " labels = EXCLUDED.labels," - + " metadata = EXCLUDED.metadata," - + " data_address = EXCLUDED.data_address," - + " controlplane_id = EXCLUDED.controlplane_id"; + return "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," + + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," + + " labels, metadata, data_address, controlplane_id) VALUES" + + " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" + + " ON CONFLICT (id) DO UPDATE SET" + + " transfer_type = EXCLUDED.transfer_type," + + " type = EXCLUDED.type," + + " state = EXCLUDED.state," + + " dataset_id = EXCLUDED.dataset_id," + + " agreement_id = EXCLUDED.agreement_id," + + " participant_id = EXCLUDED.participant_id," + + " counter_party_id = EXCLUDED.counter_party_id," + + " dataspace_context = EXCLUDED.dataspace_context," + + " callback_address = EXCLUDED.callback_address," + + " suspension_reason = EXCLUDED.suspension_reason," + + " termination_reason = EXCLUDED.termination_reason," + + " labels = EXCLUDED.labels," + + " metadata = EXCLUDED.metadata," + + " data_address = EXCLUDED.data_address," + + " controlplane_id = EXCLUDED.controlplane_id"; } @Override diff --git a/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java index 01c1449..94e8d57 100644 --- a/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java +++ b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java @@ -106,7 +106,7 @@ void delete_exists_shouldDelete() { @Test void delete_doesNotExist_shouldReturnNotFound() { - var deleteResult = store().delete("nonExistent"); + var deleteResult = store().delete("nonExistent"); assertThat(deleteResult.failed()).isTrue(); assertThat(deleteResult.getException()).isInstanceOf(ResourceNotFoundException.class); From d82d9bf47b0280b47c31d6c406ccff0bbe3686a0 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Tue, 26 May 2026 08:52:34 +0200 Subject: [PATCH 9/9] chore: PR remarks --- .../java/org/eclipse/dataplane/Dataplane.java | 11 ++-- .../dataplane/domain/dataflow/DataFlow.java | 10 ++++ .../eclipse/dataplane/port/store/Stores.java | 24 ++++++++ .../store/sql/ControlPlaneStatements.java | 59 ------------------- .../port/store/sql/DataFlowStatements.java | 40 ------------- ...re.java => PostgresControlPlaneStore.java} | 33 ++++++++--- ...wStore.java => PostgresDataFlowStore.java} | 46 +++++++++++---- .../sql/PostgresqlControlPlaneStatements.java | 40 ------------- .../sql/PostgresqlDataFlowStatements.java | 46 --------------- .../sql/PostgresControlPlaneStoreTest.java | 7 +-- .../store/sql/PostgresDataFlowStoreTest.java | 7 +-- 11 files changed, 101 insertions(+), 222 deletions(-) create mode 100644 src/main/java/org/eclipse/dataplane/port/store/Stores.java delete mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java delete mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java rename src/main/java/org/eclipse/dataplane/port/store/sql/{SqlControlPlaneStore.java => PostgresControlPlaneStore.java} (75%) rename src/main/java/org/eclipse/dataplane/port/store/sql/{SqlDataFlowStore.java => PostgresDataFlowStore.java} (68%) delete mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java delete mode 100644 src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 05ec012..94a3185 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -51,6 +51,7 @@ import org.eclipse.dataplane.port.store.DataFlowStore; import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore; import org.eclipse.dataplane.port.store.InMemoryDataFlowStore; +import org.eclipse.dataplane.port.store.Stores; import java.net.URI; import java.net.http.HttpClient; @@ -454,13 +455,9 @@ public Builder label(String label) { return this; } - public Builder dataFlowStore(DataFlowStore store) { - dataplane.dataFlowStore = store; - return this; - } - - public Builder controlPlaneStore(ControlPlaneStore store) { - dataplane.controlPlaneStore = store; + public Builder stores(Stores stores) { + dataplane.dataFlowStore = stores.dataFlowStore(); + dataplane.controlPlaneStore = stores.controlPlaneStore(); return this; } diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java index c5efead..b1b8f36 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java @@ -243,6 +243,16 @@ public Builder callbackAddress(URI callbackAddress) { return this; } + public Builder suspensionReason(String suspensionReason) { + dataFlow.suspensionReason = suspensionReason; + return this; + } + + public Builder terminationReason(String terminationReason) { + dataFlow.terminationReason = terminationReason; + return this; + } + public Builder metadata(Map metadata) { dataFlow.metadata = metadata; return this; diff --git a/src/main/java/org/eclipse/dataplane/port/store/Stores.java b/src/main/java/org/eclipse/dataplane/port/store/Stores.java new file mode 100644 index 0000000..8bf2693 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/Stores.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.store; + +/** + * Data class that bundles the stores used by the dataplane. + * + * @param dataFlowStore store for data flows + * @param controlPlaneStore store for control planes + */ +public record Stores(DataFlowStore dataFlowStore, ControlPlaneStore controlPlaneStore) { +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java deleted file mode 100644 index 58d91b2..0000000 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/ControlPlaneStatements.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation - * - */ - -package org.eclipse.dataplane.port.store.sql; - -import org.eclipse.dataplane.domain.controlplane.ControlPlane; - -/** - * Provides templates for SQL statements for managing control planes. Used by the - * {@link SqlControlPlaneStore} within PreparedStatements. Can be implemented for different SQL - * dialects. - */ -public interface ControlPlaneStatements { - - /** - * Provides the template for an upsert statement for control planes. The returned statement must - * contain placeholders for all properties of a control plane. - * - * @return the upsert statement template including the placeholders - */ - String upsertControlPlaneTemplate(); - - /** - * Provides the template for a find-by-id statement for control planes.The returned statement - * must contain a placeholder for the control plane id. - * - * @return the find-by-id statement template including the placeholder - */ - String findControlPlaneByIdTemplate(); - - /** - * Provides the template for a delete-by-id statement for control planes. The returned statement - * must contain a placeholder for the control plane id in the order shown in - * {@link SqlControlPlaneStore#save(ControlPlane)}. - * - * @return the delete-by-id statement template including the placeholder - */ - String deleteControlPlaneByIdTemplate(); - - /** - * Provides the template for a count-by-id statement for control planes. The returned statement - * must contain a placeholder for the control plane id. - * - * @return the count-by-id statement template including the placeholder - */ - String countControlPlaneByIdTemplate(); - -} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java deleted file mode 100644 index 9ba5b98..0000000 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/DataFlowStatements.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation - * - */ - -package org.eclipse.dataplane.port.store.sql; - -/** - * Provides templates for SQL statements for managing data flows. Used by the - * {@link SqlDataFlowStore} within PreparedStatements. Can be implemented for different SQL - * dialects. - */ -public interface DataFlowStatements { - - /** - * Provides the template for an upsert statement for data flows. The returned statement must - * contain placeholders for all properties of a data flow. - * - * @return the upsert statement template including the placeholders - */ - String upsertDataFlowTemplate(); - - /** - * Provides the template for a find-by-id statement for data flows.The returned statement must - * contain a placeholder for the data flow id. - * - * @return the find-by-id statement template including the placeholder - */ - String findDataFlowByIdTemplate(); - -} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresControlPlaneStore.java similarity index 75% rename from src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java rename to src/main/java/org/eclipse/dataplane/port/store/sql/PostgresControlPlaneStore.java index d79a8b9..026bdec 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresControlPlaneStore.java @@ -26,20 +26,17 @@ import static java.lang.String.format; -public class SqlControlPlaneStore extends AbstractSqlStore implements ControlPlaneStore { +public class PostgresControlPlaneStore extends AbstractSqlStore implements ControlPlaneStore { - private final ControlPlaneStatements statements; - - public SqlControlPlaneStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword, ControlPlaneStatements statements) { + public PostgresControlPlaneStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) { super(objectMapper, databaseUrl, databaseUsername, databasePassword); - this.statements = statements; } @Override public Result save(ControlPlane controlPlane) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.upsertControlPlaneTemplate())) { + try (var statement = connection.prepareStatement(upsertControlPlaneTemplate())) { statement.setString(1, controlPlane.getId()); statement.setString(2, controlPlane.getEndpoint().toString()); statement.setString(3, toJson(controlPlane.getAuthorization())); @@ -57,7 +54,7 @@ public Result save(ControlPlane controlPlane) { public Result findById(String controlplaneId) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.findControlPlaneByIdTemplate())) { + try (var statement = connection.prepareStatement(findControlPlaneByIdTemplate())) { statement.setString(1, controlplaneId); var resultSet = statement.executeQuery(); @@ -82,7 +79,7 @@ public Result findById(String controlplaneId) { public Result delete(String id) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.deleteControlPlaneByIdTemplate())) { + try (var statement = connection.prepareStatement(deleteControlPlaneByIdTemplate())) { statement.setString(1, id); var rows = statement.executeUpdate(); if (rows < 1) { @@ -100,7 +97,7 @@ public Result delete(String id) { public boolean exists(String controlplaneId) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.countControlPlaneByIdTemplate())) { + try (var statement = connection.prepareStatement(countControlPlaneByIdTemplate())) { statement.setString(1, controlplaneId); var resultSet = statement.executeQuery(); resultSet.next(); @@ -112,4 +109,22 @@ public boolean exists(String controlplaneId) { } } + private String upsertControlPlaneTemplate() { + return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" + + " ON CONFLICT (id) DO UPDATE SET" + + " endpoint = EXCLUDED.endpoint," + + " auth = EXCLUDED.auth"; + } + + private String findControlPlaneByIdTemplate() { + return "SELECT * FROM control_planes WHERE id = ?"; + } + + private String deleteControlPlaneByIdTemplate() { + return "DELETE FROM control_planes WHERE id = ?"; + } + + private String countControlPlaneByIdTemplate() { + return "SELECT COUNT(*) FROM control_planes WHERE id = ?"; + } } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresDataFlowStore.java similarity index 68% rename from src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java rename to src/main/java/org/eclipse/dataplane/port/store/sql/PostgresDataFlowStore.java index b5f89fe..f49b81f 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/SqlDataFlowStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresDataFlowStore.java @@ -27,20 +27,17 @@ import static java.lang.String.format; -public class SqlDataFlowStore extends AbstractSqlStore implements DataFlowStore { +public class PostgresDataFlowStore extends AbstractSqlStore implements DataFlowStore { - private final DataFlowStatements statements; - - public SqlDataFlowStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword, DataFlowStatements statements) { + public PostgresDataFlowStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) { super(objectMapper, databaseUrl, databaseUsername, databasePassword); - this.statements = statements; } @Override public Result save(DataFlow dataFlow) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.upsertDataFlowTemplate())) { + try (var statement = connection.prepareStatement(upsertDataFlowTemplate())) { statement.setString(1, dataFlow.getId()); statement.setString(2, dataFlow.getTransferType()); statement.setString(3, dataFlow.getType().name()); @@ -71,7 +68,7 @@ public Result save(DataFlow dataFlow) { public Result findById(String flowId) { var connection = getConnection(); - try (var statement = connection.prepareStatement(statements.findDataFlowByIdTemplate())) { + try (var statement = connection.prepareStatement(findDataFlowByIdTemplate())) { statement.setString(1, flowId); var resultSet = statement.executeQuery(); @@ -89,6 +86,8 @@ public Result findById(String flowId) { .counterPartyId(resultSet.getString("counter_party_id")) .dataspaceContext(resultSet.getString("dataspace_context")) .callbackAddress(URI.create(resultSet.getString("callback_address"))) + .suspensionReason(resultSet.getString("suspension_reason")) + .terminationReason(resultSet.getString("termination_reason")) .labels(fromJson(resultSet.getString("labels"), new TypeReference<>() {})) .metadata(fromJson(resultSet.getString("metadata"), new TypeReference<>() {})) .dataAddress(fromJson(resultSet.getString("data_address"), DataAddress.class)) @@ -96,12 +95,6 @@ public Result findById(String flowId) { .type(DataFlow.Type.valueOf(resultSet.getString("type"))) .build(); - if (dataFlow.getState() == DataFlow.State.SUSPENDED) { - dataFlow.transitionToSuspended(resultSet.getString("suspension_reason")); - } else if (dataFlow.getState() == DataFlow.State.TERMINATED) { - dataFlow.transitionToTerminated(resultSet.getString("termination_reason")); - } - return Result.success(dataFlow); } catch (Exception e) { return Result.failure(new PersistenceException(format("Failed to read DataFlow with id %s.", flowId), e)); @@ -109,4 +102,31 @@ public Result findById(String flowId) { closeConnection(connection); } } + + private String upsertDataFlowTemplate() { + return "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," + + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," + + " labels, metadata, data_address, controlplane_id) VALUES" + + " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" + + " ON CONFLICT (id) DO UPDATE SET" + + " transfer_type = EXCLUDED.transfer_type," + + " type = EXCLUDED.type," + + " state = EXCLUDED.state," + + " dataset_id = EXCLUDED.dataset_id," + + " agreement_id = EXCLUDED.agreement_id," + + " participant_id = EXCLUDED.participant_id," + + " counter_party_id = EXCLUDED.counter_party_id," + + " dataspace_context = EXCLUDED.dataspace_context," + + " callback_address = EXCLUDED.callback_address," + + " suspension_reason = EXCLUDED.suspension_reason," + + " termination_reason = EXCLUDED.termination_reason," + + " labels = EXCLUDED.labels," + + " metadata = EXCLUDED.metadata," + + " data_address = EXCLUDED.data_address," + + " controlplane_id = EXCLUDED.controlplane_id"; + } + + private String findDataFlowByIdTemplate() { + return "SELECT * FROM data_flows WHERE id = ?"; + } } diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java deleted file mode 100644 index c53aae3..0000000 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlControlPlaneStatements.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation - * - */ - -package org.eclipse.dataplane.port.store.sql; - -public class PostgresqlControlPlaneStatements implements ControlPlaneStatements { - @Override - public String upsertControlPlaneTemplate() { - return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" + - " ON CONFLICT (id) DO UPDATE SET" + - " endpoint = EXCLUDED.endpoint," + - " auth = EXCLUDED.auth"; - } - - @Override - public String findControlPlaneByIdTemplate() { - return "SELECT * FROM control_planes WHERE id = ?"; - } - - @Override - public String deleteControlPlaneByIdTemplate() { - return "DELETE FROM control_planes WHERE id = ?"; - } - - @Override - public String countControlPlaneByIdTemplate() { - return "SELECT COUNT(*) FROM control_planes WHERE id = ?"; - } -} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java deleted file mode 100644 index e342816..0000000 --- a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresqlDataFlowStatements.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation - * - */ - -package org.eclipse.dataplane.port.store.sql; - -public class PostgresqlDataFlowStatements implements DataFlowStatements { - @Override - public String upsertDataFlowTemplate() { - return "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," + - " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," + - " labels, metadata, data_address, controlplane_id) VALUES" + - " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" + - " ON CONFLICT (id) DO UPDATE SET" + - " transfer_type = EXCLUDED.transfer_type," + - " type = EXCLUDED.type," + - " state = EXCLUDED.state," + - " dataset_id = EXCLUDED.dataset_id," + - " agreement_id = EXCLUDED.agreement_id," + - " participant_id = EXCLUDED.participant_id," + - " counter_party_id = EXCLUDED.counter_party_id," + - " dataspace_context = EXCLUDED.dataspace_context," + - " callback_address = EXCLUDED.callback_address," + - " suspension_reason = EXCLUDED.suspension_reason," + - " termination_reason = EXCLUDED.termination_reason," + - " labels = EXCLUDED.labels," + - " metadata = EXCLUDED.metadata," + - " data_address = EXCLUDED.data_address," + - " controlplane_id = EXCLUDED.controlplane_id"; - } - - @Override - public String findDataFlowByIdTemplate() { - return "SELECT * FROM data_flows WHERE id = ?"; - } -} diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java index 8840be0..a7ef3c0 100644 --- a/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java @@ -16,8 +16,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.dataplane.port.store.ControlPlaneStore; -import org.eclipse.dataplane.port.store.sql.PostgresqlControlPlaneStatements; -import org.eclipse.dataplane.port.store.sql.SqlControlPlaneStore; +import org.eclipse.dataplane.port.store.sql.PostgresControlPlaneStore; import org.eclipse.dataplane.store.ControlPlaneStoreTestBase; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -37,7 +36,7 @@ class PostgresControlPlaneStoreTest extends ControlPlaneStoreTestBase { private static final String PASSWORD = "password"; private final ObjectMapper mapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); - private SqlControlPlaneStore store; + private PostgresControlPlaneStore store; @Container static PostgreSQLContainer postgres = new PostgreSQLContainer(POSTGRES_IMAGE) @@ -59,7 +58,7 @@ static void cleanUp() { @BeforeEach void initStore() { - store = new SqlControlPlaneStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword(), new PostgresqlControlPlaneStatements()); + store = new PostgresControlPlaneStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); } @Override diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java index 445a69d..e5c060d 100644 --- a/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java @@ -16,8 +16,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.dataplane.port.store.DataFlowStore; -import org.eclipse.dataplane.port.store.sql.PostgresqlDataFlowStatements; -import org.eclipse.dataplane.port.store.sql.SqlDataFlowStore; +import org.eclipse.dataplane.port.store.sql.PostgresDataFlowStore; import org.eclipse.dataplane.store.DataFlowStoreTestBase; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -37,7 +36,7 @@ class PostgresDataFlowStoreTest extends DataFlowStoreTestBase { private static final String PASSWORD = "password"; private final ObjectMapper mapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); - private SqlDataFlowStore store; + private PostgresDataFlowStore store; @Container static PostgreSQLContainer postgres = new PostgreSQLContainer(POSTGRES_IMAGE) @@ -59,7 +58,7 @@ static void cleanUp() { @BeforeEach void initStore() { - store = new SqlDataFlowStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword(), new PostgresqlDataFlowStatements()); + store = new PostgresDataFlowStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); } @Override