-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fixed TTL problems #17735
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Fixed TTL problems #17735
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,10 @@ | |
| import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; | ||
| import org.apache.iotdb.common.rpc.thrift.TSStatus; | ||
| import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; | ||
| import org.apache.iotdb.commons.conf.IoTDBConstant; | ||
| import org.apache.iotdb.commons.exception.IoTDBException; | ||
| import org.apache.iotdb.commons.exception.MetadataException; | ||
| import org.apache.iotdb.commons.schema.ttl.TTLCache; | ||
| import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; | ||
| import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; | ||
| import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; | ||
|
|
@@ -47,14 +49,21 @@ | |
| import java.io.DataOutputStream; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
| public class SetTTLProcedure extends StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class); | ||
| // Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback. | ||
| private static final long TTL_NOT_EXIST = Long.MIN_VALUE; | ||
| private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2; | ||
|
|
||
| private SetTTLPlan plan; | ||
| private long previousTTL = TTL_NOT_EXIST; | ||
| private long previousDatabaseWildcardTTL = TTL_NOT_EXIST; | ||
| private boolean previousTTLStateCaptured = false; | ||
|
|
||
| public SetTTLProcedure(final boolean isGeneratedByPipe) { | ||
| super(isGeneratedByPipe); | ||
|
|
@@ -71,6 +80,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state) | |
| long startTime = System.currentTimeMillis(); | ||
| try { | ||
| switch (state) { | ||
| case CAPTURE_PREVIOUS_TTL: | ||
| capturePreviousTTLState(env); | ||
| setNextState(SetTTLState.SET_CONFIGNODE_TTL); | ||
| return Flow.HAS_MORE_STATE; | ||
| case SET_CONFIGNODE_TTL: | ||
| setConfigNodeTTL(env); | ||
| return Flow.HAS_MORE_STATE; | ||
|
|
@@ -86,18 +99,13 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state) | |
| } | ||
| } | ||
|
|
||
| private void setConfigNodeTTL(ConfigNodeProcedureEnv env) { | ||
| TSStatus res; | ||
| try { | ||
| res = | ||
| env.getConfigManager() | ||
| .getConsensusManager() | ||
| .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : this.plan); | ||
| } catch (ConsensusException e) { | ||
| LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); | ||
| res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); | ||
| res.setMessage(e.getMessage()); | ||
| void setConfigNodeTTL(final ConfigNodeProcedureEnv env) { | ||
| if (!previousTTLStateCaptured) { | ||
| capturePreviousTTLState(env); | ||
| setNextState(SetTTLState.SET_CONFIGNODE_TTL); | ||
| return; | ||
| } | ||
| final TSStatus res = writeConfigNodePlan(env, plan); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (medium)
Suggestion: split capture into its own pre-state (e.g. `CAPTURE_PREVIOUS_TTL`). |
||
| if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, res.message); | ||
| setFailure(new ProcedureException(new IoTDBException(res))); | ||
|
|
@@ -106,35 +114,177 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv env) { | |
| } | ||
| } | ||
|
|
||
| private void updateDataNodeTTL(ConfigNodeProcedureEnv env) { | ||
| Map<Integer, TDataNodeLocation> dataNodeLocationMap = | ||
| void updateDataNodeTTL(final ConfigNodeProcedureEnv env) { | ||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap = | ||
| env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); | ||
| DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| new DataNodeAsyncRequestContext<>( | ||
| CnToDnAsyncRequestType.SET_TTL, | ||
| new TSetTTLReq( | ||
| Collections.singletonList(String.join(".", plan.getPathPattern())), | ||
| plan.getTTL(), | ||
| plan.isDataBase()), | ||
| dataNodeLocationMap); | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| sendTTLRequest( | ||
| dataNodeLocationMap, | ||
| buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), plan.isDataBase())); | ||
| if (hasFailedDataNode(clientHandler)) { | ||
| LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); | ||
| setFailure( | ||
| new ProcedureException( | ||
| new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); | ||
| } | ||
| } | ||
|
|
||
| private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) { | ||
| if (previousTTLStateCaptured) { | ||
| return; | ||
| } | ||
| previousTTL = getTTLOrDefault(env, plan.getPathPattern()); | ||
| if (plan.isDataBase()) { | ||
| previousDatabaseWildcardTTL = | ||
| getTTLOrDefault(env, getDatabaseWildcardPathPattern(plan.getPathPattern())); | ||
| } | ||
| previousTTLStateCaptured = true; | ||
| } | ||
|
|
||
| TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) { | ||
| try { | ||
| return env.getConfigManager() | ||
| .getConsensusManager() | ||
| .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : setTTLPlan); | ||
| } catch (ConsensusException e) { | ||
| LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); | ||
| final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); | ||
| res.setMessage(e.getMessage()); | ||
| return res; | ||
| } | ||
| } | ||
|
|
||
| DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest( | ||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final TSetTTLReq req) { | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, dataNodeLocationMap); | ||
| CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); | ||
| Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap(); | ||
| for (TSStatus status : statusMap.values()) { | ||
| // all dataNodes must clear the related schemaengine cache | ||
| return clientHandler; | ||
| } | ||
|
|
||
| private TSetTTLReq buildSetTTLReq( | ||
| final String[] pathPattern, final long ttl, final boolean isDataBase) { | ||
| return new TSetTTLReq( | ||
| Collections.singletonList(String.join(".", pathPattern)), ttl, isDataBase); | ||
| } | ||
|
|
||
| private boolean hasFailedDataNode( | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) { | ||
| if (!clientHandler.getRequestIndices().isEmpty()) { | ||
| return true; | ||
| } | ||
| for (TSStatus status : clientHandler.getResponseMap().values()) { | ||
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE); | ||
| setFailure( | ||
| new ProcedureException( | ||
| new MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED))); | ||
| return; | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private long getTTLOrDefault(final ConfigNodeProcedureEnv env, final String[] pathPattern) { | ||
| final long ttl = env.getConfigManager().getTTLManager().getTTL(pathPattern); | ||
| return ttl == TTLCache.NULL_TTL ? TTL_NOT_EXIST : ttl; | ||
| } | ||
|
|
||
| private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) { | ||
| final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 1); | ||
| pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; | ||
| return pathNodes; | ||
| } | ||
|
|
||
| private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { | ||
| restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL); | ||
| if (plan.isDataBase()) { | ||
| restoreTTLOnConfigNode( | ||
| env, getDatabaseWildcardPathPattern(plan.getPathPattern()), previousDatabaseWildcardTTL); | ||
| } | ||
| } | ||
|
|
||
| private void restoreTTLOnConfigNode( | ||
| final ConfigNodeProcedureEnv env, final String[] pathPattern, final long ttl) | ||
| throws ProcedureException { | ||
| // TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the executor to unset it. | ||
| final SetTTLPlan rollbackPlan = | ||
| new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The |
||
| // Database rollback restores the database path and db.** separately, so avoid auto-expansion. | ||
| rollbackPlan.setDataBase(false); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The explicit |
||
| final TSStatus status = writeConfigNodePlan(env, rollbackPlan); | ||
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| throw new ProcedureException( | ||
| new MetadataException( | ||
| "Rollback ConfigNode ttl failed for " | ||
| + String.join(".", pathPattern) | ||
| + ": " | ||
| + status.getMessage())); | ||
| } | ||
| } | ||
|
|
||
| private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws ProcedureException { | ||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap = | ||
| env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); | ||
| restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), previousTTL); | ||
| if (plan.isDataBase()) { | ||
| restoreTTLOnDataNodes( | ||
| dataNodeLocationMap, | ||
| getDatabaseWildcardPathPattern(plan.getPathPattern()), | ||
| previousDatabaseWildcardTTL); | ||
| } | ||
| } | ||
|
|
||
| private void restoreTTLOnDataNodes( | ||
| final Map<Integer, TDataNodeLocation> dataNodeLocationMap, | ||
| final String[] pathPattern, | ||
| final long ttl) | ||
| throws ProcedureException { | ||
| if (dataNodeLocationMap.isEmpty()) { | ||
| return; | ||
| } | ||
| final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler = | ||
| sendTTLRequest( | ||
| dataNodeLocationMap, | ||
| buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl, false)); | ||
| if (hasFailedDataNode(clientHandler)) { | ||
| throw new ProcedureException( | ||
| new MetadataException( | ||
| "Rollback dataNode ttl cache failed for " + String.join(".", pathPattern))); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Best-effort rollback: restore both sides, throw the earliest failure, and suppress later ones. | ||
| */ | ||
| @Override | ||
| protected void rollbackState( | ||
| ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState) | ||
| throws IOException, InterruptedException, ProcedureException {} | ||
| protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState setTTLState) | ||
| throws IOException, InterruptedException, ProcedureException { | ||
| if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || !previousTTLStateCaptured) { | ||
| return; | ||
| } | ||
| ProcedureException rollbackFailure = null; | ||
| try { | ||
| rollbackConfigNodeTTL(env); | ||
| } catch (ProcedureException e) { | ||
| LOGGER.error("Failed to rollback ConfigNode ttl state.", e); | ||
| rollbackFailure = e; | ||
| } | ||
| try { | ||
| rollbackDataNodeTTL(env); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Two suggestions on the rollback error-aggregation strategy:
|
||
| } catch (ProcedureException e) { | ||
| LOGGER.error("Failed to rollback DataNode ttl cache.", e); | ||
| if (rollbackFailure == null) { | ||
| rollbackFailure = e; | ||
| } else { | ||
| rollbackFailure.addSuppressed(e); | ||
| } | ||
| } | ||
| if (rollbackFailure != null) { | ||
| throw rollbackFailure; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected boolean isRollbackSupported(final SetTTLState state) { | ||
| return state == SetTTLState.UPDATE_DATANODE_CACHE; | ||
| } | ||
|
|
||
| @Override | ||
| protected SetTTLState getState(int stateId) { | ||
|
|
@@ -148,7 +298,7 @@ protected int getStateId(SetTTLState setTTLState) { | |
|
|
||
| @Override | ||
| protected SetTTLState getInitialState() { | ||
| return SetTTLState.SET_CONFIGNODE_TTL; | ||
| return SetTTLState.CAPTURE_PREVIOUS_TTL; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -159,14 +309,25 @@ public void serialize(DataOutputStream stream) throws IOException { | |
| : ProcedureType.SET_TTL_PROCEDURE.getTypeCode()); | ||
| super.serialize(stream); | ||
| ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream); | ||
| stream.writeBoolean(previousTTLStateCaptured); | ||
| stream.writeLong(previousTTL); | ||
| stream.writeLong(previousDatabaseWildcardTTL); | ||
| } | ||
|
|
||
| @Override | ||
| public void deserialize(ByteBuffer byteBuffer) { | ||
| super.deserialize(byteBuffer); | ||
| try { | ||
| ReadWriteIOUtils.readInt(byteBuffer); | ||
| final int length = ReadWriteIOUtils.readInt(byteBuffer); | ||
| final int position = byteBuffer.position(); | ||
| this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer); | ||
| // The serialized plan buffer may include padding; skip to the actual payload end. | ||
| byteBuffer.position(position + length); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This Reason: |
||
| if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) { | ||
| this.previousTTLStateCaptured = byteBuffer.get() != 0; | ||
| this.previousTTL = byteBuffer.getLong(); | ||
| this.previousDatabaseWildcardTTL = byteBuffer.getLong(); | ||
| } | ||
| } catch (IOException e) { | ||
| LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e); | ||
| } | ||
|
|
@@ -180,12 +341,21 @@ public boolean equals(Object o) { | |
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| return this.plan.equals(((SetTTLProcedure) o).plan) | ||
| && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe); | ||
| final SetTTLProcedure that = (SetTTLProcedure) o; | ||
| return this.isGeneratedByPipe == that.isGeneratedByPipe | ||
| && this.previousTTL == that.previousTTL | ||
| && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL | ||
| && this.previousTTLStateCaptured == that.previousTTLStateCaptured | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Including |
||
| && this.plan.equals(that.plan); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(plan, isGeneratedByPipe); | ||
| return Objects.hash( | ||
| plan, | ||
| isGeneratedByPipe, | ||
| previousTTL, | ||
| previousDatabaseWildcardTTL, | ||
| previousTTLStateCaptured); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth a one-line comment explaining why this sentinel is
`Long.MIN_VALUE` rather than `TTLCache.NULL_TTL` (-1). They have different meanings on the rollback path: `TTL_NOT_EXIST` means "no TTL was set before this procedure", while `NULL_TTL` is the explicit "unset" marker that `ConfigPlanExecutor` interprets to route to `unsetTTL`. Conflating the two would corrupt rollback behavior.