Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public Map<String, Long> getAllTTL() {
return ((ShowTTLResp) showTTL(new ShowTTLPlan())).getPathTTLMap();
}

public long getTTL(final String[] pathPattern) {
return ttlInfo.getTTL(pathPattern);
}

public int getTTLCount() {
return ttlInfo.getTTLCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ public TSStatus setTTL(SetTTLPlan plan) {
try {
// check ttl rule capacity
final int tTlRuleCapacity = CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
if (getTTLCount() >= tTlRuleCapacity) {
final int newTTLRuleCount = calculateNewTTLRuleCount(plan);
final int requestedTTLRuleCount = ttlCache.getTtlCount() + newTTLRuleCount;
if (newTTLRuleCount > 0 && requestedTTLRuleCount > tTlRuleCapacity) {
TSStatus errorStatus = new TSStatus(TSStatusCode.OVERSIZE_TTL.getStatusCode());
errorStatus.setMessage(
String.format(
"The number of TTL rules has reached the limit (%d). Please delete "
+ "some existing rules first.",
tTlRuleCapacity));
"The number of TTL rules has reached the limit "
+ "(capacity: %d, requested total: %d). Please delete some existing rules first.",
tTlRuleCapacity, requestedTTLRuleCount));
return errorStatus;
}
ttlCache.setTTL(plan.getPathPattern(), plan.getTTL());
Expand All @@ -92,6 +94,20 @@ public TSStatus setTTL(SetTTLPlan plan) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}

private int calculateNewTTLRuleCount(SetTTLPlan plan) {
int newTTLRuleCount = isNewTTLRule(plan.getPathPattern()) ? 1 : 0;
if (plan.isDataBase()) {
String[] pathNodes = Arrays.copyOf(plan.getPathPattern(), plan.getPathPattern().length + 1);
pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
newTTLRuleCount += isNewTTLRule(pathNodes) ? 1 : 0;
}
return newTTLRuleCount;
}

private boolean isNewTTLRule(String[] pathNodes) {
return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL;
}

/** Only used for upgrading from database level ttl to device level ttl. */
public void setTTL(Map<String, Long> databaseTTLMap) throws IllegalPathException {
lock.writeLock().lock();
Expand Down Expand Up @@ -159,6 +175,15 @@ public int getTTLCount() {
}
}

public long getTTL(final String[] pathPattern) {
lock.readLock().lock();
try {
return ttlCache.getLastNodeTTL(pathPattern);
} finally {
lock.readLock().unlock();
}
}

/**
* Get the maximum ttl of the corresponding database level.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
Expand All @@ -47,14 +49,21 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class SetTTLProcedure extends StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> {
private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class);
// Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback.
private static final long TTL_NOT_EXIST = Long.MIN_VALUE;
private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;

private SetTTLPlan plan;
private long previousTTL = TTL_NOT_EXIST;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth a one-line comment explaining why this sentinel is Long.MIN_VALUE rather than TTLCache.NULL_TTL (-1). They have different meanings on the rollback path: TTL_NOT_EXIST means "no TTL was set before this procedure", while NULL_TTL is the explicit "unset" marker that ConfigPlanExecutor interprets to route to unsetTTL. Conflating the two would corrupt rollback behavior.

private long previousDatabaseWildcardTTL = TTL_NOT_EXIST;
private boolean previousTTLStateCaptured = false;

public SetTTLProcedure(final boolean isGeneratedByPipe) {
super(isGeneratedByPipe);
Expand All @@ -71,6 +80,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state)
long startTime = System.currentTimeMillis();
try {
switch (state) {
case CAPTURE_PREVIOUS_TTL:
capturePreviousTTLState(env);
setNextState(SetTTLState.SET_CONFIGNODE_TTL);
return Flow.HAS_MORE_STATE;
case SET_CONFIGNODE_TTL:
setConfigNodeTTL(env);
return Flow.HAS_MORE_STATE;
Expand All @@ -86,18 +99,13 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state)
}
}

private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
TSStatus res;
try {
res =
env.getConfigManager()
.getConsensusManager()
.write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : this.plan);
} catch (ConsensusException e) {
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {
if (!previousTTLStateCaptured) {
capturePreviousTTLState(env);
setNextState(SetTTLState.SET_CONFIGNODE_TTL);
return;
}
final TSStatus res = writeConfigNodePlan(env, plan);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(medium) previousTTL can be overwritten with the new value after a crash.

capturePreviousTTLState and writeConfigNodePlan both run inside one executeFromState(SET_CONFIGNODE_TTL) invocation, but the procedure framework only persists previousTTLStateCaptured / previousTTL on state transitions. If the ConfigNode crashes after the consensus write succeeds but before setNextState(UPDATE_DATANODE_CACHE) is durable, recovery replays SET_CONFIGNODE_TTL: previousTTLStateCaptured is still false, so we re-run capture — and getAllTTL() now returns the new value, which gets recorded as the "previous" TTL. If the DataNode update then fails, the rollback restores the new value over itself, losing the original.

Suggestion: split capture into its own pre-state (e.g. CAPTURE_PREVIOUS_TTLSET_CONFIGNODE_TTLUPDATE_DATANODE_CACHE) so the captured snapshot is persisted with procedure state before the consensus write.

if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, res.message);
setFailure(new ProcedureException(new IoTDBException(res)));
Expand All @@ -106,35 +114,177 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
}
}

private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.SET_TTL,
new TSetTTLReq(
Collections.singletonList(String.join(".", plan.getPathPattern())),
plan.getTTL(),
plan.isDataBase()),
dataNodeLocationMap);
final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
sendTTLRequest(
dataNodeLocationMap,
buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), plan.isDataBase()));
if (hasFailedDataNode(clientHandler)) {
LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
setFailure(
new ProcedureException(
new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
}
}

private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
if (previousTTLStateCaptured) {
return;
}
previousTTL = getTTLOrDefault(env, plan.getPathPattern());
if (plan.isDataBase()) {
previousDatabaseWildcardTTL =
getTTLOrDefault(env, getDatabaseWildcardPathPattern(plan.getPathPattern()));
}
previousTTLStateCaptured = true;
}

TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
try {
return env.getConfigManager()
.getConsensusManager()
.write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : setTTLPlan);
} catch (ConsensusException e) {
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
}
}

DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final TSetTTLReq req) {
final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
for (TSStatus status : statusMap.values()) {
// all dataNodes must clear the related schemaengine cache
return clientHandler;
}

private TSetTTLReq buildSetTTLReq(
final String[] pathPattern, final long ttl, final boolean isDataBase) {
return new TSetTTLReq(
Collections.singletonList(String.join(".", pathPattern)), ttl, isDataBase);
}

private boolean hasFailedDataNode(
final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
if (!clientHandler.getRequestIndices().isEmpty()) {
return true;
}
for (TSStatus status : clientHandler.getResponseMap().values()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
setFailure(
new ProcedureException(
new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
return;
return true;
}
}
return false;
}

private long getTTLOrDefault(final ConfigNodeProcedureEnv env, final String[] pathPattern) {
final long ttl = env.getConfigManager().getTTLManager().getTTL(pathPattern);
return ttl == TTLCache.NULL_TTL ? TTL_NOT_EXIST : ttl;
}

private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 1);
pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
return pathNodes;
}

private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException {
restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
if (plan.isDataBase()) {
restoreTTLOnConfigNode(
env, getDatabaseWildcardPathPattern(plan.getPathPattern()), previousDatabaseWildcardTTL);
}
}

private void restoreTTLOnConfigNode(
final ConfigNodeProcedureEnv env, final String[] pathPattern, final long ttl)
throws ProcedureException {
// TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the executor to unset it.
final SetTTLPlan rollbackPlan =
new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl translation deserves an inline comment. It encodes a cross-file convention: a SetTTLPlan whose ttl == NULL_TTL is interpreted by ConfigPlanExecutor.SetTTL (file ConfigPlanExecutor.java:430) as a signal to call ttlInfo.unsetTTL rather than setTTL, and similarly StorageEngine.setTTL (StorageEngine.java:973) routes to unsetTTLForTree. Reading the rollback code in isolation, it's not obvious that writing -1 is what undoes the previous SET — a one-line note here would save a lot of code-spelunking.

// Database rollback restores the database path and db.** separately, so avoid auto-expansion.
rollbackPlan.setDataBase(false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explicit setDataBase(false) here is correct but non-obvious — please add a comment. The reason: in the database case, rollbackConfigNodeTTL already issues two separate restoreTTLOnConfigNode calls (one for db, one for db.**). If this inner SetTTLPlan carried isDataBase=true, TTLInfo.setTTL would auto-expand the wildcard again, overwriting the just-restored db.** value with the value being applied to db. Forcing isDataBase=false keeps each path independent.

final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new ProcedureException(
new MetadataException(
"Rollback ConfigNode ttl failed for "
+ String.join(".", pathPattern)
+ ": "
+ status.getMessage()));
}
}

private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), previousTTL);
if (plan.isDataBase()) {
restoreTTLOnDataNodes(
dataNodeLocationMap,
getDatabaseWildcardPathPattern(plan.getPathPattern()),
previousDatabaseWildcardTTL);
}
}

private void restoreTTLOnDataNodes(
final Map<Integer, TDataNodeLocation> dataNodeLocationMap,
final String[] pathPattern,
final long ttl)
throws ProcedureException {
if (dataNodeLocationMap.isEmpty()) {
return;
}
final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
sendTTLRequest(
dataNodeLocationMap,
buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl, false));
if (hasFailedDataNode(clientHandler)) {
throw new ProcedureException(
new MetadataException(
"Rollback dataNode ttl cache failed for " + String.join(".", pathPattern)));
}
}

/**
* Best-effort rollback: restore both sides, throw the earliest failure, and suppress later ones.
*/
@Override
protected void rollbackState(
ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState)
throws IOException, InterruptedException, ProcedureException {}
protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState setTTLState)
throws IOException, InterruptedException, ProcedureException {
if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || !previousTTLStateCaptured) {
return;
}
ProcedureException rollbackFailure = null;
try {
rollbackConfigNodeTTL(env);
} catch (ProcedureException e) {
LOGGER.error("Failed to rollback ConfigNode ttl state.", e);
rollbackFailure = e;
}
try {
rollbackDataNodeTTL(env);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two suggestions on the rollback error-aggregation strategy:

  1. Only the first exception survives; the second is logged but lost. Use rollbackFailure.addSuppressed(e) so a postmortem can still see both.
  2. Add a method-level comment clarifying the policy: "best-effort — if one side fails, the other is still attempted; the earliest exception is what propagates." Otherwise readers might expect fail-fast semantics.

} catch (ProcedureException e) {
LOGGER.error("Failed to rollback DataNode ttl cache.", e);
if (rollbackFailure == null) {
rollbackFailure = e;
} else {
rollbackFailure.addSuppressed(e);
}
}
if (rollbackFailure != null) {
throw rollbackFailure;
}
}

@Override
protected boolean isRollbackSupported(final SetTTLState state) {
return state == SetTTLState.UPDATE_DATANODE_CACHE;
}

@Override
protected SetTTLState getState(int stateId) {
Expand All @@ -148,7 +298,7 @@ protected int getStateId(SetTTLState setTTLState) {

@Override
protected SetTTLState getInitialState() {
return SetTTLState.SET_CONFIGNODE_TTL;
return SetTTLState.CAPTURE_PREVIOUS_TTL;
}

@Override
Expand All @@ -159,14 +309,25 @@ public void serialize(DataOutputStream stream) throws IOException {
: ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
super.serialize(stream);
ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
stream.writeBoolean(previousTTLStateCaptured);
stream.writeLong(previousTTL);
stream.writeLong(previousDatabaseWildcardTTL);
}

@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
ReadWriteIOUtils.readInt(byteBuffer);
final int length = ReadWriteIOUtils.readInt(byteBuffer);
final int position = byteBuffer.position();
this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
// The serialized plan buffer may include padding; skip to the actual payload end.
byteBuffer.position(position + length);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This byteBuffer.position(position + length) is necessary (not redundant) — worth a comment so a future reader doesn't "clean it up".

Reason: serializeToByteBuffer() returns ByteBuffer.wrap(publicBAOS.getBuf(), 0, size()). ReadWriteIOUtils.write(ByteBuffer, OutputStream) then writes byteBuffer.capacity() (the full backing array length, which can be greater than the actual data size) followed by the whole array, padding included. Without forcing the position back to position + length, the new boolean + 2*long trailing fields would be read from those padding zero bytes.

if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) {
this.previousTTLStateCaptured = byteBuffer.get() != 0;
this.previousTTL = byteBuffer.getLong();
this.previousDatabaseWildcardTTL = byteBuffer.getLong();
}
} catch (IOException e) {
LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e);
}
Expand All @@ -180,12 +341,21 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
return this.plan.equals(((SetTTLProcedure) o).plan)
&& this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe);
final SetTTLProcedure that = (SetTTLProcedure) o;
return this.isGeneratedByPipe == that.isGeneratedByPipe
&& this.previousTTL == that.previousTTL
&& this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL
&& this.previousTTLStateCaptured == that.previousTTLStateCaptured
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Including previousTTL / previousDatabaseWildcardTTL / previousTTLStateCaptured in equals/hashCode is consistent with the serialization change, but please confirm no caller dedups procedures by equals on the wait queue. If something does, two procedures with the same plan but different captured state would now be considered distinct, which could change idempotency behavior. If there's no such caller, ignore this comment.

&& this.plan.equals(that.plan);
}

@Override
public int hashCode() {
return Objects.hash(plan, isGeneratedByPipe);
return Objects.hash(
plan,
isGeneratedByPipe,
previousTTL,
previousDatabaseWildcardTTL,
previousTTLStateCaptured);
}
}
Loading
Loading