Skip to content

Commit e886a1a

Browse files
committed
NIFI-15856 - JsonRecordSetWriter silently ignores Timestamp Format
1 parent 638b225 commit e886a1a

File tree

3 files changed

+157
-2
lines changed
  • nifi-extension-bundles

3 files changed

+157
-2
lines changed

nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,24 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
6767
private final String mimeType;
6868
private final boolean prettyPrint;
6969
private final boolean allowScientificNotation;
70+
private final boolean reuseInputSerialization;
7071

7172
private static final ObjectMapper objectMapper = new ObjectMapper();
7273

7374
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
7475
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
75-
this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json", false);
76+
this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json", false, true);
7677
}
7778

7879
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
7980
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
8081
final String mimeType, final boolean allowScientificNotation) throws IOException {
82+
this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, mimeType, allowScientificNotation, true);
83+
}
84+
85+
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
86+
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
87+
final String mimeType, final boolean allowScientificNotation, final boolean reuseInputSerialization) throws IOException {
8188

8289
super(out);
8390
this.logger = logger;
@@ -87,6 +94,7 @@ public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchem
8794
this.outputGrouping = outputGrouping;
8895
this.mimeType = mimeType;
8996
this.allowScientificNotation = allowScientificNotation;
97+
this.reuseInputSerialization = reuseInputSerialization;
9098

9199
this.dateFormat = dateFormat;
92100
this.timeFormat = timeFormat;
@@ -171,6 +179,10 @@ public WriteResult writeRawRecord(final Record record) throws IOException {
171179
}
172180

173181
private boolean isUseSerializeForm(final Record record, final RecordSchema writeSchema) {
182+
if (!reuseInputSerialization) {
183+
return false;
184+
}
185+
174186
final Optional<SerializedForm> serializedForm = record.getSerializedForm();
175187
if (serializedForm.isEmpty()) {
176188
return false;

nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,28 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
123123
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
124124
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_GZIP)
125125
.build();
126+
public static final PropertyDescriptor REUSE_INPUT_SERIALIZATION = new PropertyDescriptor.Builder()
127+
.name("Reuse Input Serialization")
128+
.description("""
129+
Controls a throughput optimization that only applies to pure JSON pass-through flows. When set to true, and all of the \
130+
following conditions are met, the writer will emit the record's original JSON bytes verbatim instead of re-serializing from typed field \
131+
values: (1) the upstream reader is JsonTreeReader, (2) no data change, (3) the reader's and writer's record schemas are identical, \
132+
(4) the writer is not using compression, and (5) the Pretty Print JSON and Allow Scientific Notation settings are compatible with the \
133+
cached bytes. When the optimization kicks in, the Timestamp Format, Date Format, Time Format, and Suppress Null Values properties \
134+
have no effect on those records. Set this to false to have those properties honored uniformly for every record.""")
135+
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
136+
.allowableValues("true", "false")
137+
.defaultValue("true")
138+
.required(true)
139+
.build();
126140

127141
private volatile boolean prettyPrint;
128142
private volatile boolean allowScientificNotation;
129143
private volatile NullSuppression nullSuppression;
130144
private volatile OutputGrouping outputGrouping;
131145
private volatile String compressionFormat;
132146
private volatile int compressionLevel;
147+
private volatile boolean reuseInputSerialization;
133148

134149
@Override
135150
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -140,6 +155,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
140155
properties.add(OUTPUT_GROUPING);
141156
properties.add(COMPRESSION_FORMAT);
142157
properties.add(COMPRESSION_LEVEL);
158+
properties.add(REUSE_INPUT_SERIALIZATION);
143159
return properties;
144160
}
145161

@@ -197,6 +213,7 @@ public void onEnabled(final ConfigurationContext context) {
197213

198214
this.compressionFormat = context.getProperty(COMPRESSION_FORMAT).getValue();
199215
this.compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
216+
this.reuseInputSerialization = context.getProperty(REUSE_INPUT_SERIALIZATION).asBoolean();
200217
}
201218

202219
@Override
@@ -241,7 +258,7 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchem
241258
}
242259

243260
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema, variables), compressionOut, prettyPrint, nullSuppression, outputGrouping,
244-
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeType, allowScientificNotation);
261+
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeType, allowScientificNotation, reuseInputSerialization);
245262
}
246263

247264
}

nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Map;
5353

5454
import static org.junit.jupiter.api.Assertions.assertEquals;
55+
import static org.junit.jupiter.api.Assertions.assertFalse;
5556
import static org.junit.jupiter.api.Assertions.assertTrue;
5657

5758
class TestWriteJsonResult {
@@ -631,4 +632,129 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws IOException {
631632
final String output = new String(data, StandardCharsets.UTF_8);
632633
assertEquals(json, output);
633634
}
635+
636+
@Test
637+
void testReuseInputSerializationDefaultTrueUsesFastPath() throws IOException {
638+
final List<RecordField> fields = new ArrayList<>();
639+
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
640+
fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
641+
final RecordSchema schema = new SimpleRecordSchema(fields);
642+
643+
final Map<String, Object> values = new HashMap<>();
644+
values.put("name", "John Doe");
645+
values.put("age", 42);
646+
647+
final String rawForm = "{\"name\":\"John Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
648+
final SerializedForm serializedForm = SerializedForm.of(rawForm, "application/json");
649+
final Record record = new MapRecord(schema, values, serializedForm);
650+
651+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
652+
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
653+
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
654+
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
655+
writer.write(RecordSet.of(schema, record));
656+
}
657+
658+
final String output = baos.toString(StandardCharsets.UTF_8);
659+
assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
660+
"Default constructor preserves legacy fast-path behavior: raw bytes should be emitted verbatim");
661+
}
662+
663+
@Test
664+
void testReuseInputSerializationFalseForcesReserialization() throws IOException {
665+
final List<RecordField> fields = new ArrayList<>();
666+
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
667+
fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
668+
final RecordSchema schema = new SimpleRecordSchema(fields);
669+
670+
final Map<String, Object> values = new HashMap<>();
671+
values.put("name", "John Doe");
672+
values.put("age", 42);
673+
674+
final String rawForm = "{\"name\":\"John Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
675+
final SerializedForm serializedForm = SerializedForm.of(rawForm, "application/json");
676+
final Record record = new MapRecord(schema, values, serializedForm);
677+
678+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
679+
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
680+
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
681+
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(),
682+
"application/json", false, false)) {
683+
writer.write(RecordSet.of(schema, record));
684+
}
685+
686+
final String output = baos.toString(StandardCharsets.UTF_8);
687+
assertFalse(output.contains("ignoredExtra"),
688+
"When Reuse Input Serialization is false, the writer must re-serialize from typed values and ignore raw bytes");
689+
assertEquals("[{\"name\":\"John Doe\",\"age\":42}]", output);
690+
}
691+
692+
@Test
693+
void testReuseInputSerializationFalseHonorsTimestampFormat() throws IOException {
694+
final List<RecordField> fields = new ArrayList<>();
695+
fields.add(new RecordField("event", RecordFieldType.TIMESTAMP.getDataType()));
696+
final RecordSchema schema = new SimpleRecordSchema(fields);
697+
698+
final Timestamp eventTimestamp = Timestamp.valueOf("2025-03-20 17:33:11.000");
699+
final Map<String, Object> values = new HashMap<>();
700+
values.put("event", eventTimestamp);
701+
702+
final String rawForm = "{\"event\":\"2025-03-20T17:33:11.000+0000\"}";
703+
final SerializedForm serializedForm = SerializedForm.of(rawForm, "application/json");
704+
final Record record = new MapRecord(schema, values, serializedForm);
705+
706+
final ByteArrayOutputStream fastPathBaos = new ByteArrayOutputStream();
707+
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), fastPathBaos, false,
708+
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
709+
RecordFieldType.TIME.getDefaultFormat(), "yyyy-MM-dd'T'HH:mm:ss.SSSX",
710+
"application/json", false, true)) {
711+
writer.write(RecordSet.of(schema, record));
712+
}
713+
714+
assertTrue(fastPathBaos.toString(StandardCharsets.UTF_8).contains("+0000"),
715+
"With Reuse Input Serialization enabled, raw '+0000' form is passed through even though Timestamp Format would normalize to 'Z'");
716+
717+
final ByteArrayOutputStream slowPathBaos = new ByteArrayOutputStream();
718+
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), slowPathBaos, false,
719+
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
720+
RecordFieldType.TIME.getDefaultFormat(), "yyyy-MM-dd'T'HH:mm:ss.SSSX",
721+
"application/json", false, false)) {
722+
writer.write(RecordSet.of(schema, record));
723+
}
724+
725+
final String slowPathOutput = slowPathBaos.toString(StandardCharsets.UTF_8);
726+
assertFalse(slowPathOutput.contains("+0000"),
727+
"With Reuse Input Serialization disabled, writer's Timestamp Format must be applied even when SerializedForm is present");
728+
assertTrue(slowPathOutput.contains("\"event\":\"2025-03-20T17:33:11.000"),
729+
"Re-serialized timestamp should reflect the configured format");
730+
}
731+
732+
@Test
733+
void testReuseInputSerializationFalseHonorsSuppressNulls() throws IOException {
734+
final List<RecordField> fields = new ArrayList<>();
735+
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
736+
fields.add(new RecordField("middleName", RecordFieldType.STRING.getDataType()));
737+
final RecordSchema schema = new SimpleRecordSchema(fields);
738+
739+
final Map<String, Object> values = new HashMap<>();
740+
values.put("name", "John Doe");
741+
values.put("middleName", null);
742+
743+
final String rawForm = "{\"name\":\"John Doe\",\"middleName\":null}";
744+
final SerializedForm serializedForm = SerializedForm.of(rawForm, "application/json");
745+
final Record record = new MapRecord(schema, values, serializedForm);
746+
747+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
748+
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
749+
NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
750+
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(),
751+
"application/json", false, false)) {
752+
writer.write(RecordSet.of(schema, record));
753+
}
754+
755+
final String output = baos.toString(StandardCharsets.UTF_8);
756+
assertFalse(output.contains("middleName"),
757+
"Suppress Null Values must be honored when Reuse Input Serialization is false, even though the input JSON contained the null field");
758+
assertEquals("[{\"name\":\"John Doe\"}]", output);
759+
}
634760
}

0 commit comments

Comments
 (0)