Skip to content

Commit c318b29

Browse files
committed
Handle Parallel::DeadWorker
1 parent 8e06f27 commit c318b29

1 file changed

Lines changed: 20 additions & 12 deletions

File tree

lib/octocatalog-diff/util/parallel.rb

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,10 @@ def self.run_tasks(task_array, logger = nil, parallelized = true)
7171
raise ArgumentError, "Element #{x.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{x.class}"
7272
end
7373

74-
# Actually do the processing - choose here between parallel and serial
75-
parallelized ? run_tasks_parallel(task_array, logger) : run_tasks_serial(task_array, logger)
76-
end
74+
# Serial processing
75+
return run_tasks_serial(task_array, logger) unless parallelized
7776

78-
# Use the parallel gem to run each task in the task array. Under the hood this is forking a process for
79-
# each task, and serializing/deserializing the arguments and the outputs.
80-
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
81-
# @param logger [Logger] Logger
82-
# @return [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
83-
def self.run_tasks_parallel(task_array, logger)
77+
# Parallel processing.
8478
# Create an empty array of results. The status is nil and the exception is pre-populated. If the code
8579
# runs successfully and doesn't get killed, all of these default values will be overwritten. If the code
8680
# gets killed before the task finishes, this exception will remain.
@@ -89,6 +83,23 @@ def self.run_tasks_parallel(task_array, logger)
8983
end
9084
logger.debug "Initialized parallel task result array: size=#{result.size}"
9185

86+
begin
87+
run_tasks_parallel(result, task_array, logger)
88+
rescue ::Parallel::DeadWorker => exc
89+
# Accept failure of any worker since result array will contain the initialized failure case.
90+
# :nocov:
91+
logger.warn "Rescued #{exc.class}: #{exc.message}"
92+
# :nocov:
93+
end
94+
result
95+
end
96+
97+
# Use the parallel gem to run each task in the task array. Under the hood this is forking a process for
98+
# each task, and serializing/deserializing the arguments and the outputs.
99+
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
100+
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
101+
# @param logger [Logger] Logger
102+
def self.run_tasks_parallel(result, task_array, logger)
92103
# Do parallel processing
93104
::Parallel.each(task_array,
94105
finish: lambda do |item, i, parallel_result|
@@ -123,9 +134,6 @@ def self.run_tasks_parallel(task_array, logger)
123134
end
124135
# :nocov:
125136
end
126-
127-
# Return result
128-
result
129137
end
130138

131139
# Perform the tasks in serial.

0 commit comments

Comments
 (0)