Zstd compression for thrift serialization (storm cluster state)#8653
GGraziadei wants to merge 2 commits into
Conversation
Drive-by review focused on the new Zstd code paths.
The most important item is the rolling-upgrade compatibility bug: the bridge falls back to raw Thrift instead of Gzip, and defaults.yaml now points at the pure-Zstd delegate, which means existing clusters can't read their own ZK state after upgrade (existing state is Gzip-compressed Thrift). See the inline comments on ZstdBridgeThriftSerializationDelegate.java and conf/defaults.yaml.
The rest is a mix of correctness/cleanup items (stale javadoc, a wrong magic-number comment, an unused import, a duplicate <scope>compile</scope>, a redundant zstd-jni dependency) and hardening suggestions: not vulnerabilities under Storm's threat model, but defense-in-depth the project explicitly welcomes (a decompression cap, narrower exception handling, a compression-level cap, dropping the static-state coupling in ZstdUtils.compress).
| <groupId>com.github.luben</groupId>
| <artifactId>zstd-jni</artifactId>
| <version>${zstd-jni.version}</version>
| <scope>compile</scope>
compile is the default Maven scope — this line is redundant and can be dropped.
Also worth checking whether the explicit zstd-jni dependency is needed at all: commons-compress already pulls it transitively, and the new code only uses org.apache.commons.compress.compressors.zstandard.*. If nothing in the codebase imports com.github.luben.zstd.* directly, this whole <dependency> entry plus the zstd-jni.version property at the top of the file can go.
I have removed the redundant compile tags as suggested.
Regarding the zstd-jni dependency: I have confirmed that it is marked as <optional>true</optional> in the commons-compress 1.28.0 POM. I must keep this explicit dependency to ensure the artifact is included in the classpath.
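For reference, a sketch of the entry as it would look after dropping the redundant scope line (assumes the `zstd-jni.version` property stays defined in the root `<properties>`):

```xml
<!-- Kept explicitly: commons-compress marks zstd-jni as <optional>true</optional>,
     so it is not pulled in transitively. -->
<dependency>
    <groupId>com.github.luben</groupId>
    <artifactId>zstd-jni</artifactId>
    <version>${zstd-jni.version}</version>
</dependency>
```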
| <dependency>
| <groupId>com.github.luben</groupId>
| <artifactId>zstd-jni</artifactId>
| <scope>compile</scope>
Same as in the root pom — compile is the default scope, drop the <scope> line. And if no module-level code imports com.github.luben.zstd.* directly, the whole <dependency> entry is redundant with the commons-compress one right above it.
| storm.auth.simple-white-list.users: []
| storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
| - storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
| + storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdThriftSerializationDelegate"
Switching the default to the pure Zstd delegate breaks rolling upgrades: every existing cluster has Gzip-compressed Thrift sitting in ZooKeeper, and ZstdThriftSerializationDelegate cannot read it. The new bridge class is presumably designed for exactly this case — once its fallback is fixed (see comment on that file), please point the default here at ZstdBridgeThriftSerializationDelegate instead. Operators can opt into the pure Zstd delegate once they are sure no legacy Gzip state remains in ZK.
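Concretely, the suggested default would read as follows in conf/defaults.yaml (a sketch; the class name comes from this PR's bridge delegate):

```yaml
# Reads both legacy Gzip-compressed state and new Zstd state during a rolling upgrade.
storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate"
```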
| package org.apache.storm.serialization;
|
| import java.util.Map;
| import java.util.zip.GZIPInputStream;
Unused import. Its presence suggests the original intent was to fall back to Gzip rather than raw Thrift — see comment on the defaultDelegate field below.
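To make the intended fallback concrete, here is a stdlib-only sketch of the dispatch the bridge presumably wants (class and method names are mine; the real class would delegate each branch to the existing Gzip and Zstd delegates rather than inline the streams):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;

// Sketch of the bridge's read path: detect the Zstd frame magic, otherwise
// assume legacy Gzip-compressed Thrift (the format already sitting in ZK).
public class BridgeDispatchSketch {
    // Zstandard frame magic 0xFD2FB528, little-endian on the wire.
    static boolean isZstd(byte[] b) {
        return b != null && b.length >= 4
            && b[0] == (byte) 0x28 && b[1] == (byte) 0xB5
            && b[2] == (byte) 0x2F && b[3] == (byte) 0xFD;
    }

    static byte[] deserializeBytes(byte[] data) {
        if (isZstd(data)) {
            // Real code would hand off to the Zstd delegate here.
            throw new UnsupportedOperationException("zstd path elided in this sketch");
        }
        // Fallback: legacy state in ZooKeeper is Gzip-compressed Thrift,
        // so the fallback must gunzip, not parse raw Thrift.
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            in.transferTo(out);
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```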
Sorry, this was copied over from the Gzip bridge delegate.
Removed.
| public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {
|
| /**
|  * Zstandard magic number 0xFD2FB52. In a byte array (little-endian format): [0x28, 0xB5, 0x2F, 0xFD]
The Zstandard magic number is 0xFD2FB528 (32 bits / 8 hex digits), not 0xFD2FB52. The bytes themselves below are correct; just the comment is missing a digit.
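A quick stdlib-only check (class and method names are mine, not from the PR) confirming that those four bytes read back as 0xFD2FB528 when interpreted little-endian:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Demonstrates that the on-disk byte order [0x28, 0xB5, 0x2F, 0xFD]
// corresponds to the 32-bit little-endian magic value 0xFD2FB528.
public class ZstdMagicDemo {
    static final int ZSTD_MAGIC = 0xFD2FB528;

    static boolean startsWithZstdMagic(byte[] data) {
        if (data == null || data.length < 4) {
            return false;
        }
        return ByteBuffer.wrap(data, 0, 4)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getInt() == ZSTD_MAGIC;
    }
}
```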
| try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
|      ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(bis);
|      ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
Hardening suggestion — not a vulnerability under Storm's threat model since ZK sits inside the trusted boundary, but worth doing as defense-in-depth: this IOUtils.copy will happily decompress an unbounded amount of data into memory. Zstd routinely achieves >100× ratios and pathological frames can reach >1,000,000×, so a small crafted frame can OOM the JVM. Suggest a configurable cap (e.g. storm.compression.zstd.max.decompressed.bytes, default in the tens of MiB — well above any realistic metadata size) enforced via BoundedInputStream or a manual copy loop. The same caveat applies to Utils.gunzip in principle, but Zstd raises the ceiling significantly.
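A minimal sketch of the manual-copy-loop variant (names and the cap's plumbing are illustrative, not Storm API; the caller would pass the already-opened decompressor stream and a cap taken from config):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Copies a decompression stream into memory, aborting as soon as the
// decompressed size exceeds a configured cap, so a small crafted frame
// cannot expand unboundedly and OOM the JVM.
public class BoundedCopy {
    static byte[] copyWithCap(InputStream in, long maxBytes) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            total += n;
            if (total > maxBytes) {
                throw new IOException(
                    "Decompressed size exceeds configured cap of " + maxBytes + " bytes");
            }
            bos.write(buf, 0, n);
        }
        return bos.toByteArray();
    }
}
```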
|      ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(bis);
|      ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
|     IOUtils.copy(zstdIn, bos);
|     return bos.toByteArray();
Same as the compress path: narrow to IOException, otherwise InterruptedException is swallowed.
|  * @throws RuntimeException wrapping an {@link IOException} if the compression fails.
|  */
| public static byte[] compress(byte[] data) {
|     if (data == null || data.length == 0) {
Returning the raw input for empty data means the bridge's isZstd() check sees no magic header and routes through the fallback path. Works by accident for empty input today, but it's an asymmetry that will bite later. Either drop the early-return so output always carries a valid Zstd frame, or document the contract explicitly.
Thank you. I am proposing an alternative to this method (now moved to the utility class).
I found in the documentation a safe method to do the same comparison better: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/invoke/MethodHandles.html#byteArrayViewVarHandle(java.lang.Class,java.nio.ByteOrder)
private static final int ZSTD_MAGIC_INT = 0xFD2FB528;
private static final VarHandle INT_HANDLE = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN);
...
public static boolean isZstd(byte[] bytes) {
    // Length guard added: the VarHandle read below touches bytes[0..3]
    // and would throw IndexOutOfBoundsException on shorter arrays.
    if (bytes == null || bytes.length < 4) {
        return false;
    }
    return (int) INT_HANDLE.get(bytes, 0) == ZSTD_MAGIC_INT;
}
| public static class ZstdLevelValidator extends Validator {
|     private static final int MIN_LEVEL = 1;
|     private static final int MAX_LEVEL = 22;
Levels 20–22 are Zstd "ultra" mode and require dramatically more working memory per call — rarely worth it for metadata-sized payloads, and they make the cluster easy to footgun with a single storm.yaml typo. Suggest capping at 19, or gating ultra levels behind an explicit opt-in (e.g. storm.compression.zstd.allow.ultra).
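A sketch of the suggested gating, with a hypothetical allowUltra flag standing in for the proposed storm.compression.zstd.allow.ultra option (names are illustrative; Storm's real Validator base class is not reproduced here):

```java
// Validates a zstd compression level: 1-19 always allowed, the memory-hungry
// "ultra" levels 20-22 only when explicitly opted in.
public class ZstdLevelCheck {
    static final int MIN_LEVEL = 1;
    static final int MAX_NORMAL_LEVEL = 19;
    static final int MAX_ULTRA_LEVEL = 22;

    static void validate(int level, boolean allowUltra) {
        int max = allowUltra ? MAX_ULTRA_LEVEL : MAX_NORMAL_LEVEL;
        if (level < MIN_LEVEL || level > max) {
            throw new IllegalArgumentException(
                "zstd level " + level + " outside [" + MIN_LEVEL + ", " + max + "]"
                + (allowUltra ? "" : " (ultra levels 20-22 require explicit opt-in)"));
        }
    }
}
```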
| @@ -0,0 +1,141 @@
| /**
A few coverage gaps worth filling while this file is fresh:

- No direct test of ZstdThriftSerializationDelegate — it is only exercised via the bridge.
- No test that Gzip-encoded bytes deserialize correctly through the bridge — which is the whole point of having a bridge, and as currently implemented this scenario is broken (see comment on defaultDelegate in ZstdBridgeThriftSerializationDelegate.java).
- No negative test for malformed bytes (random data with no recognizable framing).
- Once a decompression cap is added, please add a bomb-style test (small input, very large declared expansion).
What is the purpose of the change
This PR introduces a new serialization delegate, ZstdThriftSerializationDelegate, designed to optimize the transfer of metadata between Storm components. By leveraging zstd compression on top of Thrift serialization, we can significantly reduce the network footprint, especially in large-scale topologies. This PR enables zstd compression only for metadata, ref StormClusterStateImpl.

How was the change tested
In the context of #8652