Fix: Correct portability_worker_id for child process logs in Python SDK harness#38246
Fix: Correct portability_worker_id for child process logs in Python SDK harness#38246AtharvUrunkar wants to merge 2 commits intoapache:masterfrom
Conversation
…ython SDK harness
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses an issue where child process logs in the Python SDK harness were incorrectly attributed due to a shared logger instance. By ensuring each worker initializes its own logging stream with the appropriate metadata, the fix guarantees accurate log identification across multiple workers. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
| go func(workerId string) { | ||
| defer wg.Done() | ||
|
|
||
| workerCtx := grpcx.WriteWorkerID(ctx, workerId) |
There was a problem hiding this comment.
- Does this WriteWorkerID call rewrite the metadata or it appends it every time?
- This seems to conflict with the ID used in Line 179, I wonder if this would cause unintended side-effects for logs emitted within boot.go later? should we try to have separate contexts?
There was a problem hiding this comment.
In my experiment I got this working using context.Background(), which creates a new root context, instead of using the existing ctx.
- Looking at the code I believe it will append every time
func WriteWorkerID(ctx context.Context, id string) context.Context {
md := metadata.New(map[string]string{
idKey: id,
})
if old, ok := metadata.FromOutgoingContext(ctx); ok {
md = metadata.Join(md, old)
}
return metadata.NewOutgoingContext(ctx, md)
}// Join joins any number of mds into a single MD.
//
// The order of values for each key is determined by the order in which the mds
// containing those values are presented to Join.
func Join(mds ...MD) MD {
out := MD{}
for _, md := range mds {
for k, v := range md {
out[k] = append(out[k], v...)
}
}
return out
}- It seems like
WriteWorkerIddoes not mutate the existingctxbut creates a new one via&valueCtx{parent, key, val}
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val any
}Having said that, it seems to me creating a new context with context.Background() is the cleanest option.
|
Do you by chance have a screenshot of logs illustrating the effect of this change? thanks! |
| Endpoint: *loggingEndpoint, | ||
| } | ||
|
|
||
| bufLogger := tools.NewBufferedLogger(workerLogger) |
There was a problem hiding this comment.
I think this won't work because we need to pass workerCtx to BufferedLogger. NewBufferedLogger creates its own periodicFlushContext: context.Background() which ignores workerCtx:
beam/sdks/go/container/tools/buffered_logging.go
Lines 45 to 47 in 25370f5
That's why I think we need to use NewBufferedLoggerWithFlushInterval which accepts a ctx argument:
beam/sdks/go/container/tools/buffered_logging.go
Lines 51 to 53 in 25370f5
That ctx is then forwarded to the Fn logger here:
TL:DR need to plumb workerCtx with something like:
bufLogger := tools.NewBufferedLoggerWithFlushInterval(workerCtx, workerLogger, 100*time.Millisecond)| map[string]string{"WORKER_ID": workerId}, | ||
| os.Stdin, | ||
| bufLogger, | ||
| bufLogger, |
There was a problem hiding this comment.
@tvalentyn I just noticed that the process stderr/stdout is output as debug logs via FlushAtDebug in
Is it possible to either
- Make this default
FlushAtInfo - Or let users configure this to the log level they desire?
Closes #38214
Summary
Fix: Correct
portability_worker_idfor child process logs in Python SDK harness.Problem
Child process logs are captured via
boot.gousing a sharedtools.Logger. Since the Fn logging stream is initialized on the first log call, all subsequent logs reuse the same stream, causing incorrectportability_worker_id(e.g.,sdk-0-0instead ofsdk-0-0_sibling_X).Root Cause
A shared logger instance is used across multiple workers. The Fn logging client initializes a single stream with metadata from the first worker, which is then reused for all workers.
Fix
tools.Loggerper workerBufferedLoggerwith worker-specific context usinggrpcx.WriteWorkerIDValidation
GOOS=linux)portability_worker_idattribution for child process logsNotes
This change is limited to worker logging initialization and does not modify existing logging behavior for Python SDK logs.