Skip to content

Commit 57c681e

Browse files
committed
review
1 parent 8118de1 commit 57c681e

File tree

3 files changed

+34
-17
lines changed
  • nifi-extension-bundles

3 files changed

+34
-17
lines changed

nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,25 @@ public WriteResult writeRawRecord(final Record record) throws IOException {
178178
return WriteResult.of(incrementRecordCount(), attributes);
179179
}
180180

181+
/**
182+
* Determines whether the record's original serialized JSON bytes can be emitted verbatim as a throughput optimization,
183+
* bypassing field-by-field re-serialization. All of the following conditions must hold for the fast path to apply:
184+
* <ol>
185+
* <li>The caller enabled the optimization (the {@code reuseInputSerialization} constructor argument is {@code true}).</li>
186+
* <li>The record carries a {@link SerializedForm} produced by the upstream reader. Today this is only set by
187+
* {@code JsonTreeRowRecordReader}; readers such as {@code JsonPathRowRecordReader} that transform the input
188+
* cannot reuse their input bytes and therefore never trigger the fast path.</li>
189+
* <li>The serialized form's MIME type matches the writer's configured MIME type and the reader's record schema is
190+
* equal to the writer's record schema (no projection, no field renames, no type coercion).</li>
191+
* <li>The cached bytes are a {@code String}.</li>
192+
* <li>The cached bytes' pretty-print state matches the writer's {@code prettyPrint} setting.</li>
193+
* <li>If scientific notation is disabled on the writer, the cached bytes do not contain scientific notation.</li>
194+
* </ol>
195+
* When the fast path is taken, the writer emits the cached bytes via {@link JsonGenerator#writeRawValue(String)} and
196+
* therefore does <em>not</em> apply the writer's Timestamp Format, Date Format, Time Format, or Suppress Null Values
197+
* settings to that record. Operators that need those writer-side properties to be honored uniformly must construct
198+
* this writer with {@code reuseInputSerialization = false}.
199+
*/
181200
private boolean isUseSerializeForm(final Record record, final RecordSchema writeSchema) {
182201
if (!reuseInputSerialization) {
183202
return false;

nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,11 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
124124
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_GZIP)
125125
.build();
126126
public static final PropertyDescriptor REUSE_INPUT_SERIALIZATION = new PropertyDescriptor.Builder()
127-
.name("Reuse Input Serialization")
127+
.name("Use Input Serialization")
128128
.description("""
129-
Controls a throughput optimization that only applies to pure JSON pass-through flows. When set to true, and all of the \
130-
following conditions are met, the writer will emit the record's original JSON bytes verbatim instead of re-serializing from typed field \
131-
values: (1) the upstream reader is JsonTreeReader, (2) no data change, (3) the reader's and writer's record schemas are identical, \
132-
(4) the writer is not using compression, and (5) the Pretty Print JSON and Allow Scientific Notation settings are compatible with the \
133-
cached bytes. When the optimization kicks in, the Timestamp Format, Date Format, Time Format, and Suppress Null Values properties \
134-
have no effect on those records. Set this to false to have those properties honored uniformly for every record.""")
129+
When set to true (default), the writer may emit the upstream reader's original JSON bytes verbatim when it can do so safely, as a \
130+
throughput optimization. In that case, the Timestamp Format, Date Format, Time Format, and Suppress Null Values properties may not be \
131+
applied to those records. Set this to false to force re-serialization so that these properties are honored uniformly for every record.""")
135132
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
136133
.allowableValues("true", "false")
137134
.defaultValue("true")

nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -656,8 +656,7 @@ void testReuseInputSerializationDefaultTrueUsesFastPath() throws IOException {
656656
}
657657

658658
final String output = baos.toString(StandardCharsets.UTF_8);
659-
assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
660-
"Default constructor preserves legacy fast-path behavior: raw bytes should be emitted verbatim");
659+
assertEquals("[{\"name\":\"John Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}]", output);
661660
}
662661

663662
@Test
@@ -685,7 +684,7 @@ void testReuseInputSerializationFalseForcesReserialization() throws IOException
685684

686685
final String output = baos.toString(StandardCharsets.UTF_8);
687686
assertFalse(output.contains("ignoredExtra"),
688-
"When Reuse Input Serialization is false, the writer must re-serialize from typed values and ignore raw bytes");
687+
"When Use Input Serialization is false, the writer must re-serialize from typed values and ignore raw bytes");
689688
assertEquals("[{\"name\":\"John Doe\",\"age\":42}]", output);
690689
}
691690

@@ -699,32 +698,34 @@ void testReuseInputSerializationFalseHonorsTimestampFormat() throws IOException
699698
final Map<String, Object> values = new HashMap<>();
700699
values.put("event", eventTimestamp);
701700

702-
final String rawForm = "{\"event\":\"2025-03-20T17:33:11.000+0000\"}";
701+
final String timestampValue = "2025-03-20T17:33:11.000+0000";
702+
final String timestampFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
703+
final String rawForm = "{\"event\":\"%s\"}".formatted(timestampValue);
703704
final SerializedForm serializedForm = SerializedForm.of(rawForm, "application/json");
704705
final Record record = new MapRecord(schema, values, serializedForm);
705706

706707
final ByteArrayOutputStream fastPathBaos = new ByteArrayOutputStream();
707708
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), fastPathBaos, false,
708709
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
709-
RecordFieldType.TIME.getDefaultFormat(), "yyyy-MM-dd'T'HH:mm:ss.SSSX",
710+
RecordFieldType.TIME.getDefaultFormat(), timestampFormat,
710711
"application/json", false, true)) {
711712
writer.write(RecordSet.of(schema, record));
712713
}
713714

714-
assertTrue(fastPathBaos.toString(StandardCharsets.UTF_8).contains("+0000"),
715-
"With Reuse Input Serialization enabled, raw '+0000' form is passed through even though Timestamp Format would normalize to 'Z'");
715+
assertTrue(fastPathBaos.toString(StandardCharsets.UTF_8).contains(timestampValue),
716+
"With Use Input Serialization enabled, raw '+0000' form is passed through even though Timestamp Format would normalize to 'Z'");
716717

717718
final ByteArrayOutputStream slowPathBaos = new ByteArrayOutputStream();
718719
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), slowPathBaos, false,
719720
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
720-
RecordFieldType.TIME.getDefaultFormat(), "yyyy-MM-dd'T'HH:mm:ss.SSSX",
721+
RecordFieldType.TIME.getDefaultFormat(), timestampFormat,
721722
"application/json", false, false)) {
722723
writer.write(RecordSet.of(schema, record));
723724
}
724725

725726
final String slowPathOutput = slowPathBaos.toString(StandardCharsets.UTF_8);
726727
assertFalse(slowPathOutput.contains("+0000"),
727-
"With Reuse Input Serialization disabled, writer's Timestamp Format must be applied even when SerializedForm is present");
728+
"With Use Input Serialization disabled, writer's Timestamp Format must be applied even when SerializedForm is present");
728729
assertTrue(slowPathOutput.contains("\"event\":\"2025-03-20T17:33:11.000"),
729730
"Re-serialized timestamp should reflect the configured format");
730731
}
@@ -754,7 +755,7 @@ void testReuseInputSerializationFalseHonorsSuppressNulls() throws IOException {
754755

755756
final String output = baos.toString(StandardCharsets.UTF_8);
756757
assertFalse(output.contains("middleName"),
757-
"Suppress Null Values must be honored when Reuse Input Serialization is false, even though the input JSON contained the null field");
758+
"Suppress Null Values must be honored when Use Input Serialization is false, even though the input JSON contained the null field");
758759
assertEquals("[{\"name\":\"John Doe\"}]", output);
759760
}
760761
}

0 commit comments

Comments
 (0)