diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d734eb3..38a85e0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,14 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) + ### Added - Added `MetadataSet` to stage job metadata updates from worker middleware, `HookWorkBegin`, workers, or `HookWorkEnd`, with changes persisted when the job is completed. [PR #1269](https://github.com/riverqueue/river/pull/1269) -### Fixed - -⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) - ### Changed - Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259) diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 37a4d399..823394c4 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -10,6 +10,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/riverpilot" @@ -578,11 +579,9 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer ) for attempt := 1; attempt <= numRetries; attempt++ { - const timeout = 10 * time.Second - // I've found that we want at least ten seconds for a large batch, // although it usually doesn't need that long. - ctx, cancel := context.WithTimeout(uncancelledCtx, timeout) + ctx, cancel := context.WithTimeout(uncancelledCtx, rivercommon.HotOperationTimeout) defer cancel() retVal, err := retryFunc(ctx) @@ -603,7 +602,7 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer slog.Int("attempt", attempt), slog.String("err", err.Error()), slog.String("sleep_duration", sleepDuration.String()), - slog.String("timeout", timeout.String()), + slog.String("timeout", rivercommon.HotOperationTimeout.String()), ) if !disableSleep { serviceutil.CancellableSleep(logCtx, sleepDuration) diff --git a/internal/rivercommon/river_common.go b/internal/rivercommon/river_common.go index 986583ed..4f769b39 100644 --- a/internal/rivercommon/river_common.go +++ b/internal/rivercommon/river_common.go @@ -3,6 +3,7 @@ package rivercommon import ( "errors" "regexp" + "time" ) // These constants are made available in rivercommon so that they're accessible @@ -17,6 +18,16 @@ const ( QueueDefault = "default" ) +// HotOperationTimeout attempts to standardize timeouts for some "hot" +// operations like locking available jobs or completing finished jobs. It's +// somewhat questionable whether it makes sense to share timing on these +// queries, but for the time being it makes more sense than each part of the +// code randomly choosing its own timing. +// +// We probably want to have another look at this in the not-too-distant future +// to make sure we can't do anything a bit smarter when it comes to timeouts. +const HotOperationTimeout = 10 * time.Second + const ( // MetadataKeyPeriodicJobID is a metadata key inserted with a periodic job // when a configured periodic job has its ID property set. This lets diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 45e2ff2b..fb6e7d80 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -3,15 +3,13 @@ package riverpilot import ( "context" "sync/atomic" - "time" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivertype" ) -const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second - type StandardPilot struct { seq atomic.Int64 } @@ -23,7 +21,7 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex return nil, nil } - ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded) + ctx, cancel := context.WithTimeout(ctx, rivercommon.HotOperationTimeout) defer cancel() return exec.JobGetAvailable(ctx, params)