Fix: Correct portability_worker_id for child process logs in Python SDK harness#38246

Open
AtharvUrunkar wants to merge 2 commits into apache:master from AtharvUrunkar:fix-worker-id-logging

Conversation

@AtharvUrunkar

Closes #38214

Summary

Fix: Correct portability_worker_id for child process logs in Python SDK harness.

Problem

Child process logs are captured in boot.go via a shared tools.Logger. Because the Fn logging stream is initialized on the first log call, all subsequent logs reuse that stream, so they carry an incorrect portability_worker_id (e.g., sdk-0-0 instead of sdk-0-0_sibling_X).

Root Cause

A shared logger instance is used across multiple workers. The Fn logging client initializes a single stream with metadata from the first worker, which is then reused for all workers.

Fix

  • Create a separate tools.Logger per worker
  • Initialize BufferedLogger with worker-specific context using grpcx.WriteWorkerID
  • Ensure each worker establishes its own logging stream with correct metadata

Validation

  • Verified successful build targeting Linux (GOOS=linux)
  • Ensured each worker initializes an independent logger with worker-specific context
  • Confirmed that the change prevents reuse of a shared Fn logging stream, ensuring correct portability_worker_id attribution for child process logs

Notes

This change is limited to worker logging initialization and does not modify existing logging behavior for Python SDK logs.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an issue where child process logs in the Python SDK harness were incorrectly attributed due to a shared logger instance. By ensuring each worker initializes its own logging stream with the appropriate metadata, the fix guarantees accurate log identification across multiple workers.

Highlights

  • Worker-specific logging: Implemented a per-worker logger instance to ensure each child process maintains its own Fn logging stream.
  • Contextual metadata: Integrated grpcx.WriteWorkerID to correctly attribute logs to the specific portability_worker_id.


@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

go func(workerId string) {
	defer wg.Done()

	workerCtx := grpcx.WriteWorkerID(ctx, workerId)
Contributor

@tvalentyn Apr 20, 2026


  1. Does this WriteWorkerID call rewrite the metadata, or does it append to it every time?
  2. This seems to conflict with the ID used on line 179; I wonder if this would cause unintended side effects for logs emitted within boot.go later. Should we try to have separate contexts?

Contributor


In my experiment I got this working using context.Background(), which creates a new root context, instead of using the existing ctx.

  1. Looking at the code, I believe it will append every time:
func WriteWorkerID(ctx context.Context, id string) context.Context {
	md := metadata.New(map[string]string{
		idKey: id,
	})
	if old, ok := metadata.FromOutgoingContext(ctx); ok {
		md = metadata.Join(md, old)
	}
	return metadata.NewOutgoingContext(ctx, md)
}
// Join joins any number of mds into a single MD.
//
// The order of values for each key is determined by the order in which the mds
// containing those values are presented to Join.
func Join(mds ...MD) MD {
	out := MD{}
	for _, md := range mds {
		for k, v := range md {
			out[k] = append(out[k], v...)
		}
	}
	return out
}
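The append behavior can be checked with a quick stdlib-only simulation: the md type and join function below are simplified stand-ins for metadata.MD and metadata.Join from google.golang.org/grpc/metadata, reproducing just the value-accumulation logic quoted above.

```go
package main

import "fmt"

// md mimics grpc's metadata.MD, which is a map[string][]string.
type md map[string][]string

// join mirrors metadata.Join: values for the same key accumulate
// in the order the maps are passed in, rather than overwriting.
func join(mds ...md) md {
	out := md{}
	for _, m := range mds {
		for k, v := range m {
			out[k] = append(out[k], v...)
		}
	}
	return out
}

func main() {
	old := md{"worker_id": {"sdk-0-0"}}
	// Calling WriteWorkerID on a context that already carries a worker ID
	// joins the new ID with the old one, so the key ends up with two
	// values instead of being rewritten.
	merged := join(md{"worker_id": {"sdk-0-0_sibling_1"}}, old)
	fmt.Println(merged["worker_id"])
}
```

This is why reusing the same ctx across workers keeps stacking IDs onto the worker_id key.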
  2. It seems like WriteWorkerID does not mutate the existing ctx but creates a new one via &valueCtx{parent, key, val}
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
	return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}

func WithValue(parent Context, key, val any) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val any
}

Having said that, it seems to me creating a new context with context.Background() is the cleanest option.

@tvalentyn
Contributor

Do you by chance have a screenshot of logs illustrating the effect of this change? thanks!

Endpoint: *loggingEndpoint,
}

bufLogger := tools.NewBufferedLogger(workerLogger)
Contributor

@cozos Apr 21, 2026


I think this won't work because we need to pass workerCtx to the BufferedLogger. NewBufferedLogger creates its own periodicFlushContext: context.Background(), which ignores workerCtx:

func NewBufferedLogger(logger *Logger) *BufferedLogger {
	return &BufferedLogger{logger: logger, lastFlush: time.Now(), flushInterval: defaultFlushInterval, periodicFlushContext: context.Background(), now: time.Now}
}

That's why I think we need to use NewBufferedLoggerWithFlushInterval which accepts a ctx argument:

func NewBufferedLoggerWithFlushInterval(ctx context.Context, logger *Logger, interval time.Duration) *BufferedLogger {
	return &BufferedLogger{logger: logger, lastFlush: time.Now(), flushInterval: interval, periodicFlushContext: ctx, now: time.Now}
}

That ctx is then forwarded to the Fn logger here:

b.FlushAtDebug(b.periodicFlushContext)

TL;DR: we need to plumb workerCtx through with something like:

bufLogger := tools.NewBufferedLoggerWithFlushInterval(workerCtx, workerLogger, 100*time.Millisecond)

map[string]string{"WORKER_ID": workerId},
os.Stdin,
bufLogger,
bufLogger,
Contributor


@tvalentyn I just noticed that the process stderr/stdout is output as debug logs via FlushAtDebug in:

b.FlushAtDebug(b.periodicFlushContext)

Is it possible to either

  1. Make this default to FlushAtInfo, or
  2. Let users configure this to the log level they desire?



Development

Successfully merging this pull request may close these issues.

Python SDK harness logs from child processes missing correct portability_worker_id in Dataflow
