Skip to content

Commit c0cf8b2

Browse files
committed
NIFI-15862: Moved Processors to using Virtual Threads with a Semaphore bounding how many tasks can be run at once
1 parent 349344c commit c0cf8b2

File tree

16 files changed

+1692
-880
lines changed

16 files changed

+1692
-880
lines changed

nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java

Lines changed: 33 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -16,114 +16,49 @@
1616
*/
1717
package org.apache.nifi.diagnostics;
1818

19-
import java.lang.management.LockInfo;
19+
import com.sun.management.HotSpotDiagnosticMXBean;
20+
21+
import java.io.IOException;
2022
import java.lang.management.ManagementFactory;
21-
import java.lang.management.MonitorInfo;
22-
import java.lang.management.ThreadInfo;
23-
import java.lang.management.ThreadMXBean;
24-
import java.util.ArrayList;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
2525
import java.util.Collections;
26-
import java.util.Comparator;
27-
import java.util.List;
28-
import java.util.Objects;
2926

27+
/**
28+
* Captures a textual dump of every thread in the JVM. Uses
29+
* {@link HotSpotDiagnosticMXBean#dumpThreads(String, HotSpotDiagnosticMXBean.ThreadDumpFormat)}
30+
* so that virtual threads are included in the dump alongside platform threads.
31+
*/
3032
public class ThreadDumpTask implements DiagnosticTask {
31-
@Override
32-
public DiagnosticsDumpElement captureDump(boolean verbose) {
33-
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
34-
35-
final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
36-
final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
37-
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
38-
39-
final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
40-
Collections.addAll(sortedInfos, infos);
41-
sortedInfos.sort(new Comparator<>() {
42-
@Override
43-
public int compare(ThreadInfo o1, ThreadInfo o2) {
44-
return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
45-
}
46-
});
4733

34+
@Override
35+
public DiagnosticsDumpElement captureDump(final boolean verbose) {
4836
final StringBuilder sb = new StringBuilder();
49-
for (final ThreadInfo info : sortedInfos) {
50-
sb.append("\n");
51-
sb.append("\"").append(info.getThreadName()).append("\" Id=");
52-
sb.append(info.getThreadId()).append(" ");
53-
sb.append(info.getThreadState().toString()).append(" ");
54-
55-
switch (info.getThreadState()) {
56-
case BLOCKED:
57-
case TIMED_WAITING:
58-
case WAITING:
59-
sb.append(" on ");
60-
sb.append(info.getLockInfo());
61-
break;
62-
default:
63-
break;
64-
}
65-
66-
if (info.isSuspended()) {
67-
sb.append(" (suspended)");
68-
}
69-
if (info.isInNative()) {
70-
sb.append(" (in native code)");
71-
}
72-
73-
if (deadlockedThreadIds != null) {
74-
for (final long id : deadlockedThreadIds) {
75-
if (id == info.getThreadId()) {
76-
sb.append(" ** DEADLOCKED THREAD **");
77-
}
78-
}
79-
}
8037

81-
if (monitorDeadlockThreadIds != null) {
82-
for (final long id : monitorDeadlockThreadIds) {
83-
if (id == info.getThreadId()) {
84-
sb.append(" ** MONITOR-DEADLOCKED THREAD **");
85-
}
86-
}
38+
Path tempDirectory = null;
39+
try {
40+
final HotSpotDiagnosticMXBean diagnosticMXBean = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
41+
// dumpThreads requires that the destination file does not already exist. Creating a private
42+
// temporary directory and writing to a fresh filename inside it avoids a time-of-check to
43+
// time-of-use race that would exist if we created a temp file and then deleted it before the
44+
// JNI call.
45+
tempDirectory = Files.createTempDirectory("nifi-thread-dump-");
46+
final Path tempFile = tempDirectory.resolve("thread-dump.txt");
47+
try {
48+
diagnosticMXBean.dumpThreads(tempFile.toString(), HotSpotDiagnosticMXBean.ThreadDumpFormat.TEXT_PLAIN);
49+
sb.append(Files.readString(tempFile));
50+
} finally {
51+
Files.deleteIfExists(tempFile);
8752
}
88-
89-
final StackTraceElement[] stackTraces = info.getStackTrace();
90-
for (final StackTraceElement element : stackTraces) {
91-
sb.append("\n\tat ").append(element);
92-
93-
final MonitorInfo[] monitors = info.getLockedMonitors();
94-
for (final MonitorInfo monitor : monitors) {
95-
if (Objects.equals(monitor.getLockedStackFrame(), element)) {
96-
sb.append("\n\t- waiting on ").append(monitor);
97-
}
98-
}
99-
}
100-
101-
final LockInfo[] lockInfos = info.getLockedSynchronizers();
102-
if (lockInfos.length > 0) {
103-
sb.append("\n\t");
104-
sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
105-
for (final LockInfo lockInfo : lockInfos) {
106-
sb.append("\n\t- ").append(lockInfo.toString());
53+
} catch (final IOException e) {
54+
sb.append("Failed to capture thread dump: ").append(e.getMessage());
55+
} finally {
56+
if (tempDirectory != null) {
57+
try {
58+
Files.deleteIfExists(tempDirectory);
59+
} catch (final IOException ignored) {
10760
}
10861
}
109-
110-
sb.append("\n");
111-
}
112-
113-
if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
114-
sb.append("\n\nDEADLOCK DETECTED!");
115-
sb.append("\nThe following thread IDs are deadlocked:");
116-
for (final long id : deadlockedThreadIds) {
117-
sb.append("\n").append(id);
118-
}
119-
}
120-
121-
if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
122-
sb.append("\n\nMONITOR DEADLOCK DETECTED!");
123-
sb.append("\nThe following thread IDs are deadlocked:");
124-
for (final long id : monitorDeadlockThreadIds) {
125-
sb.append("\n").append(id);
126-
}
12762
}
12863

12964
return new StandardDiagnosticsDumpElement("Thread Dump", Collections.singletonList(sb.toString()));

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,8 @@
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

25-
import java.util.Collection;
26-
import java.util.Collections;
27-
import java.util.HashSet;
2825
import java.util.Map;
29-
import java.util.Set;
3026
import java.util.WeakHashMap;
31-
import java.util.concurrent.ScheduledFuture;
3227
import java.util.concurrent.atomic.AtomicBoolean;
3328
import java.util.concurrent.atomic.AtomicInteger;
3429

@@ -38,7 +33,6 @@ public class LifecycleState {
3833
private final Object componentId;
3934
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
4035
private final AtomicBoolean scheduled = new AtomicBoolean(false);
41-
private final Set<ScheduledFuture<?>> futures = new HashSet<>();
4236
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
4337
private volatile long lastStopTime = -1;
4438
private volatile boolean terminated = false;
@@ -111,7 +105,16 @@ public synchronized void setScheduled(final boolean scheduled) {
111105
mustCallOnStoppedMethods.set(true);
112106

113107
if (!scheduled) {
114-
lastStopTime = System.currentTimeMillis();
108+
// Force the stop time to strictly advance. System.currentTimeMillis() has millisecond resolution, so a
109+
// second stop that happens within the same millisecond as the previous one would otherwise read the same
110+
// value; that would defeat the race check in VirtualThreadSchedulingAgent, which relies on this field
111+
// changing to signal that a prior scheduling generation has ended.
112+
final long previousStopTime = lastStopTime;
113+
long nextStopTime = System.currentTimeMillis();
114+
if (nextStopTime <= previousStopTime) {
115+
nextStopTime = previousStopTime + 1L;
116+
}
117+
lastStopTime = nextStopTime;
115118
}
116119
}
117120

@@ -135,25 +138,6 @@ public boolean mustCallOnStoppedMethods() {
135138
return mustCallOnStoppedMethods.getAndSet(false);
136139
}
137140

138-
/**
139-
* Establishes the list of relevant futures for this processor. Replaces any previously held futures.
140-
*
141-
* @param newFutures futures
142-
*/
143-
public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) {
144-
futures.clear();
145-
futures.addAll(newFutures);
146-
}
147-
148-
public synchronized void replaceFuture(final ScheduledFuture<?> oldFuture, final ScheduledFuture<?> newFuture) {
149-
futures.remove(oldFuture);
150-
futures.add(newFuture);
151-
}
152-
153-
public synchronized Set<ScheduledFuture<?>> getFutures() {
154-
return Collections.unmodifiableSet(futures);
155-
}
156-
157141
public synchronized void terminate() {
158142
this.terminated = true;
159143
activeThreadCount.set(0);

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,11 @@
130130
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
131131
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
132132
import org.apache.nifi.controller.repository.io.LimitedInputStream;
133-
import org.apache.nifi.controller.scheduling.CronSchedulingAgent;
134133
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
135134
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
136135
import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
137136
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
138-
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
137+
import org.apache.nifi.controller.scheduling.VirtualThreadSchedulingAgent;
139138
import org.apache.nifi.controller.serialization.FlowSerializationException;
140139
import org.apache.nifi.controller.serialization.FlowSerializer;
141140
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
@@ -294,6 +293,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
294293

295294
private final AtomicInteger maxTimerDrivenThreads;
296295
private final AtomicReference<FlowEngine> timerDrivenEngineRef;
296+
private final VirtualThreadSchedulingAgent virtualThreadSchedulingAgent;
297297

298298
private final ContentRepository contentRepository;
299299
private final FlowFileRepository flowFileRepository;
@@ -672,10 +672,9 @@ private FlowController(
672672
flowAnalyzer.initialize(controllerServiceProvider);
673673
}
674674

675-
final CronSchedulingAgent cronSchedulingAgent = new CronSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
676-
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties);
677-
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
678-
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, cronSchedulingAgent);
675+
this.virtualThreadSchedulingAgent = new VirtualThreadSchedulingAgent(this, repositoryContextFactory, this.nifiProperties, maxTimerDrivenThreads.get());
676+
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, virtualThreadSchedulingAgent);
677+
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, virtualThreadSchedulingAgent);
679678

680679
startConnectablesAfterInitialization = new HashSet<>();
681680
startRemoteGroupPortsAfterInitialization = new HashSet<>();
@@ -2133,13 +2132,14 @@ public int getMaxTimerDrivenThreadCount() {
21332132
}
21342133

21352134
public int getActiveTimerDrivenThreadCount() {
2136-
return timerDrivenEngineRef.get().getActiveCount();
2135+
return timerDrivenEngineRef.get().getActiveCount() + virtualThreadSchedulingAgent.getActiveThreadCount();
21372136
}
21382137

21392138
public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
21402139
writeLock.lock();
21412140
try {
21422141
setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
2142+
virtualThreadSchedulingAgent.setMaxThreadCount(maxThreadCount);
21432143
} finally {
21442144
writeLock.unlock("setMaxTimerDrivenThreadCount");
21452145
}
@@ -2805,7 +2805,7 @@ public GroupStatusCounts getGroupStatusCounts(final ProcessGroup group) {
28052805
}
28062806

28072807
public int getActiveThreadCount() {
2808-
return timerDrivenEngineRef.get().getActiveCount();
2808+
return getActiveTimerDrivenThreadCount();
28092809
}
28102810

28112811
//

0 commit comments

Comments
 (0)