Zstd compression for thrift serialization (storm cluster state)#8653

Open
GGraziadei wants to merge 2 commits into apache:master from GGraziadei:8652-zstd-compression-cluster-state

Conversation

@GGraziadei

What is the purpose of the change

This PR introduces a new serialization delegate, ZstdThriftSerializationDelegate, designed to optimize the transfer of metadata between Storm components. By leveraging zstd compression on top of Thrift serialization, we can significantly reduce the network footprint, especially in large-scale topologies.
This PR enables zstd compression only for metadata; see StormClusterStateImpl.

How was the change tested

  • Unit test (added a new ad-hoc test class)
  • Integration test
  • Smoke test, performed on a Storm cluster.

In the context of #8652

Contributor

@rzo1 rzo1 left a comment


Drive-by review focused on the new Zstd code paths.

The most important item is the rolling-upgrade compatibility bug: the bridge falls back to raw Thrift instead of Gzip, and defaults.yaml now points at the pure-Zstd delegate, which means existing clusters can't read their own ZK state after upgrade (existing state is Gzip-compressed Thrift). See the inline comments on ZstdBridgeThriftSerializationDelegate.java and conf/defaults.yaml.

The rest is a mix of correctness/cleanup (stale javadoc, wrong magic-number comment, unused import, duplicate <scope>compile</scope>, redundant zstd-jni dependency) and hardening suggestions that the project's security model explicitly welcomes even though they are not vulnerabilities under Storm's threat model (decompression cap, narrower exception handling, level cap, dropping the static-state coupling in ZstdUtils.compress).

Comment thread pom.xml
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
<scope>compile</scope>
Contributor


compile is the default Maven scope — this line is redundant and can be dropped.

Also worth checking whether the explicit zstd-jni dependency is needed at all: commons-compress already pulls it transitively, and the new code only uses org.apache.commons.compress.compressors.zstandard.*. If nothing in the codebase imports com.github.luben.zstd.* directly, this whole <dependency> entry plus the zstd-jni.version property at the top of the file can go.

Author


I have removed the redundant compile tags as suggested.

Regarding the zstd-jni dependency: I have confirmed that it is marked as <optional>true</optional> in the commons-compress 1.28.0 POM, and optional dependencies are not pulled in transitively. The explicit dependency must therefore stay to ensure the artifact ends up on the classpath.

Comment thread storm-client/pom.xml
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<scope>compile</scope>
Contributor


Same as in the root pom — compile is the default scope, drop the <scope> line. And if no module-level code imports com.github.luben.zstd.* directly, the whole <dependency> entry is redundant with the commons-compress one right above it.

Comment thread conf/defaults.yaml
storm.auth.simple-white-list.users: []
storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdThriftSerializationDelegate"
Contributor


Switching the default to the pure Zstd delegate breaks rolling upgrades: every existing cluster has Gzip-compressed Thrift sitting in ZooKeeper, and ZstdThriftSerializationDelegate cannot read it. The new bridge class is presumably designed for exactly this case — once its fallback is fixed (see comment on that file), please point the default here at ZstdBridgeThriftSerializationDelegate instead. Operators can opt into the pure Zstd delegate once they are sure no legacy Gzip state remains in ZK.

package org.apache.storm.serialization;

import java.util.Map;
import java.util.zip.GZIPInputStream;
Contributor


Unused import. Its presence suggests the original intent was to fall back to Gzip rather than raw Thrift — see comment on the defaultDelegate field below.
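For illustration, a self-contained sketch of the dispatch the bridge presumably intends: Zstd-framed bytes take the Zstd path, and everything else falls back to the legacy Gzip path rather than raw Thrift. The `Codec` interface and class names here are stand-ins, not the PR's actual types.

```java
// Hypothetical stand-in for SerializationDelegate, to keep the sketch self-contained.
interface Codec {
    byte[] decode(byte[] bytes);
}

public class BridgeSketch implements Codec {
    private final Codec zstdCodec;
    private final Codec legacyGzipCodec; // fallback must be Gzip, not raw Thrift

    public BridgeSketch(Codec zstdCodec, Codec legacyGzipCodec) {
        this.zstdCodec = zstdCodec;
        this.legacyGzipCodec = legacyGzipCodec;
    }

    // Zstd frame magic as it appears on the wire: 28 B5 2F FD (little-endian).
    static boolean isZstd(byte[] bytes) {
        return bytes != null && bytes.length >= 4
                && bytes[0] == 0x28 && bytes[1] == (byte) 0xB5
                && bytes[2] == 0x2F && bytes[3] == (byte) 0xFD;
    }

    @Override
    public byte[] decode(byte[] bytes) {
        // Route on the frame header; legacy (pre-upgrade) state has no Zstd magic.
        return isZstd(bytes) ? zstdCodec.decode(bytes) : legacyGzipCodec.decode(bytes);
    }
}
```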

Author


Sorry, that was copied over from the Gzip bridge delegate.
Removed.

public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {

/**
* Zstandard magic number 0xFD2FB52. In a byte array (little-endian format): [0x28, 0xB5, 0x2F, 0xFD]
Contributor


The Zstandard magic number is 0xFD2FB528 (32 bits / 8 hex digits), not 0xFD2FB52. The bytes themselves below are correct; just the comment is missing a digit.
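The corrected value can be checked mechanically. A small self-contained sketch (class and method names are illustrative, not from the PR):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ZstdMagicDemo {
    // Full 32-bit Zstandard frame magic (RFC 8878): 0xFD2FB528.
    public static final int ZSTD_MAGIC = 0xFD2FB528;

    // The same value as it appears on the wire, little-endian.
    public static final byte[] MAGIC_LE = {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD};

    // Reads the first four bytes little-endian and compares against the magic.
    public static boolean startsWithMagic(byte[] bytes) {
        return bytes != null && bytes.length >= 4
                && ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt() == ZSTD_MAGIC;
    }
}
```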


try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(bis);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardening suggestion — not a vulnerability under Storm's threat model since ZK sits inside the trusted boundary, but worth doing as defense-in-depth: this IOUtils.copy will happily decompress an unbounded amount of data into memory. Zstd routinely achieves >100× ratios and pathological frames can reach >1,000,000×, so a small crafted frame can OOM the JVM. Suggest a configurable cap (e.g. storm.compression.zstd.max.decompressed.bytes, default in the tens of MiB — well above any realistic metadata size) enforced via BoundedInputStream or a manual copy loop. The same caveat applies to Utils.gunzip in principle, but Zstd raises the ceiling significantly.
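For illustration, a self-contained sketch of such a bounded copy loop. The cap value and the config key mentioned above are hypothetical defaults, and this sketch works on any `InputStream` rather than the Zstd stream specifically:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class BoundedCopy {
    // Hypothetical default; the real value would come from a config key
    // such as storm.compression.zstd.max.decompressed.bytes.
    static final long MAX_DECOMPRESSED_BYTES = 64L * 1024 * 1024;

    /** Copies {@code in} into memory, failing fast once the output exceeds the cap. */
    static byte[] copyBounded(InputStream in, long maxBytes) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            total += n;
            if (total > maxBytes) {
                // Abort before buffering the rest of a potential decompression bomb.
                throw new IOException("Decompressed size exceeds cap of " + maxBytes + " bytes");
            }
            bos.write(buf, 0, n);
        }
        return bos.toByteArray();
    }
}
```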

Author


Thank you. I will fix it.

ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(bis);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
IOUtils.copy(zstdIn, bos);
return bos.toByteArray();
Contributor


Same as the compress path: narrow to IOException, otherwise InterruptedException is swallowed.

* @throws RuntimeException wrapping an {@link IOException} if the compression fails.
*/
public static byte[] compress(byte[] data) {
if (data == null || data.length == 0) {
Contributor


Returning the raw input for empty data means the bridge's isZstd() check sees no magic header and routes through the fallback path. Works by accident for empty input today, but it's an asymmetry that will bite later. Either drop the early-return so output always carries a valid Zstd frame, or document the contract explicitly.

Author

@GGraziadei GGraziadei May 13, 2026


Thank you. I am proposing an alternative to this method (now moved to the utility class).

The JDK documentation describes a safe way to do the same comparison more robustly: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/invoke/MethodHandles.html#byteArrayViewVarHandle(java.lang.Class,java.nio.ByteOrder)

private static final int ZSTD_MAGIC_INT = 0xFD2FB528;

private static final VarHandle INT_HANDLE = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN);
...
public static boolean isZstd(byte[] bytes) {
    // Guard short arrays too: the VarHandle read needs four bytes and
    // would otherwise throw IndexOutOfBoundsException.
    if (bytes == null || bytes.length < Integer.BYTES) {
        return false;
    }
    return (int) INT_HANDLE.get(bytes, 0) == ZSTD_MAGIC_INT;
}


public static class ZstdLevelValidator extends Validator {
private static final int MIN_LEVEL = 1;
private static final int MAX_LEVEL = 22;
Contributor


Levels 20–22 are Zstd "ultra" mode and require dramatically more working memory per call — rarely worth it for metadata-sized payloads, and they make the cluster easy to footgun with a single storm.yaml typo. Suggest capping at 19, or gating ultra levels behind an explicit opt-in (e.g. storm.compression.zstd.allow.ultra).
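A minimal sketch of the suggested gating, detached from Storm's Validator base class so it is self-contained; the method name and the opt-in flag are hypothetical:

```java
public class ZstdLevelCheck {
    static final int MIN_LEVEL = 1;
    // Highest non-ultra Zstd level; 20-22 are "ultra" mode and need far
    // more working memory per compression call.
    static final int MAX_LEVEL_NON_ULTRA = 19;
    static final int MAX_LEVEL_ULTRA = 22;

    // allowUltra would be driven by an opt-in config flag such as
    // storm.compression.zstd.allow.ultra (hypothetical key).
    static boolean isValidLevel(int level, boolean allowUltra) {
        int max = allowUltra ? MAX_LEVEL_ULTRA : MAX_LEVEL_NON_ULTRA;
        return level >= MIN_LEVEL && level <= max;
    }
}
```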

@@ -0,0 +1,141 @@
/**
Contributor


A few coverage gaps worth filling while this file is fresh:

  1. No direct test of ZstdThriftSerializationDelegate — it is only exercised via the bridge.
  2. No test that Gzip-encoded bytes deserialize correctly through the bridge — which is the whole point of having a bridge, and as currently implemented this scenario is broken (see comment on defaultDelegate in ZstdBridgeThriftSerializationDelegate.java).
  3. No negative test for malformed bytes (random data with no recognizable framing).
  4. Once a decompression cap is added, please add a bomb-style test (small input, very large declared expansion).
