Skip to content

Commit f7c280c

Browse files
authored
add waitForReady to logging stream and control stream to handle sdk starting before the runner. (#37982)
1 parent 9fe425f commit f7c280c

2 files changed

Lines changed: 18 additions & 20 deletions

File tree

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,23 @@ public static void main(String[] args) throws Exception {
144144
@VisibleForTesting
145145
public static void main(Function<String, String> environmentVarGetter) throws Exception {
146146
JvmInitializers.runOnStartup();
147-
System.out.format("SDK Fn Harness started%n");
148-
System.out.format("Harness ID %s%n", environmentVarGetter.apply(HARNESS_ID));
149-
System.out.format(
150-
"Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
151-
System.out.format(
152-
"Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
153-
System.out.format(
154-
"Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
147+
148+
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
149+
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
150+
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
151+
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
152+
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
153+
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
154+
? null
155+
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
155156
String id = environmentVarGetter.apply(HARNESS_ID);
156157

158+
System.out.format("SDK Fn Harness started%n");
159+
System.out.format("Harness ID %s%n", id);
160+
System.out.format("Logging location %s%n", loggingApiServiceDescriptor);
161+
System.out.format("Control location %s%n", controlApiServiceDescriptor);
162+
System.out.format("Status location %s%n", statusApiServiceDescriptor);
163+
157164
String pipelineOptionsJson = environmentVarGetter.apply(PIPELINE_OPTIONS);
158165
// Try looking for a file first. If that exists it should override PIPELINE_OPTIONS to avoid
159166
// maxing out the kernel's environment space
@@ -179,16 +186,6 @@ public static void main(Function<String, String> environmentVarGetter) throws Ex
179186

180187
PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);
181188

182-
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
183-
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
184-
185-
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
186-
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
187-
188-
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
189-
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
190-
? null
191-
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
192189
String runnerCapabilitesOrNull = environmentVarGetter.apply(RUNNER_CAPABILITIES);
193190
Set<String> runnerCapabilites =
194191
runnerCapabilitesOrNull == null
@@ -415,7 +412,7 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
415412
// directExecutor() when building the channel.
416413
BeamFnControlClient control =
417414
new BeamFnControlClient(
418-
controlStub.withExecutor(MoreExecutors.directExecutor()),
415+
controlStub.withExecutor(MoreExecutors.directExecutor()).withWaitForReady(),
419416
outboundObserverFactory,
420417
executorService,
421418
handlers);

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ public StreamWriter(ManagedChannel channel) {
214214
this.streamPhaser = new AdvancingPhaser(1);
215215
this.channel = channel;
216216

217-
BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
217+
BeamFnLoggingGrpc.BeamFnLoggingStub stub =
218+
BeamFnLoggingGrpc.newStub(channel).withWaitForReady();
218219
this.inboundObserver = new LogControlObserver();
219220
this.outboundObserver =
220221
new DirectStreamObserver<BeamFnApi.LogEntry.List>(

0 commit comments

Comments
 (0)