Changes from 2 commits
58 changes: 41 additions & 17 deletions sdks/python/container/boot.go
@@ -314,23 +314,47 @@ func launchSDKProcess() error {

	var wg sync.WaitGroup
	wg.Add(len(workerIds))
	for _, workerId := range workerIds {
		go func(workerId string) {
			defer wg.Done()

			bufLogger := tools.NewBufferedLogger(logger)
			errorCount := 0
			for {
				childPids.mu.Lock()
				if childPids.canceled {
					childPids.mu.Unlock()
					return
				}
				logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
				cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...)
				childPids.v = append(childPids.v, cmd.Process.Pid)
				childPids.mu.Unlock()

	for _, workerId := range workerIds {
		go func(workerId string) {
			defer wg.Done()

			workerCtx := grpcx.WriteWorkerID(ctx, workerId)
Contributor

@tvalentyn tvalentyn Apr 20, 2026


  1. Does this WriteWorkerID call rewrite the metadata, or does it append to it every time?
  2. This seems to conflict with the ID used on line 179. I wonder whether this would cause unintended side effects for logs emitted later within boot.go. Should we try to have separate contexts?

Contributor


In my experiment I got this working using context.Background(), which creates a new root context, instead of using the existing ctx.

  1. Looking at the code, I believe it will append every time:
func WriteWorkerID(ctx context.Context, id string) context.Context {
	md := metadata.New(map[string]string{
		idKey: id,
	})
	if old, ok := metadata.FromOutgoingContext(ctx); ok {
		md = metadata.Join(md, old)
	}
	return metadata.NewOutgoingContext(ctx, md)
}
// Join joins any number of mds into a single MD.
//
// The order of values for each key is determined by the order in which the mds
// containing those values are presented to Join.
func Join(mds ...MD) MD {
	out := MD{}
	for _, md := range mds {
		for k, v := range md {
			out[k] = append(out[k], v...)
		}
	}
	return out
}
  2. It seems that WriteWorkerID does not mutate the existing ctx but creates a new one via &valueCtx{parent, key, val}:
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
	return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}

func WithValue(parent Context, key, val any) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val any
}

Having said that, it seems to me creating a new context with context.Background() is the cleanest option.

Author


Good catch — thanks for digging into this.

You're right that WriteWorkerID does not overwrite existing metadata but instead appends to it via metadata.Join. That means reusing the parent ctx could result in multiple worker IDs being accumulated, which is definitely not what we want.

Using context.Background() to create a clean root context per worker makes sense and avoids unintended metadata propagation from the parent context.

I’ll update the implementation to use a fresh context for each worker to ensure isolation and avoid side effects in logging.


			// Create a separate logger per worker so that each worker initializes
			// its own Fn logging stream with the correct worker_id metadata.
			// Shared loggers would reuse the first stream and cause incorrect
			// portability_worker_id attribution across workers.
			workerLogger := &tools.Logger{
				Endpoint: *loggingEndpoint,
			}

			bufLogger := tools.NewBufferedLogger(workerLogger)
Contributor

@cozos cozos Apr 21, 2026


I think this won't work, because we need to pass workerCtx to BufferedLogger. NewBufferedLogger sets its own periodicFlushContext to context.Background(), which ignores workerCtx:

func NewBufferedLogger(logger *Logger) *BufferedLogger {
	return &BufferedLogger{logger: logger, lastFlush: time.Now(), flushInterval: defaultFlushInterval, periodicFlushContext: context.Background(), now: time.Now}
}

That's why I think we need to use NewBufferedLoggerWithFlushInterval which accepts a ctx argument:

func NewBufferedLoggerWithFlushInterval(ctx context.Context, logger *Logger, interval time.Duration) *BufferedLogger {
	return &BufferedLogger{logger: logger, lastFlush: time.Now(), flushInterval: interval, periodicFlushContext: ctx, now: time.Now}
}

That ctx is then forwarded to the Fn logger here:

b.FlushAtDebug(b.periodicFlushContext)

TL;DR: we need to plumb workerCtx through with something like:

bufLogger := tools.NewBufferedLoggerWithFlushInterval(workerCtx, workerLogger, 100*time.Millisecond)

Author


Good observation — thanks for pointing this out.

Currently, stdout/stderr logs are emitted via FlushAtDebug, which can make them harder to discover unless debug logging is enabled.

I agree that making this configurable (rather than changing the default) would likely be a safer approach, since altering the default log level could have broader impact on existing users.

For this PR, I’d prefer to keep the current behavior unchanged and focus on ensuring correct worker context propagation. But I think exposing configurability for log level could be a valuable follow-up improvement.

Contributor


I think you meant to reply to the other comment. For this particular thread, I just want to clarify that passing workerCtx to BufferedLogger is critical for making sure the logs work properly.


			errorCount := 0
			for {
				childPids.mu.Lock()
				if childPids.canceled {
					childPids.mu.Unlock()
					return
				}

				workerLogger.Printf(workerCtx,
					"Executing Python (worker %v): python %v",
					workerId,
					strings.Join(args, " "),
				)

				cmd := StartCommandEnv(
					map[string]string{"WORKER_ID": workerId},
					os.Stdin,
					bufLogger,
					bufLogger,
Contributor


@tvalentyn I just noticed that the process stderr/stdout is output as debug logs via FlushAtDebug in

b.FlushAtDebug(b.periodicFlushContext)

Is it possible to either:

  1. Make this default to FlushAtInfo, or
  2. Let users configure the log level they want?

					"python",
					args...,
				)

				childPids.v = append(childPids.v, cmd.Process.Pid)
				childPids.mu.Unlock()
				if err := cmd.Wait(); err != nil {
					// Retry on fatal errors, like OOMs and segfaults, not just
					// DoFns throwing exceptions.