-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Fix: Correct portability_worker_id for child process logs in Python SDK harness #38246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
3245ea8
c3e8a91
fca903f
edda6bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -314,23 +314,47 @@ func launchSDKProcess() error { | |||||||||||||||
|
|
||||||||||||||||
| var wg sync.WaitGroup | ||||||||||||||||
| wg.Add(len(workerIds)) | ||||||||||||||||
| for _, workerId := range workerIds { | ||||||||||||||||
| go func(workerId string) { | ||||||||||||||||
| defer wg.Done() | ||||||||||||||||
|
|
||||||||||||||||
| bufLogger := tools.NewBufferedLogger(logger) | ||||||||||||||||
| errorCount := 0 | ||||||||||||||||
| for { | ||||||||||||||||
| childPids.mu.Lock() | ||||||||||||||||
| if childPids.canceled { | ||||||||||||||||
| childPids.mu.Unlock() | ||||||||||||||||
| return | ||||||||||||||||
| } | ||||||||||||||||
| logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) | ||||||||||||||||
| cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...) | ||||||||||||||||
| childPids.v = append(childPids.v, cmd.Process.Pid) | ||||||||||||||||
| childPids.mu.Unlock() | ||||||||||||||||
|
|
||||||||||||||||
| for _, workerId := range workerIds { | ||||||||||||||||
| go func(workerId string) { | ||||||||||||||||
| defer wg.Done() | ||||||||||||||||
|
|
||||||||||||||||
| workerCtx := grpcx.WriteWorkerID(ctx, workerId) | ||||||||||||||||
|
|
||||||||||||||||
| // Create a separate logger per worker so that each worker initializes | ||||||||||||||||
| // its own Fn logging stream with the correct worker_id metadata. | ||||||||||||||||
| // Shared loggers would reuse the first stream and cause incorrect | ||||||||||||||||
| // portability_worker_id attribution across workers. | ||||||||||||||||
| workerLogger := &tools.Logger{ | ||||||||||||||||
| Endpoint: *loggingEndpoint, | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| bufLogger := tools.NewBufferedLogger(workerLogger) | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this won't work because we need to pass beam/sdks/go/container/tools/buffered_logging.go Lines 45 to 47 in 25370f5
That's why I think we need to use beam/sdks/go/container/tools/buffered_logging.go Lines 51 to 53 in 25370f5
That
TL:DR need to plumb bufLogger := tools.NewBufferedLoggerWithFlushInterval(workerCtx, workerLogger, 100*time.Millisecond)
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good observation — thanks for pointing this out. Currently, stdout/stderr logs are emitted via I agree that making this configurable (rather than changing the default) would likely be a safer approach, since altering the default log level could have broader impact on existing users. For this PR, I’d prefer to keep the current behavior unchanged and focus on ensuring correct worker context propagation. But I think exposing configurability for log level could be a valuable follow-up improvement.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you meant to reply to other comment. For this particular thread I just want to clarify that passing |
||||||||||||||||
|
|
||||||||||||||||
| errorCount := 0 | ||||||||||||||||
| for { | ||||||||||||||||
| childPids.mu.Lock() | ||||||||||||||||
| if childPids.canceled { | ||||||||||||||||
| childPids.mu.Unlock() | ||||||||||||||||
| return | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| workerLogger.Printf(workerCtx, | ||||||||||||||||
| "Executing Python (worker %v): python %v", | ||||||||||||||||
| workerId, | ||||||||||||||||
| strings.Join(args, " "), | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| cmd := StartCommandEnv( | ||||||||||||||||
| map[string]string{"WORKER_ID": workerId}, | ||||||||||||||||
| os.Stdin, | ||||||||||||||||
| bufLogger, | ||||||||||||||||
| bufLogger, | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tvalentyn I just noticed that the process stderr/stdout is output as debug logs via
Is it possible to either
|
||||||||||||||||
| "python", | ||||||||||||||||
| args..., | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| childPids.v = append(childPids.v, cmd.Process.Pid) | ||||||||||||||||
| childPids.mu.Unlock() | ||||||||||||||||
| if err := cmd.Wait(); err != nil { | ||||||||||||||||
| // Retry on fatal errors, like OOMs and segfaults, not just | ||||||||||||||||
| // DoFns throwing exceptions. | ||||||||||||||||
|
|
||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my experiment I got this working using
context.Background(), which creates a new root context, instead of using the existingctx.WriteWorkerIddoes not mutate the existingctxbut creates a new one via&valueCtx{parent, key, val}Having said that, it seems to me creating a new context with
context.Background()is the cleanest option.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch — thanks for digging into this.
You're right that
WriteWorkerIDdoes not overwrite existing metadata but instead appends to it viametadata.Join. That means reusing the parentctxcould result in multiple worker IDs being accumulated, which is definitely not what we want.Using
context.Background()to create a clean root context per worker makes sense and avoids unintended metadata propagation from the parent context.I’ll update the implementation to use a fresh context for each worker to ensure isolation and avoid side effects in logging.