From 9e4dd5d9c9bba7a5736710d094a386da01bbd64d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 21 May 2026 11:41:28 +0800 Subject: [PATCH 1/7] fix --- .../iotdb/confignode/persistence/TTLInfo.java | 17 +- .../impl/schema/SetTTLProcedure.java | 223 +++++++++++++++--- .../confignode/persistence/TTLInfoTest.java | 53 +++++ .../impl/schema/SetTTLProcedureTest.java | 199 ++++++++++++++++ 4 files changed, 455 insertions(+), 37 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java index 7b98ebba50b87..d40301b0c56cf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java @@ -70,7 +70,8 @@ public TSStatus setTTL(SetTTLPlan plan) { try { // check ttl rule capacity final int tTlRuleCapacity = CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity(); - if (getTTLCount() >= tTlRuleCapacity) { + final int newTTLRuleCount = calculateNewTTLRuleCount(plan); + if (newTTLRuleCount > 0 && ttlCache.getTtlCount() + newTTLRuleCount > tTlRuleCapacity) { TSStatus errorStatus = new TSStatus(TSStatusCode.OVERSIZE_TTL.getStatusCode()); errorStatus.setMessage( String.format( @@ -92,6 +93,20 @@ public TSStatus setTTL(SetTTLPlan plan) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } + private int calculateNewTTLRuleCount(SetTTLPlan plan) { + int newTTLRuleCount = getNewTTLRuleCount(plan.getPathPattern()); + if (plan.isDataBase()) { + String[] pathNodes = Arrays.copyOf(plan.getPathPattern(), plan.getPathPattern().length + 1); + pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; + newTTLRuleCount += getNewTTLRuleCount(pathNodes); + } + return newTTLRuleCount; + } + + private int getNewTTLRuleCount(String[] pathNodes) { + return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL ? 1 : 0; + } + /** Only used for upgrading from database level ttl to device level ttl. */ public void setTTL(Map databaseTTLMap) throws IllegalPathException { lock.writeLock().lock(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java index b90f2df87d5c3..b4031ec133872 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java @@ -22,8 +22,10 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.ttl.TTLCache; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; @@ -47,14 +49,19 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Objects; public class SetTTLProcedure extends StateMachineProcedure { private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class); + private static final long TTL_NOT_EXIST = Long.MIN_VALUE; private SetTTLPlan plan; + private long previousTTL = TTL_NOT_EXIST; + private long previousDatabaseWildcardTTL = TTL_NOT_EXIST; + private boolean previousTTLStateCaptured = false; public SetTTLProcedure(final boolean isGeneratedByPipe) { super(isGeneratedByPipe); @@ -86,18 +93,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state) } } - private void setConfigNodeTTL(ConfigNodeProcedureEnv env) { - TSStatus res; - try { - res = - env.getConfigManager() - .getConsensusManager() - .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : this.plan); - } catch (ConsensusException e) { - LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); - res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(e.getMessage()); - } + protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) { + capturePreviousTTLState(env); + final TSStatus res = writeConfigNodePlan(env, plan); if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, res.message); setFailure(new ProcedureException(new IoTDBException(res))); @@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv env) { } } - private void updateDataNodeTTL(ConfigNodeProcedureEnv env) { - Map dataNodeLocationMap = + protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) { + final Map dataNodeLocationMap = env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); - DataNodeAsyncRequestContext clientHandler = - new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.SET_TTL, - new TSetTTLReq( - Collections.singletonList(String.join(".", plan.getPathPattern())), - plan.getTTL(), - plan.isDataBase()), - dataNodeLocationMap); + final DataNodeAsyncRequestContext clientHandler = + sendTTLRequest( + dataNodeLocationMap, + buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), plan.isDataBase())); + if (hasFailedDataNode(clientHandler)) { + LOGGER.error("Failed to update ttl cache of dataNode."); + setFailure( + new ProcedureException( + new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); + } + } + + private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) { + if (previousTTLStateCaptured) { + return; + } + final Map ttlMap = env.getConfigManager().getTTLManager().getAllTTL(); + previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern()); + if (plan.isDataBase()) { + previousDatabaseWildcardTTL = + getTTLOrDefault(ttlMap, getDatabaseWildcardPathPattern(plan.getPathPattern())); + } + previousTTLStateCaptured = true; + } + + protected TSStatus writeConfigNodePlan( + final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { + try { + return env.getConfigManager() + .getConsensusManager() + .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : setTTLPlan); + } catch (ConsensusException e) { + LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; + } + } + + protected DataNodeAsyncRequestContext sendTTLRequest( + final Map dataNodeLocationMap, final TSetTTLReq req) { + final DataNodeAsyncRequestContext clientHandler = + new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, dataNodeLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); - Map statusMap = clientHandler.getResponseMap(); - for (TSStatus status : statusMap.values()) { - // all dataNodes must clear the related schemaengine cache + return clientHandler; + } + + private TSetTTLReq buildSetTTLReq( + final String[] pathPattern, final long ttl, final boolean isDataBase) { + return new TSetTTLReq( + Collections.singletonList(String.join(".", pathPattern)), ttl, isDataBase); + } + + private boolean hasFailedDataNode( + final DataNodeAsyncRequestContext clientHandler) { + if (!clientHandler.getRequestIndices().isEmpty()) { + return true; + } + for (TSStatus status : clientHandler.getResponseMap().values()) { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); - setFailure( - new ProcedureException( - new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); - return; + return true; } } + return false; + } + + private long getTTLOrDefault(final Map ttlMap, final String[] pathPattern) { + return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST); + } + + private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) { + final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 1); + pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; + return pathNodes; + } + + private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { + restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL); + if (plan.isDataBase()) { + restoreTTLOnConfigNode( + env, getDatabaseWildcardPathPattern(plan.getPathPattern()), previousDatabaseWildcardTTL); + } + } + + private void restoreTTLOnConfigNode( + final ConfigNodeProcedureEnv env, final String[] pathPattern, final long ttl) + throws ProcedureException { + final SetTTLPlan rollbackPlan = + new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl); + rollbackPlan.setDataBase(false); + final TSStatus status = writeConfigNodePlan(env, rollbackPlan); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new ProcedureException( + new MetadataException( + "Rollback ConfigNode ttl failed for " + + String.join(".", pathPattern) + + ": " + + status.getMessage())); + } + } + + private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { + final Map dataNodeLocationMap = + env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); + restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), previousTTL); + if (plan.isDataBase()) { + restoreTTLOnDataNodes( + dataNodeLocationMap, + getDatabaseWildcardPathPattern(plan.getPathPattern()), + previousDatabaseWildcardTTL); + } + } + + private void restoreTTLOnDataNodes( + final Map dataNodeLocationMap, + final String[] pathPattern, + final long ttl) + throws ProcedureException { + if (dataNodeLocationMap.isEmpty()) { + return; + } + final DataNodeAsyncRequestContext clientHandler = + sendTTLRequest( + dataNodeLocationMap, + buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl, false)); + if (hasFailedDataNode(clientHandler)) { + throw new ProcedureException( + new MetadataException( + "Rollback dataNode ttl cache failed for " + String.join(".", pathPattern))); + } } @Override - protected void rollbackState( - ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState) - throws IOException, InterruptedException, ProcedureException {} + protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState setTTLState) + throws IOException, InterruptedException, ProcedureException { + if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || !previousTTLStateCaptured) { + return; + } + ProcedureException rollbackFailure = null; + try { + rollbackConfigNodeTTL(env); + } catch (ProcedureException e) { + LOGGER.error("Failed to rollback ConfigNode ttl state.", e); + rollbackFailure = e; + } + try { + rollbackDataNodeTTL(env); + } catch (ProcedureException e) { + LOGGER.error("Failed to rollback DataNode ttl cache.", e); + if (rollbackFailure == null) { + rollbackFailure = e; + } + } + if (rollbackFailure != null) { + throw rollbackFailure; + } + } + + @Override + protected boolean isRollbackSupported(final SetTTLState state) { + return state == SetTTLState.UPDATE_DATANODE_CACHE; + } @Override protected SetTTLState getState(int stateId) { @@ -159,6 +293,9 @@ public void serialize(DataOutputStream stream) throws IOException { : ProcedureType.SET_TTL_PROCEDURE.getTypeCode()); super.serialize(stream); ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream); + stream.writeBoolean(previousTTLStateCaptured); + stream.writeLong(previousTTL); + stream.writeLong(previousDatabaseWildcardTTL); } @Override @@ -167,6 +304,11 @@ public void deserialize(ByteBuffer byteBuffer) { try { ReadWriteIOUtils.readInt(byteBuffer); this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer); + if (byteBuffer.remaining() >= 17) { + this.previousTTLStateCaptured = byteBuffer.get() != 0; + this.previousTTL = byteBuffer.getLong(); + this.previousDatabaseWildcardTTL = byteBuffer.getLong(); + } } catch (IOException e) { LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e); } @@ -180,12 +322,21 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - return this.plan.equals(((SetTTLProcedure) o).plan) - && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe); + final SetTTLProcedure that = (SetTTLProcedure) o; + return this.isGeneratedByPipe == that.isGeneratedByPipe + && this.previousTTL == that.previousTTL + && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL + && this.previousTTLStateCaptured == that.previousTTLStateCaptured + && this.plan.equals(that.plan); } @Override public int hashCode() { - return Objects.hash(plan, isGeneratedByPipe); + return Objects.hash( + plan, + isGeneratedByPipe, + previousTTL, + previousDatabaseWildcardTTL, + previousTTLStateCaptured); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java index 42a23d35cb9f4..bdb02a2d5a5ff 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java @@ -50,6 +50,7 @@ public class TTLInfoTest { private final File snapshotDir = new File(BASE_OUTPUT_PATH, "ttlInfo-snapshot"); private final long ttl = 123435565323L; private long[] originTTLArr; + private int originTTlRuleCapacity; @Before public void setup() throws IOException { @@ -57,6 +58,7 @@ public void setup() throws IOException { snapshotDir.mkdirs(); } originTTLArr = CommonDescriptor.getInstance().getConfig().getTierTTLInMs(); + originTTlRuleCapacity = CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity(); long[] ttlArr = new long[2]; ttlArr[0] = 10000000L; ttlArr[1] = ttl; @@ -70,6 +72,7 @@ public void tearDown() throws IOException { FileUtils.deleteDirectory(snapshotDir); } CommonDescriptor.getInstance().getConfig().setTierTTLInMs(originTTLArr); + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(originTTlRuleCapacity); } @Test @@ -245,6 +248,56 @@ public void testTooManyTTL() { status.message); } + @Test + public void testUpdateExistingTTLWhenCapacityIsReached() { + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2); + + SetTTLPlan setTTLPlan = + new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 1000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + + setTTLPlan = new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + assertEquals( + Long.valueOf(2000), + ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**")); + } + + @Test + public void testUpdateExistingTTLWhenCurrentStateIsAlreadyOversize() { + SetTTLPlan setTTLPlan = + new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 1000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(1); + + setTTLPlan = new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), ttlInfo.setTTL(setTTLPlan).code); + assertEquals(2, ttlInfo.getTTLCount()); + assertEquals( + Long.valueOf(2000), + ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**")); + } + + @Test + public void testDatabaseTTLShouldReserveTwoSlots() { + CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2); + + SetTTLPlan setTTLPlan = new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1"), 1000); + setTTLPlan.setDataBase(true); + + final TSStatus status = ttlInfo.setTTL(setTTLPlan); + assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code); + assertEquals(1, ttlInfo.getTTLCount()); + assertEquals(1, ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().size()); + assertEquals( + Long.valueOf(Long.MAX_VALUE), + ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.**")); + } + @Test public void testSnapshot() throws TException, IOException, IllegalPathException { // set ttl diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java index 5042eb1dd0f13..22c6e160554bb 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java @@ -19,19 +19,36 @@ package org.apache.iotdb.confignode.procedure.impl.schema; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; +import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.TTLManager; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.state.schema.SetTTLState; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.PublicBAOS; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class SetTTLProcedureTest { @@ -65,4 +82,186 @@ public void serializeDeserializeTest() throws IOException, IllegalPathException buffer.clear(); byteArrayOutputStream.reset(); } + + @Test + public void serializeDeserializeTestWithCapturedRollbackState() throws Exception { + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + + final Map ttlMap = new HashMap<>(); + ttlMap.put("root.**", Long.MAX_VALUE); + ttlMap.put("root.db", 500L); + ttlMap.put("root.db.**", 600L); + + procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.SET_CONFIGNODE_TTL); + + procedure.serialize(outputStream); + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + final SetTTLProcedure deserializedProcedure = + (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer); + Assert.assertTrue(procedure.equals(deserializedProcedure)); + } + + @Test + public void rollbackStateShouldUnsetNewTTLWhenPreviousStateDidNotExist() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.test.sg1.**").getNodes()), 1000L); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + procedure.failFirstDataNodeUpdateForTest(); + + final ConfigNodeProcedureEnv env = + mockProcedureEnv(Collections.singletonMap("root.**", Long.MAX_VALUE)); + + procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL); + procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE); + Assert.assertTrue(procedure.isFailed()); + + procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE); + + Assert.assertEquals(2, procedure.getWrittenPlans().size()); + assertPlan(procedure.getWrittenPlans().get(0), "root.test.sg1.**", 1000L, false); + assertPlan(procedure.getWrittenPlans().get(1), "root.test.sg1.**", -1L, false); + + Assert.assertEquals(2, procedure.getRequests().size()); + assertRequest(procedure.getRequests().get(0), "root.test.sg1.**", 1000L, false); + assertRequest(procedure.getRequests().get(1), "root.test.sg1.**", -1L, false); + } + + @Test + public void rollbackStateShouldRestoreDatabaseWildcardTTLSeparately() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + procedure.failFirstDataNodeUpdateForTest(); + + final Map ttlMap = new HashMap<>(); + ttlMap.put("root.**", Long.MAX_VALUE); + ttlMap.put("root.db", 500L); + ttlMap.put("root.db.**", 600L); + final ConfigNodeProcedureEnv env = mockProcedureEnv(ttlMap); + + procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL); + procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE); + Assert.assertTrue(procedure.isFailed()); + + procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE); + + Assert.assertEquals(3, procedure.getWrittenPlans().size()); + assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true); + assertPlan(procedure.getWrittenPlans().get(1), "root.db", 500L, false); + assertPlan(procedure.getWrittenPlans().get(2), "root.db.**", 600L, false); + + Assert.assertEquals(3, procedure.getRequests().size()); + assertRequest(procedure.getRequests().get(0), "root.db", 2000L, true); + assertRequest(procedure.getRequests().get(1), "root.db", 500L, false); + assertRequest(procedure.getRequests().get(2), "root.db.**", 600L, false); + } + + private ConfigNodeProcedureEnv mockProcedureEnv(final Map ttlMap) { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final TTLManager ttlManager = Mockito.mock(TTLManager.class); + final NodeManager nodeManager = Mockito.mock(NodeManager.class); + + final TDataNodeLocation dataNodeLocation = new TDataNodeLocation(); + dataNodeLocation.setDataNodeId(1); + + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getTTLManager()).thenReturn(ttlManager); + Mockito.when(ttlManager.getAllTTL()).thenReturn(ttlMap); + Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager); + Mockito.when(nodeManager.getRegisteredDataNodeLocations()) + .thenReturn(Collections.singletonMap(1, dataNodeLocation)); + return env; + } + + private void assertPlan( + final SetTTLPlan plan, final String path, final long ttl, final boolean isDataBase) { + Assert.assertEquals(path, String.join(".", plan.getPathPattern())); + Assert.assertEquals(ttl, plan.getTTL()); + Assert.assertEquals(isDataBase, plan.isDataBase()); + } + + private void assertRequest( + final TSetTTLReq req, final String path, final long ttl, final boolean isDataBase) { + Assert.assertEquals(Collections.singletonList(path), req.getPathPattern()); + Assert.assertEquals(ttl, req.getTTL()); + Assert.assertEquals(isDataBase, req.isDataBase); + } + + private static class TestingSetTTLProcedure extends SetTTLProcedure { + + private final List requests = new ArrayList<>(); + private final List writtenPlans = new ArrayList<>(); + private boolean failFirstDataNodeUpdate = false; + private int requestCount = 0; + + private TestingSetTTLProcedure(final SetTTLPlan plan) { + super(plan, false); + } + + private void failFirstDataNodeUpdateForTest() { + failFirstDataNodeUpdate = true; + } + + private List getRequests() { + return requests; + } + + private List getWrittenPlans() { + return writtenPlans; + } + + @Override + protected TSStatus writeConfigNodePlan( + final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { + writtenPlans.add(copyPlan(setTTLPlan)); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + @Override + protected DataNodeAsyncRequestContext sendTTLRequest( + final Map dataNodeLocationMap, final TSetTTLReq req) { + requests.add(copyRequest(req)); + + final DataNodeAsyncRequestContext clientHandler = + new DataNodeAsyncRequestContext<>( + CnToDnAsyncRequestType.SET_TTL, copyRequest(req), dataNodeLocationMap); + final List requestIds = new ArrayList<>(clientHandler.getNodeLocationMap().keySet()); + final boolean shouldFail = failFirstDataNodeUpdate && requestCount++ == 0; + + for (Integer requestId : requestIds) { + clientHandler + .getResponseMap() + .put( + requestId, + new TSStatus( + shouldFail + ? TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode() + : TSStatusCode.SUCCESS_STATUS.getStatusCode())); + if (!shouldFail) { + clientHandler.getNodeLocationMap().remove(requestId); + } + } + return clientHandler; + } + + private SetTTLPlan copyPlan(final SetTTLPlan plan) { + final SetTTLPlan copiedPlan = + new SetTTLPlan(Arrays.asList(plan.getPathPattern()), plan.getTTL()); + copiedPlan.setDataBase(plan.isDataBase()); + return copiedPlan; + } + + private TSetTTLReq copyRequest(final TSetTTLReq req) { + return new TSetTTLReq(new ArrayList<>(req.getPathPattern()), req.getTTL(), req.isDataBase); + } + } } From 05e9f835a94ef14dea192d2448c0c051485d889c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 21 May 2026 11:41:54 +0800 Subject: [PATCH 2/7] Update SetTTLProcedure.java --- .../iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java index b4031ec133872..b2b9ffb4d72e2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java @@ -112,7 +112,7 @@ protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) { dataNodeLocationMap, buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), plan.isDataBase())); if (hasFailedDataNode(clientHandler)) { - LOGGER.error("Failed to update ttl cache of dataNode."); + LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); setFailure( new ProcedureException( new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); From 629f3193afde739fa3c5d72d0ae38015a21cadd7 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 21 May 2026 12:13:10 +0800 Subject: [PATCH 3/7] fix --- .../impl/schema/SetTTLProcedure.java | 4 +- .../impl/schema/SetTTLProcedureTest.java | 51 ++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java index b2b9ffb4d72e2..4d1ed2cc14003 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java @@ -302,8 +302,10 @@ public void serialize(DataOutputStream stream) throws IOException { public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); try { - ReadWriteIOUtils.readInt(byteBuffer); + final int length = ReadWriteIOUtils.readInt(byteBuffer); + final int position = byteBuffer.position(); this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer); + byteBuffer.position(position + length); if (byteBuffer.remaining() >= 17) { this.previousTTLStateCaptured = byteBuffer.get() != 0; this.previousTTL = byteBuffer.getLong(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java index 22c6e160554bb..06c84729c6757 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java @@ -42,6 +42,7 @@ import java.io.DataOutputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -105,7 +106,8 @@ public void serializeDeserializeTestWithCapturedRollbackState() throws Exception ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); final SetTTLProcedure deserializedProcedure = (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer); - Assert.assertTrue(procedure.equals(deserializedProcedure)); + assertSerializedProcedure( + deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L, false); } @Test @@ -196,6 +198,53 @@ private void assertRequest( Assert.assertEquals(isDataBase, req.isDataBase); } + private void assertSerializedProcedure( + final SetTTLProcedure procedure, + final String path, + final long ttl, + final boolean isDataBase, + final boolean previousTTLStateCaptured, + final long previousTTL, + final long previousDatabaseWildcardTTL, + final boolean isGeneratedByPipe) + throws Exception { + final Field planField = findField(SetTTLProcedure.class, "plan"); + planField.setAccessible(true); + assertPlan((SetTTLPlan) planField.get(procedure), path, ttl, isDataBase); + + final Field previousTTLStateCapturedField = + findField(SetTTLProcedure.class, "previousTTLStateCaptured"); + previousTTLStateCapturedField.setAccessible(true); + Assert.assertEquals(previousTTLStateCaptured, previousTTLStateCapturedField.get(procedure)); + + final Field previousTTLField = findField(SetTTLProcedure.class, "previousTTL"); + previousTTLField.setAccessible(true); + Assert.assertEquals(previousTTL, previousTTLField.get(procedure)); + + final Field previousDatabaseWildcardTTLField = + findField(SetTTLProcedure.class, "previousDatabaseWildcardTTL"); + previousDatabaseWildcardTTLField.setAccessible(true); + Assert.assertEquals( + previousDatabaseWildcardTTL, previousDatabaseWildcardTTLField.get(procedure)); + + final Field isGeneratedByPipeField = findField(SetTTLProcedure.class, "isGeneratedByPipe"); + isGeneratedByPipeField.setAccessible(true); + Assert.assertEquals(isGeneratedByPipe, isGeneratedByPipeField.get(procedure)); + } + + private Field findField(final Class clazz, final String fieldName) + throws NoSuchFieldException { + Class current = clazz; + while (current != null) { + try { + return current.getDeclaredField(fieldName); + } catch (NoSuchFieldException e) { + current = current.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName); + } + private static class TestingSetTTLProcedure extends SetTTLProcedure { private final List requests = new ArrayList<>(); From 59c9b9deba1f9b039642347facbf3d84efa3f2ad Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 22 May 2026 10:52:28 +0800 Subject: [PATCH 4/7] ttlgras --- .../iotdb/confignode/persistence/TTLInfo.java | 17 +++++++++-------- .../procedure/impl/schema/SetTTLProcedure.java | 1 + .../confignode/persistence/TTLInfoTest.java | 3 ++- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java index d40301b0c56cf..f416fd5f82ed8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java @@ -71,13 +71,14 @@ public TSStatus setTTL(SetTTLPlan plan) { // check ttl rule capacity final int tTlRuleCapacity = CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity(); final int newTTLRuleCount = calculateNewTTLRuleCount(plan); - if (newTTLRuleCount > 0 && ttlCache.getTtlCount() + newTTLRuleCount > tTlRuleCapacity) { + final int requestedTTLRuleCount = ttlCache.getTtlCount() + newTTLRuleCount; + if (newTTLRuleCount > 0 && requestedTTLRuleCount > tTlRuleCapacity) { TSStatus errorStatus = new TSStatus(TSStatusCode.OVERSIZE_TTL.getStatusCode()); errorStatus.setMessage( String.format( - "The number of TTL rules has reached the limit (%d). Please delete " - + "some existing rules first.", - tTlRuleCapacity)); + "The number of TTL rules has reached the limit " + + "(capacity: %d, requested total: %d). Please delete some existing rules first.", + tTlRuleCapacity, requestedTTLRuleCount)); return errorStatus; } ttlCache.setTTL(plan.getPathPattern(), plan.getTTL()); @@ -94,17 +95,17 @@ public TSStatus setTTL(SetTTLPlan plan) { } private int calculateNewTTLRuleCount(SetTTLPlan plan) { - int newTTLRuleCount = getNewTTLRuleCount(plan.getPathPattern()); + int newTTLRuleCount = isNewTTLRule(plan.getPathPattern()) ? 1 : 0; if (plan.isDataBase()) { String[] pathNodes = Arrays.copyOf(plan.getPathPattern(), plan.getPathPattern().length + 1); pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; - newTTLRuleCount += getNewTTLRuleCount(pathNodes); + newTTLRuleCount += isNewTTLRule(pathNodes) ? 1 : 0; } return newTTLRuleCount; } - private int getNewTTLRuleCount(String[] pathNodes) { - return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL ? 1 : 0; + private boolean isNewTTLRule(String[] pathNodes) { + return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL; } /** Only used for upgrading from database level ttl to device level ttl. */ diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java index 4d1ed2cc14003..1491372bc9b22 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java @@ -56,6 +56,7 @@ public class SetTTLProcedure extends StateMachineProcedure { private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class); + // Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback. private static final long TTL_NOT_EXIST = Long.MIN_VALUE; private SetTTLPlan plan; diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java index bdb02a2d5a5ff..e959b88dab246 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java @@ -244,7 +244,8 @@ public void testTooManyTTL() { final TSStatus status = ttlInfo.setTTL(setTTLPlan); assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code); assertEquals( - "The number of TTL rules has reached the limit (1000). Please delete some existing rules first.", + "The number of TTL rules has reached the limit " + + "(capacity: 1000, requested total: 1001). Please delete some existing rules first.", status.message); } From 6614bbd686b5427d619b63b816cea8b838dc0db4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 22 May 2026 11:27:36 +0800 Subject: [PATCH 5/7] fix --- .../iotdb/confignode/manager/TTLManager.java | 4 ++ .../iotdb/confignode/persistence/TTLInfo.java | 9 +++++ .../impl/schema/SetTTLProcedure.java | 31 ++++++++------ .../procedure/state/schema/SetTTLState.java | 4 +- .../confignode/persistence/TTLInfoTest.java | 12 ++++++ .../impl/schema/SetTTLProcedureTest.java | 40 ++++++++++++++++--- 6 files changed, 82 insertions(+), 18 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java index dc6ae4f37a194..284b77004beac 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java @@ -127,6 +127,10 @@ public Map getAllTTL() { return ((ShowTTLResp) showTTL(new ShowTTLPlan())).getPathTTLMap(); } + public long getTTL(final String[] pathPattern) { + return ttlInfo.getTTL(pathPattern); + } + public int getTTLCount() { return ttlInfo.getTTLCount(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java index f416fd5f82ed8..4921b03851c0d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java @@ -175,6 +175,15 @@ public int getTTLCount() { } } + public long getTTL(final String[] pathPattern) { + lock.readLock().lock(); + try { + return ttlCache.getLastNodeTTL(pathPattern); + } finally { + lock.readLock().unlock(); + } + } + /** * Get the maximum ttl of the corresponding database level. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java index 1491372bc9b22..3cd38e742269d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java @@ -79,6 +79,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state) long startTime = System.currentTimeMillis(); try { switch (state) { + case CAPTURE_PREVIOUS_TTL: + capturePreviousTTLState(env); + setNextState(SetTTLState.SET_CONFIGNODE_TTL); + return Flow.HAS_MORE_STATE; case SET_CONFIGNODE_TTL: setConfigNodeTTL(env); return Flow.HAS_MORE_STATE; @@ -94,8 +98,12 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state) } } - protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) { - capturePreviousTTLState(env); + void setConfigNodeTTL(final ConfigNodeProcedureEnv env) { + if (!previousTTLStateCaptured) { + capturePreviousTTLState(env); + setNextState(SetTTLState.SET_CONFIGNODE_TTL); + return; + } final TSStatus res = writeConfigNodePlan(env, plan); if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, res.message); @@ -105,7 +113,7 @@ protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) { } } - protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) { + void updateDataNodeTTL(final ConfigNodeProcedureEnv env) { final Map dataNodeLocationMap = env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); final DataNodeAsyncRequestContext clientHandler = @@ -124,17 +132,15 @@ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) { if (previousTTLStateCaptured) { return; } - final Map ttlMap = env.getConfigManager().getTTLManager().getAllTTL(); - previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern()); + previousTTL = getTTLOrDefault(env, plan.getPathPattern()); if (plan.isDataBase()) { previousDatabaseWildcardTTL = - getTTLOrDefault(ttlMap, getDatabaseWildcardPathPattern(plan.getPathPattern())); + getTTLOrDefault(env, getDatabaseWildcardPathPattern(plan.getPathPattern())); } previousTTLStateCaptured = true; } - protected TSStatus writeConfigNodePlan( - final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { + TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { try { return env.getConfigManager() .getConsensusManager() @@ -147,7 +153,7 @@ protected TSStatus writeConfigNodePlan( } } - protected DataNodeAsyncRequestContext sendTTLRequest( + DataNodeAsyncRequestContext sendTTLRequest( final Map dataNodeLocationMap, final TSetTTLReq req) { final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, dataNodeLocationMap); @@ -174,8 +180,9 @@ private boolean hasFailedDataNode( return false; } - private long getTTLOrDefault(final Map ttlMap, final String[] pathPattern) { - return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST); + private long getTTLOrDefault(final ConfigNodeProcedureEnv env, final String[] pathPattern) { + final long ttl = env.getConfigManager().getTTLManager().getTTL(pathPattern); + return ttl == TTLCache.NULL_TTL ? TTL_NOT_EXIST : ttl; } private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) { @@ -283,7 +290,7 @@ protected int getStateId(SetTTLState setTTLState) { @Override protected SetTTLState getInitialState() { - return SetTTLState.SET_CONFIGNODE_TTL; + return SetTTLState.CAPTURE_PREVIOUS_TTL; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java index fbdc026fc709b..4dd3063ea3f62 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java @@ -19,6 +19,8 @@ package org.apache.iotdb.confignode.procedure.state.schema; public enum SetTTLState { + // Keep existing state ordinals stable for persisted procedures. SET_CONFIGNODE_TTL, - UPDATE_DATANODE_CACHE + UPDATE_DATANODE_CACHE, + CAPTURE_PREVIOUS_TTL } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java index e959b88dab246..e424671fb3a54 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.ttl.TTLCache; import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan; import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; import org.apache.iotdb.confignode.consensus.response.ttl.ShowTTLResp; @@ -211,6 +212,17 @@ public void testSetAndUnsetTTL() throws IllegalPathException { Assert.assertEquals(4, ttlInfo.getTTLCount()); } + @Test + public void testGetTTLReturnsExactPathTTL() throws IllegalPathException { + PartialPath path = new PartialPath("root.test.db1.**"); + ttlInfo.setTTL(new SetTTLPlan(Arrays.asList(path.getNodes()), 121322323L)); + + Assert.assertEquals(121322323L, ttlInfo.getTTL(path.getNodes())); + Assert.assertEquals( + TTLCache.NULL_TTL, ttlInfo.getTTL(new PartialPath("root.test.db1").getNodes())); + Assert.assertEquals(Long.MAX_VALUE, ttlInfo.getTTL(new PartialPath("root.**").getNodes())); + } + @Test public void testUnsetNonExistTTL() throws IllegalPathException { assertEquals( diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java index 06c84729c6757..67b15efba25d0 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.ttl.TTLCache; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; @@ -99,7 +100,7 @@ public void serializeDeserializeTestWithCapturedRollbackState() throws Exception ttlMap.put("root.db", 500L); ttlMap.put("root.db.**", 600L); - procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.SET_CONFIGNODE_TTL); + procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.CAPTURE_PREVIOUS_TTL); procedure.serialize(outputStream); final ByteBuffer buffer = @@ -110,6 +111,29 @@ public void serializeDeserializeTestWithCapturedRollbackState() throws Exception deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L, false); } + @Test + public void setConfigNodeTTLShouldNotWriteBeforePreviousStateIsCaptured() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan); + + final Map ttlMap = new HashMap<>(); + ttlMap.put("root.**", Long.MAX_VALUE); + ttlMap.put("root.db", 500L); + ttlMap.put("root.db.**", 600L); + + procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.SET_CONFIGNODE_TTL); + + Assert.assertTrue(procedure.getWrittenPlans().isEmpty()); + assertSerializedProcedure(procedure, "root.db", 2000L, true, true, 500L, 600L, false); + + procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.SET_CONFIGNODE_TTL); + + Assert.assertEquals(1, procedure.getWrittenPlans().size()); + assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true); + } + @Test public void rollbackStateShouldUnsetNewTTLWhenPreviousStateDidNotExist() throws Exception { final SetTTLPlan setTTLPlan = @@ -120,6 +144,7 @@ public void rollbackStateShouldUnsetNewTTLWhenPreviousStateDidNotExist() throws final ConfigNodeProcedureEnv env = mockProcedureEnv(Collections.singletonMap("root.**", Long.MAX_VALUE)); + procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL); procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL); procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE); Assert.assertTrue(procedure.isFailed()); @@ -149,6 +174,7 @@ public void rollbackStateShouldRestoreDatabaseWildcardTTLSeparately() throws Exc ttlMap.put("root.db.**", 600L); final ConfigNodeProcedureEnv env = mockProcedureEnv(ttlMap); + procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL); procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL); procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE); Assert.assertTrue(procedure.isFailed()); @@ -177,7 +203,12 @@ private ConfigNodeProcedureEnv mockProcedureEnv(final Map ttlMap) Mockito.when(env.getConfigManager()).thenReturn(configManager); Mockito.when(configManager.getTTLManager()).thenReturn(ttlManager); - Mockito.when(ttlManager.getAllTTL()).thenReturn(ttlMap); + Mockito.when(ttlManager.getTTL(Mockito.any(String[].class))) + .thenAnswer( + invocation -> { + final String[] pathPattern = invocation.getArgument(0); + return ttlMap.getOrDefault(String.join(".", pathPattern), TTLCache.NULL_TTL); + }); Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager); Mockito.when(nodeManager.getRegisteredDataNodeLocations()) .thenReturn(Collections.singletonMap(1, dataNodeLocation)); @@ -269,14 +300,13 @@ private List getWrittenPlans() { } @Override - protected TSStatus writeConfigNodePlan( - final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { + TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { writtenPlans.add(copyPlan(setTTLPlan)); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @Override - protected DataNodeAsyncRequestContext sendTTLRequest( + DataNodeAsyncRequestContext sendTTLRequest( final Map dataNodeLocationMap, final TSetTTLReq req) { requests.add(copyRequest(req)); From 2b04150aaf59d486ed8a6cad600d314e281414f0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 22 May 2026 11:44:04 +0800 Subject: [PATCH 6/7] gr --- .../impl/schema/SetTTLProcedure.java | 11 ++++- .../impl/schema/SetTTLProcedureTest.java | 42 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java index 3cd38e742269d..dca79a02366f6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java @@ -58,6 +58,7 @@ public class SetTTLProcedure extends StateMachineProcedure= 17) { + if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) { this.previousTTLStateCaptured = byteBuffer.get() != 0; this.previousTTL = byteBuffer.getLong(); this.previousDatabaseWildcardTTL = byteBuffer.getLong(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java index 67b15efba25d0..ae23386cfea4b 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java @@ -31,12 +31,16 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.TTLManager; import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.state.ProcedureState; import org.apache.iotdb.confignode.procedure.state.schema.SetTTLState; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -111,6 +115,25 @@ public void serializeDeserializeTestWithCapturedRollbackState() throws Exception deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L, false); } + @Test + public void deserializeOldFormatWithoutRollbackStateTest() throws Exception { + final SetTTLPlan setTTLPlan = + new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L); + setTTLPlan.setDataBase(true); + + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + writeOldFormatProcedure(outputStream, setTTLPlan); + + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + final SetTTLProcedure deserializedProcedure = + (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer); + + assertSerializedProcedure( + deserializedProcedure, "root.db", 2000L, true, false, Long.MIN_VALUE, Long.MIN_VALUE, false); + } + @Test public void setConfigNodeTTLShouldNotWriteBeforePreviousStateIsCaptured() throws Exception { final SetTTLPlan setTTLPlan = @@ -229,6 +252,25 @@ private void assertRequest( Assert.assertEquals(isDataBase, req.isDataBase); } + private void writeOldFormatProcedure(final DataOutputStream stream, final SetTTLPlan plan) + throws IOException { + stream.writeShort(ProcedureType.SET_TTL_PROCEDURE.getTypeCode()); + // Procedure fields. + stream.writeLong(Procedure.NO_PROC_ID); + stream.writeInt(ProcedureState.INITIALIZING.ordinal()); + stream.writeLong(0L); + stream.writeLong(0L); + stream.writeLong(Procedure.NO_PROC_ID); + stream.writeLong(Procedure.NO_TIMEOUT); + stream.writeInt(-1); // no stack indexes + stream.write((byte) 0); // no exception + stream.writeInt(-1); // no result + stream.write((byte) 0); // no lock + // StateMachineProcedure fields. + stream.writeInt(0); // no states + ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream); + } + private void assertSerializedProcedure( final SetTTLProcedure procedure, final String path, From d0a056896206416d8ccdfa6dd0078d8e6ed04c12 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 22 May 2026 16:37:30 +0800 Subject: [PATCH 7/7] Update SetTTLProcedureTest.java --- .../procedure/impl/schema/SetTTLProcedureTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java index ae23386cfea4b..cb09c23659c39 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java @@ -131,7 +131,14 @@ public void deserializeOldFormatWithoutRollbackStateTest() throws Exception { (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer); assertSerializedProcedure( - deserializedProcedure, "root.db", 2000L, true, false, Long.MIN_VALUE, Long.MIN_VALUE, false); + deserializedProcedure, + "root.db", + 2000L, + true, + false, + Long.MIN_VALUE, + Long.MIN_VALUE, + false); } @Test