Skip to content

Commit 7d7a015

Browse files
committed
Refactor parallel and serial
1 parent 3a3c681 commit 7d7a015

2 files changed

Lines changed: 60 additions & 70 deletions

File tree

lib/octocatalog-diff/util/parallel.rb

Lines changed: 52 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -71,26 +71,17 @@ def self.run_tasks(task_array, logger = nil, parallelized = true)
7171
raise ArgumentError, "Element #{x.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{x.class}"
7272
end
7373

74-
# Serial processing
75-
return run_tasks_serial(task_array, logger) unless parallelized
76-
77-
# Parallel processing.
78-
# Create an empty array of results. The status is nil and the exception is pre-populated. If the code
79-
# runs successfully and doesn't get killed, all of these default values will be overwritten. If the code
80-
# gets killed before the task finishes, this exception will remain.
8174
result = task_array.map do |x|
8275
Result.new(exception: ::Parallel::Kill.new('Killed'), args: x.args)
8376
end
8477
logger.debug "Initialized parallel task result array: size=#{result.size}"
8578

86-
begin
79+
if parallelized
8780
run_tasks_parallel(result, task_array, logger)
88-
rescue ::Parallel::DeadWorker => exc
89-
# Accept failure of any worker since result array will contain the initialized failure case.
90-
# :nocov:
91-
logger.warn "Rescued #{exc.class}: #{exc.message}"
92-
# :nocov:
81+
else
82+
run_tasks_serial(result, task_array, logger)
9383
end
84+
9485
result
9586
end
9687

@@ -100,27 +91,29 @@ def self.run_tasks(task_array, logger = nil, parallelized = true)
10091
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
10192
# @param logger [Logger] Logger
10293
def self.run_tasks_parallel(result, task_array, logger)
103-
# Do parallel processing
104-
::Parallel.each(task_array,
105-
isolation: true,
106-
finish: lambda do |item, i, parallel_result|
107-
# Set the result array element to the result
108-
result[i] = parallel_result
109-
110-
# Kill all other parallel tasks if this task failed by throwing an exception
111-
raise ::Parallel::Kill unless parallel_result.exception.nil?
112-
113-
# Run the validator to determine if the result is in fact valid. The validator
114-
# returns true or false. If true, set the 'valid' attribute in the result. If
115-
# false, kill all other parallel tasks.
116-
if item.validate(parallel_result.output, logger)
117-
logger.debug("Success #{item.description}")
118-
else
119-
logger.warn("Failed #{item.description}")
120-
result[i].status = false
121-
raise ::Parallel::Kill
122-
end
123-
end) do |ele|
94+
opts = {
95+
isolation: true,
96+
finish: lambda do |item, i, parallel_result|
97+
# Set the result array element to the result
98+
result[i] = parallel_result
99+
100+
# Kill all other parallel tasks if this task failed by throwing an exception
101+
raise ::Parallel::Kill unless parallel_result.exception.nil?
102+
103+
# Run the validator to determine if the result is in fact valid. The validator
104+
# returns true or false. If true, set the 'valid' attribute in the result. If
105+
# false, kill all other parallel tasks.
106+
if item.validate(parallel_result.output, logger)
107+
logger.debug("Success #{item.description}")
108+
else
109+
logger.warn("Failed #{item.description}")
110+
result[i].status = false
111+
raise ::Parallel::Kill
112+
end
113+
end
114+
}
115+
116+
::Parallel.each(task_array, opts) do |ele|
124117
# simplecov does not detect that this code runs because it's forked, but this is
125118
# tested extensively in the parallel_spec.rb spec file.
126119
# :nocov:
@@ -138,43 +131,40 @@ def self.run_tasks_parallel(result, task_array, logger)
138131
end
139132

140133
# Perform the tasks in serial.
134+
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
141135
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
142136
# @param logger [Logger] Logger
143-
# @return [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
144-
def self.run_tasks_serial(task_array, logger)
145-
# Create an empty array of results. The status is nil and the exception is pre-populated. If the code
146-
# runs successfully, all of these default values will be overwritten. If a predecessor task fails, all
147-
# later task will have the defined exception.
148-
result = task_array.map do |x|
149-
Result.new(exception: ::RuntimeError.new('Cancellation - A prior task failed'), args: x.args)
150-
end
151-
137+
def self.run_tasks_serial(result, task_array, logger)
152138
# Perform the tasks 1 by 1 - each successful task will replace an element in the 'result' array,
153139
# whereas a failed task will replace the current element with an exception, and all later tasks
154140
# will not be replaced (thereby being populated with the cancellation error).
155-
task_counter = 0
156-
task_array.each do |ele|
157-
begin
158-
logger.debug("Begin #{ele.description}")
159-
output = ele.execute(logger)
160-
result[task_counter] = Result.new(output: output, status: true, args: ele.args)
161-
rescue => exc
162-
logger.debug("Failed #{ele.description}: #{exc.class} #{exc.message}")
163-
result[task_counter] = Result.new(exception: exc, status: false, args: ele.args)
164-
end
141+
task_array.each_with_index do |ele, task_counter|
142+
result[task_counter] = execute_task(ele, logger)
143+
break unless result[task_counter].status
144+
end
145+
end
165146

166-
if ele.validate(output, logger)
167-
logger.debug("Success #{ele.description}")
168-
else
169-
logger.warn("Failed #{ele.description}")
170-
result[task_counter].status = false
171-
end
147+
# Process a single task.
148+
# @param task [OctocatalogDiff::Util::Parallel::Task] Task object
149+
# @param logger [Logger] Logger
150+
# @return [OctocatalogDiff::Util::Parallel::Result] Parallel task result
151+
def self.execute_task(task, logger)
152+
begin
153+
logger.debug("Begin #{task.description}")
154+
output = task.execute(logger)
155+
result = Result.new(output: output, status: true, args: task.args)
156+
rescue => exc
157+
logger.debug("Failed #{task.description}: #{exc.class} #{exc.message}")
158+
result = Result.new(exception: exc, status: false, args: task.args)
159+
end
172160

173-
break unless result[task_counter].status
174-
task_counter += 1
161+
if task.validate(output, logger)
162+
logger.debug("Success #{task.description}")
163+
else
164+
logger.warn("Failed #{task.description}")
165+
result.status = false
175166
end
176167

177-
# Return the result
178168
result
179169
end
180170
end

spec/octocatalog-diff/tests/util/parallel_spec.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,15 @@ def two(_arg, _logger = nil)
7272

7373
one_result = result[0]
7474
expect(one_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
75-
expect(one_result.status).to eq(true)
76-
expect(one_result.exception).to eq(nil)
77-
expect(one_result.output).to eq('one abc')
75+
expect(one_result.status).to eq(true), result.inspect
76+
expect(one_result.exception).to eq(nil), result.inspect
77+
expect(one_result.output).to eq('one abc'), result.inspect
7878

7979
two_result = result[1]
8080
expect(two_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
81-
expect(two_result.status).to eq(false)
82-
expect(two_result.exception).to be_a_kind_of(RuntimeError)
83-
expect(two_result.exception.message).to eq('Two failed')
81+
expect(two_result.status).to eq(false), result.inspect
82+
expect(two_result.exception).to be_a_kind_of(RuntimeError), result.inspect
83+
expect(two_result.exception.message).to eq('Two failed'), result.inspect
8484
end
8585

8686
it 'should kill running tasks when one task fails' do
@@ -345,8 +345,8 @@ def two(arg, _logger = nil)
345345
two_result = result[1]
346346
expect(two_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
347347
expect(two_result.status).to eq(nil)
348-
expect(two_result.exception).to be_a_kind_of(::RuntimeError)
349-
expect(two_result.exception.message).to eq('Cancellation - A prior task failed')
348+
expect(two_result.exception).to be_a_kind_of(::Parallel::Kill)
349+
expect(two_result.exception.message).to eq('Killed')
350350
end
351351

352352
it 'should log debug messages' do

0 commit comments

Comments
 (0)