diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java index dc6ae4f37a194..284b77004beac 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java @@ -127,6 +127,10 @@ public Map getAllTTL() { return ((ShowTTLResp) showTTL(new ShowTTLPlan())).getPathTTLMap(); } + public long getTTL(final String[] pathPattern) { + return ttlInfo.getTTL(pathPattern); + } + public int getTTLCount() { return ttlInfo.getTTLCount(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java index 7b98ebba50b87..4921b03851c0d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java @@ -70,13 +70,15 @@ public TSStatus setTTL(SetTTLPlan plan) { try { // check ttl rule capacity final int tTlRuleCapacity = CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity(); - if (getTTLCount() >= tTlRuleCapacity) { + final int newTTLRuleCount = calculateNewTTLRuleCount(plan); + final int requestedTTLRuleCount = ttlCache.getTtlCount() + newTTLRuleCount; + if (newTTLRuleCount > 0 && requestedTTLRuleCount > tTlRuleCapacity) { TSStatus errorStatus = new TSStatus(TSStatusCode.OVERSIZE_TTL.getStatusCode()); errorStatus.setMessage( String.format( - "The number of TTL rules has reached the limit (%d). Please delete " - + "some existing rules first.", - tTlRuleCapacity)); + "The number of TTL rules has reached the limit " + + "(capacity: %d, requested total: %d). Please delete some existing rules first.", + tTlRuleCapacity, requestedTTLRuleCount)); return errorStatus; } ttlCache.setTTL(plan.getPathPattern(), plan.getTTL()); @@ -92,6 +94,20 @@ public TSStatus setTTL(SetTTLPlan plan) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } + private int calculateNewTTLRuleCount(SetTTLPlan plan) { + int newTTLRuleCount = isNewTTLRule(plan.getPathPattern()) ? 1 : 0; + if (plan.isDataBase()) { + String[] pathNodes = Arrays.copyOf(plan.getPathPattern(), plan.getPathPattern().length + 1); + pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; + newTTLRuleCount += isNewTTLRule(pathNodes) ? 1 : 0; + } + return newTTLRuleCount; + } + + private boolean isNewTTLRule(String[] pathNodes) { + return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL; + } + /** Only used for upgrading from database level ttl to device level ttl. */ public void setTTL(Map databaseTTLMap) throws IllegalPathException { lock.writeLock().lock(); @@ -159,6 +175,15 @@ public int getTTLCount() { } } + public long getTTL(final String[] pathPattern) { + lock.readLock().lock(); + try { + return ttlCache.getLastNodeTTL(pathPattern); + } finally { + lock.readLock().unlock(); + } + } + /** * Get the maximum ttl of the corresponding database level. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java index b90f2df87d5c3..dca79a02366f6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java @@ -22,8 +22,10 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.ttl.TTLCache; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; @@ -47,14 +49,21 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Objects; public class SetTTLProcedure extends StateMachineProcedure { private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class); + // Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback. + private static final long TTL_NOT_EXIST = Long.MIN_VALUE; + private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2; private SetTTLPlan plan; + private long previousTTL = TTL_NOT_EXIST; + private long previousDatabaseWildcardTTL = TTL_NOT_EXIST; + private boolean previousTTLStateCaptured = false; public SetTTLProcedure(final boolean isGeneratedByPipe) { super(isGeneratedByPipe); @@ -71,6 +80,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state) long startTime = System.currentTimeMillis(); try { switch (state) { + case CAPTURE_PREVIOUS_TTL: + capturePreviousTTLState(env); + setNextState(SetTTLState.SET_CONFIGNODE_TTL); + return Flow.HAS_MORE_STATE; case SET_CONFIGNODE_TTL: setConfigNodeTTL(env); return Flow.HAS_MORE_STATE; @@ -86,18 +99,13 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state) } } - private void setConfigNodeTTL(ConfigNodeProcedureEnv env) { - TSStatus res; - try { - res = - env.getConfigManager() - .getConsensusManager() - .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : this.plan); - } catch (ConsensusException e) { - LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); - res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(e.getMessage()); + void setConfigNodeTTL(final ConfigNodeProcedureEnv env) { + if (!previousTTLStateCaptured) { + capturePreviousTTLState(env); + setNextState(SetTTLState.SET_CONFIGNODE_TTL); + return; } + final TSStatus res = writeConfigNodePlan(env, plan); if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, res.message); setFailure(new ProcedureException(new IoTDBException(res))); @@ -106,35 +114,177 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv env) { } } - private void updateDataNodeTTL(ConfigNodeProcedureEnv env) { - Map dataNodeLocationMap = + void updateDataNodeTTL(final ConfigNodeProcedureEnv env) { + final Map dataNodeLocationMap = env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); - DataNodeAsyncRequestContext clientHandler = - new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.SET_TTL, - new TSetTTLReq( - Collections.singletonList(String.join(".", plan.getPathPattern())), - plan.getTTL(), - plan.isDataBase()), - dataNodeLocationMap); + final DataNodeAsyncRequestContext clientHandler = + sendTTLRequest( + dataNodeLocationMap, + buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), plan.isDataBase())); + if (hasFailedDataNode(clientHandler)) { + LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); + setFailure( + new ProcedureException( + new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); + } + } + + private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) { + if (previousTTLStateCaptured) { + return; + } + previousTTL = getTTLOrDefault(env, plan.getPathPattern()); + if (plan.isDataBase()) { + previousDatabaseWildcardTTL = + getTTLOrDefault(env, getDatabaseWildcardPathPattern(plan.getPathPattern())); + } + previousTTLStateCaptured = true; + } + + TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { + try { + return env.getConfigManager() + .getConsensusManager() + .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : setTTLPlan); + } catch (ConsensusException e) { + LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; + } + } + + DataNodeAsyncRequestContext sendTTLRequest( + final Map dataNodeLocationMap, final TSetTTLReq req) { + final DataNodeAsyncRequestContext clientHandler = + new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, dataNodeLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); - Map statusMap = clientHandler.getResponseMap(); - for (TSStatus status : statusMap.values()) { - // all dataNodes must clear the related schemaengine cache + return clientHandler; + } + + private TSetTTLReq buildSetTTLReq( + final String[] pathPattern, final long ttl, final boolean isDataBase) { + return new TSetTTLReq( + Collections.singletonList(String.join(".", pathPattern)), ttl, isDataBase); + } + + private boolean hasFailedDataNode( + final DataNodeAsyncRequestContext clientHandler) { + if (!clientHandler.getRequestIndices().isEmpty()) { + return true; + } + for (TSStatus status : clientHandler.getResponseMap().values()) { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); - setFailure( - new ProcedureException( - new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); - return; + return true; } } + return false; } + private long getTTLOrDefault(final ConfigNodeProcedureEnv env, final String[] pathPattern) { + final long ttl = env.getConfigManager().getTTLManager().getTTL(pathPattern); + return ttl == TTLCache.NULL_TTL ? TTL_NOT_EXIST : ttl; + } + + private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) { + final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 1); + pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; + return pathNodes; + } + + private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { + restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL); + if (plan.isDataBase()) { + restoreTTLOnConfigNode( + env, getDatabaseWildcardPathPattern(plan.getPathPattern()), previousDatabaseWildcardTTL); + } + } + + private void restoreTTLOnConfigNode( + final ConfigNodeProcedureEnv env, final String[] pathPattern, final long ttl) + throws ProcedureException { + // TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the executor to unset it. + final SetTTLPlan rollbackPlan = + new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl); + // Database rollback restores the database path and db.** separately, so avoid auto-expansion. + rollbackPlan.setDataBase(false); + final TSStatus status = writeConfigNodePlan(env, rollbackPlan); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new ProcedureException( + new MetadataException( + "Rollback ConfigNode ttl failed for " + + String.join(".", pathPattern) + + ": " + + status.getMessage())); + } + } + + private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { + final Map dataNodeLocationMap = + env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); + restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), previousTTL); + if (plan.isDataBase()) { + restoreTTLOnDataNodes( + dataNodeLocationMap, + getDatabaseWildcardPathPattern(plan.getPathPattern()), + previousDatabaseWildcardTTL); + } + } + + private void restoreTTLOnDataNodes( + final Map dataNodeLocationMap, + final String[] pathPattern, + final long ttl) + throws ProcedureException { + if (dataNodeLocationMap.isEmpty()) { + return; + } + final DataNodeAsyncRequestContext clientHandler = + sendTTLRequest( + dataNodeLocationMap, + buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl, false)); + if (hasFailedDataNode(clientHandler)) { + throw new ProcedureException( + new MetadataException( + "Rollback dataNode ttl cache failed for " + String.join(".", pathPattern))); + } + } + + /** + * Best-effort rollback: restore both sides, throw the earliest failure, and suppress later ones. + */ @Override - protected void rollbackState( - ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState) - throws IOException, InterruptedException, ProcedureException {} + protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState setTTLState) + throws IOException, InterruptedException, ProcedureException { + if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || !previousTTLStateCaptured) { + return; + } + ProcedureException rollbackFailure = null; + try { + rollbackConfigNodeTTL(env); + } catch (ProcedureException e) { + LOGGER.error("Failed to rollback ConfigNode ttl state.", e); + rollbackFailure = e; + } + try { + rollbackDataNodeTTL(env); + } catch (ProcedureException e) { + LOGGER.error("Failed to rollback DataNode ttl cache.", e); + if (rollbackFailure == null) { + rollbackFailure = e; + } else { + rollbackFailure.addSuppressed(e); + } + } + if (rollbackFailure != null) { + throw rollbackFailure; + } + } + + @Override + protected boolean isRollbackSupported(final SetTTLState state) { + return state == SetTTLState.UPDATE_DATANODE_CACHE; + } @Override protected SetTTLState getState(int stateId) { @@ -148,7 +298,7 @@ protected int getStateId(SetTTLState setTTLState) { @Override protected SetTTLState getInitialState() { - return SetTTLState.SET_CONFIGNODE_TTL; + return SetTTLState.CAPTURE_PREVIOUS_TTL; } @Override @@ -159,14 +309,25 @@ public void serialize(DataOutputStream stream) throws IOException { : ProcedureType.SET_TTL_PROCEDURE.getTypeCode()); super.serialize(stream); ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream); + stream.writeBoolean(previousTTLStateCaptured); + stream.writeLong(previousTTL); + stream.writeLong(previousDatabaseWildcardTTL); } @Override public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); try { - ReadWriteIOUtils.readInt(byteBuffer); + final int length = ReadWriteIOUtils.readInt(byteBuffer); + final int position = byteBuffer.position(); this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer); + // The serialized plan buffer may include padding; skip to the actual payload end. + byteBuffer.position(position + length); + if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) { + this.previousTTLStateCaptured = byteBuffer.get() != 0; + this.previousTTL = byteBuffer.getLong(); + this.previousDatabaseWildcardTTL = byteBuffer.getLong(); + } } catch (IOException e) { LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e); } @@ -180,12 +341,21 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - return this.plan.equals(((SetTTLProcedure) o).plan) - && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe); + final SetTTLProcedure that = (SetTTLProcedure) o; + return this.isGeneratedByPipe == that.isGeneratedByPipe + && this.previousTTL == that.previousTTL + && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL + && this.previousTTLStateCaptured == that.previousTTLStateCaptured + && this.plan.equals(that.plan); } @Override public int hashCode() { - return Objects.hash(plan, isGeneratedByPipe); + return Objects.hash( + plan, + isGeneratedByPipe, + previousTTL, + previousDatabaseWildcardTTL, + previousTTLStateCaptured); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java index fbdc026fc709b..4dd3063ea3f62 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java @@ -19,6 +19,8 @@ package org.apache.iotdb.confignode.procedure.state.schema; public enum SetTTLState { + // Keep existing state ordinals stable for persisted procedures. SET_CONFIGNODE_TTL, - UPDATE_DATANODE_CACHE + UPDATE_DATANODE_CACHE, + CAPTURE_PREVIOUS_TTL } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java index 42a23d35cb9f4..e424671fb3a54 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.ttl.TTLCache; import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan; import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; import org.apache.iotdb.confignode.consensus.response.ttl.ShowTTLResp; @@ -50,6 +51,7 @@ public class TTLInfoTest { private final File snapshotDir = new File(BASE_OUTPUT_PATH, "ttlInfo-snapshot"); private final long ttl = 123435565323L; private long[] originTTLArr; + private int originTTlRuleCapacity; @Before public void setup() throws IOException { @@ -57,6 +59,7 @@ public void setup() throws IOException { snapshotDir.mkdirs(); } originTTLArr = CommonDescriptor.getInstance().getConfig().getTierTTLInMs(); + originTTlRuleCapacity = CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity(); long[] ttlArr = new long[2]; ttlArr[0] = 10000000L; ttlArr[1] = ttl; @@ -70,6 +73,7 @@ public void tearDown() throws IOException { FileUtils.deleteDirectory(snapshotDir); } CommonDescriptor.getInstance().getConfig().setTierTTLInMs(originTTLArr); + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(originTTlRuleCapacity); } @Test @@ -208,6 +212,17 @@ public void testSetAndUnsetTTL() throws IllegalPathException { Assert.assertEquals(4, ttlInfo.getTTLCount()); } + @Test + public void testGetTTLReturnsExactPathTTL() throws IllegalPathException { + PartialPath path = new PartialPath("root.test.db1.**"); + ttlInfo.setTTL(new SetTTLPlan(Arrays.asList(path.getNodes()), 121322323L)); + + Assert.assertEquals(121322323L, ttlInfo.getTTL(path.getNodes())); + Assert.assertEquals( + TTLCache.NULL_TTL, ttlInfo.getTTL(new PartialPath("root.test.db1").getNodes())); + Assert.assertEquals(Long.MAX_VALUE, ttlInfo.getTTL(new PartialPath("root.**").getNodes())); + } + @Test public void testUnsetNonExistTTL() throws IllegalPathException { assertEquals( @@ -241,10 +256,61 @@ public void testTooManyTTL() { final TSStatus status = ttlInfo.setTTL(setTTLPlan); assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code); assertEquals( - "The number of TTL rules has reached the limit (1000). Please delete some existing rules first.", + "The number of TTL rules has reached the limit " + + "(capacity: 1000, requested total: 1001). Please delete some existing rules first.", status.message); } + @Test + public void testUpdateExistingTTLWhenCapacityIsReached() { + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2); + + SetTTLPlan setTTLPlan = + new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 1000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + + setTTLPlan = new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + assertEquals( + Long.valueOf(2000), + ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**")); + } + + @Test + public void testUpdateExistingTTLWhenCurrentStateIsAlreadyOversize() { + SetTTLPlan setTTLPlan = + new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 1000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(1); + + setTTLPlan = new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + assertEquals( + Long.valueOf(2000), + ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**")); + } + + @Test + public void testDatabaseTTLShouldReserveTwoSlots() { + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2); + + SetTTLPlan setTTLPlan = new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1"), 1000); + setTTLPlan.setDataBase(true); + + final TSStatus status = ttlInfo.setTTL(setTTLPlan); + assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code); + assertEquals(1, ttlInfo.getTTLCount()); + assertEquals(1, ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().size()); + assertEquals( + Long.valueOf(Long.MAX_VALUE), + ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.**")); + } + @Test public void testSnapshot() throws TException, IOException, IllegalPathException { // set ttl diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java index 5042eb1dd0f13..cb09c23659c39 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java @@ -19,19 +19,42 @@ package org.apache.iotdb.confignode.procedure.impl.schema; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.ttl.TTLCache; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; +import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.TTLManager; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.Procedure; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.state.ProcedureState; +import org.apache.iotdb.confignode.procedure.state.schema.SetTTLState; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.io.DataOutputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class SetTTLProcedureTest { @@ -65,4 +88,308 @@ public void serializeDeserializeTest() throws IOException, IllegalPathException buffer.clear(); byteArrayOutputStream.reset(); } + + @Test + public void serializeDeserializeTestWithCapturedRollbackState() throws Exception { + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + + final Map ttlMap = new HashMap<>(); + ttlMap.put("root.**", Long.MAX_VALUE); + ttlMap.put("root.db", 500L); + ttlMap.put("root.db.**", 600L); + + procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.CAPTURE_PREVIOUS_TTL); + + procedure.serialize(outputStream); + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + final SetTTLProcedure deserializedProcedure = + (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer); + assertSerializedProcedure( + deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L, false); + } + + @Test + public void deserializeOldFormatWithoutRollbackStateTest() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + writeOldFormatProcedure(outputStream, setTTLPlan); + + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + final SetTTLProcedure deserializedProcedure = + (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer); + + assertSerializedProcedure( + deserializedProcedure, + "root.db", + 2000L, + true, + false, + Long.MIN_VALUE, + Long.MIN_VALUE, + false); + } + + @Test + public void setConfigNodeTTLShouldNotWriteBeforePreviousStateIsCaptured() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + + final Map ttlMap = new HashMap<>(); + ttlMap.put("root.**", Long.MAX_VALUE); + ttlMap.put("root.db", 500L); + ttlMap.put("root.db.**", 600L); + + procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.SET_CONFIGNODE_TTL); + + Assert.assertTrue(procedure.getWrittenPlans().isEmpty()); + assertSerializedProcedure(procedure, "root.db", 2000L, true, true, 500L, 600L, false); + + procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.SET_CONFIGNODE_TTL); + + Assert.assertEquals(1, procedure.getWrittenPlans().size()); + assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true); + } + + @Test + public void rollbackStateShouldUnsetNewTTLWhenPreviousStateDidNotExist() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.test.sg1.**").getNodes()), 1000L); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + procedure.failFirstDataNodeUpdateForTest(); + + final ConfigNodeProcedureEnv env = + mockProcedureEnv(Collections.singletonMap("root.**", Long.MAX_VALUE)); + + procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL); + procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL); + procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE); + Assert.assertTrue(procedure.isFailed()); + + procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE); + + Assert.assertEquals(2, procedure.getWrittenPlans().size()); + assertPlan(procedure.getWrittenPlans().get(0), "root.test.sg1.**", 1000L, false); + assertPlan(procedure.getWrittenPlans().get(1), "root.test.sg1.**", -1L, false); + + Assert.assertEquals(2, procedure.getRequests().size()); + assertRequest(procedure.getRequests().get(0), "root.test.sg1.**", 1000L, false); + assertRequest(procedure.getRequests().get(1), "root.test.sg1.**", -1L, false); + } + + @Test + public void rollbackStateShouldRestoreDatabaseWildcardTTLSeparately() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + procedure.failFirstDataNodeUpdateForTest(); + + final Map ttlMap = new HashMap<>(); + ttlMap.put("root.**", Long.MAX_VALUE); + ttlMap.put("root.db", 500L); + ttlMap.put("root.db.**", 600L); + final ConfigNodeProcedureEnv env = mockProcedureEnv(ttlMap); + + procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL); + procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL); + procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE); + Assert.assertTrue(procedure.isFailed()); + + procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE); + + Assert.assertEquals(3, procedure.getWrittenPlans().size()); + assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true); + assertPlan(procedure.getWrittenPlans().get(1), "root.db", 500L, false); + assertPlan(procedure.getWrittenPlans().get(2), "root.db.**", 600L, false); + + Assert.assertEquals(3, procedure.getRequests().size()); + assertRequest(procedure.getRequests().get(0), "root.db", 2000L, true); + assertRequest(procedure.getRequests().get(1), "root.db", 500L, false); + assertRequest(procedure.getRequests().get(2), "root.db.**", 600L, false); + } + + private ConfigNodeProcedureEnv mockProcedureEnv(final Map ttlMap) { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final TTLManager ttlManager = Mockito.mock(TTLManager.class); + final NodeManager nodeManager = Mockito.mock(NodeManager.class); + + final TDataNodeLocation dataNodeLocation = new TDataNodeLocation(); + dataNodeLocation.setDataNodeId(1); + + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getTTLManager()).thenReturn(ttlManager); + Mockito.when(ttlManager.getTTL(Mockito.any(String[].class))) + .thenAnswer( + invocation -> { + final String[] pathPattern = invocation.getArgument(0); + return ttlMap.getOrDefault(String.join(".", pathPattern), TTLCache.NULL_TTL); + }); + Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager); + Mockito.when(nodeManager.getRegisteredDataNodeLocations()) + .thenReturn(Collections.singletonMap(1, dataNodeLocation)); + return env; + } + + private void assertPlan( + final SetTTLPlan plan, final String path, final long ttl, final boolean isDataBase) { + Assert.assertEquals(path, String.join(".", plan.getPathPattern())); + Assert.assertEquals(ttl, plan.getTTL()); + Assert.assertEquals(isDataBase, plan.isDataBase()); + } + + private void assertRequest( + final TSetTTLReq req, final String path, final long ttl, final boolean isDataBase) { + Assert.assertEquals(Collections.singletonList(path), req.getPathPattern()); + Assert.assertEquals(ttl, req.getTTL()); + Assert.assertEquals(isDataBase, req.isDataBase); + } + + private void writeOldFormatProcedure(final DataOutputStream stream, final SetTTLPlan plan) + throws IOException { + stream.writeShort(ProcedureType.SET_TTL_PROCEDURE.getTypeCode()); + // Procedure fields. + stream.writeLong(Procedure.NO_PROC_ID); + stream.writeInt(ProcedureState.INITIALIZING.ordinal()); + stream.writeLong(0L); + stream.writeLong(0L); + stream.writeLong(Procedure.NO_PROC_ID); + stream.writeLong(Procedure.NO_TIMEOUT); + stream.writeInt(-1); // no stack indexes + stream.write((byte) 0); // no exception + stream.writeInt(-1); // no result + stream.write((byte) 0); // no lock + // StateMachineProcedure fields. + stream.writeInt(0); // no states + ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream); + } + + private void assertSerializedProcedure( + final SetTTLProcedure procedure, + final String path, + final long ttl, + final boolean isDataBase, + final boolean previousTTLStateCaptured, + final long previousTTL, + final long previousDatabaseWildcardTTL, + final boolean isGeneratedByPipe) + throws Exception { + final Field planField = findField(SetTTLProcedure.class, "plan"); + planField.setAccessible(true); + assertPlan((SetTTLPlan) planField.get(procedure), path, ttl, isDataBase); + + final Field previousTTLStateCapturedField = + findField(SetTTLProcedure.class, "previousTTLStateCaptured"); + previousTTLStateCapturedField.setAccessible(true); + Assert.assertEquals(previousTTLStateCaptured, previousTTLStateCapturedField.get(procedure)); + + final Field previousTTLField = findField(SetTTLProcedure.class, "previousTTL"); + previousTTLField.setAccessible(true); + Assert.assertEquals(previousTTL, previousTTLField.get(procedure)); + + final Field previousDatabaseWildcardTTLField = + findField(SetTTLProcedure.class, "previousDatabaseWildcardTTL"); + previousDatabaseWildcardTTLField.setAccessible(true); + Assert.assertEquals( + previousDatabaseWildcardTTL, previousDatabaseWildcardTTLField.get(procedure)); + + final Field isGeneratedByPipeField = findField(SetTTLProcedure.class, "isGeneratedByPipe"); + isGeneratedByPipeField.setAccessible(true); + Assert.assertEquals(isGeneratedByPipe, isGeneratedByPipeField.get(procedure)); + } + + private Field findField(final Class clazz, final String fieldName) + throws NoSuchFieldException { + Class current = clazz; + while (current != null) { + try { + return current.getDeclaredField(fieldName); + } catch (NoSuchFieldException e) { + current = current.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName); + } + + private static class TestingSetTTLProcedure extends SetTTLProcedure { + + private final List requests = new ArrayList<>(); + private final List writtenPlans = new ArrayList<>(); + private boolean failFirstDataNodeUpdate = false; + private int requestCount = 0; + + private TestingSetTTLProcedure(final SetTTLPlan plan) { + super(plan, false); + } + + private void failFirstDataNodeUpdateForTest() { + failFirstDataNodeUpdate = true; + } + + private List getRequests() { + return requests; + } + + private List getWrittenPlans() { + return writtenPlans; + } + + @Override + TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { + writtenPlans.add(copyPlan(setTTLPlan)); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + @Override + DataNodeAsyncRequestContext sendTTLRequest( + final Map dataNodeLocationMap, final TSetTTLReq req) { + requests.add(copyRequest(req)); + + final DataNodeAsyncRequestContext clientHandler = + new DataNodeAsyncRequestContext<>( + CnToDnAsyncRequestType.SET_TTL, copyRequest(req), dataNodeLocationMap); + final List requestIds = new ArrayList<>(clientHandler.getNodeLocationMap().keySet()); + final boolean shouldFail = failFirstDataNodeUpdate && requestCount++ == 0; + + for (Integer requestId : requestIds) { + clientHandler + .getResponseMap() + .put( + requestId, + new TSStatus( + shouldFail + ? TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode() + : TSStatusCode.SUCCESS_STATUS.getStatusCode())); + if (!shouldFail) { + clientHandler.getNodeLocationMap().remove(requestId); + } + } + return clientHandler; + } + + private SetTTLPlan copyPlan(final SetTTLPlan plan) { + final SetTTLPlan copiedPlan = + new SetTTLPlan(Arrays.asList(plan.getPathPattern()), plan.getTTL()); + copiedPlan.setDataBase(plan.isDataBase()); + return copiedPlan; + } + + private TSetTTLReq copyRequest(final TSetTTLReq req) { + return new TSetTTLReq(new ArrayList<>(req.getPathPattern()), req.getTTL(), req.isDataBase); + } + } }