Skip to content

Commit 4613581

Browse files
committed
Use our own parallel implementation
1 parent c0f9c0c commit 4613581

5 files changed

Lines changed: 49 additions & 43 deletions

File tree

lib/octocatalog-diff/util/parallel.rb

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
# frozen_string_literal: true
22

3-
# Helper to use the 'parallel' gem to perform tasks
3+
# Parallelize process executation.
44

5-
require 'parallel'
65
require 'stringio'
6+
require 'yaml'
77

88
module OctocatalogDiff
99
module Util
1010
# This is a utility class to execute tasks in parallel, using the 'parallel' gem.
1111
# If parallel processing has been disabled, this instead executes the tasks serially,
1212
# but provides the same API as the parallel tasks.
1313
class Parallel
14+
# This class is called for a task that didn't complete.
15+
class IncompleteTask < RuntimeError; end
16+
1417
# This class represents a parallel task. It requires a method reference, which will be executed with
1518
# any supplied arguments. It can optionally take a text description and a validator function.
1619
class Task
@@ -60,7 +63,7 @@ def initialize(opts = {})
6063
#
6164
# Note: Parallelization throws intermittent errors under travis CI, so it will be disabled by
6265
# default for integration tests.
63-
def self.run_tasks(task_array, logger = nil, parallelized = !ENV.key?('OCTOCATALOG_DIFF_TRAVIS_CI_DISABLE_PARALLEL'))
66+
def self.run_tasks(task_array, logger = nil, parallelized = true)
6467
# Create a throwaway logger object if one is not given
6568
logger ||= Logger.new(StringIO.new)
6669

@@ -75,7 +78,7 @@ def self.run_tasks(task_array, logger = nil, parallelized = !ENV.key?('OCTOCATAL
7578
end
7679

7780
result = task_array.map do |x|
78-
Result.new(exception: ::Parallel::Kill.new('Killed'), args: x.args)
81+
Result.new(exception: IncompleteTask.new('Killed'), args: x.args)
7982
end
8083
logger.debug "Initialized parallel task result array: size=#{result.size}"
8184

@@ -94,42 +97,45 @@ def self.run_tasks(task_array, logger = nil, parallelized = !ENV.key?('OCTOCATAL
9497
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
9598
# @param logger [Logger] Logger
9699
def self.run_tasks_parallel(result, task_array, logger)
97-
opts = {
98-
isolation: true,
99-
finish: lambda do |item, i, parallel_result|
100-
# Set the result array element to the result
101-
result[i] = parallel_result
102-
103-
# Kill all other parallel tasks if this task failed by throwing an exception
104-
raise ::Parallel::Kill unless parallel_result.exception.nil?
105-
106-
# Run the validator to determine if the result is in fact valid. The validator
107-
# returns true or false. If true, set the 'valid' attribute in the result. If
108-
# false, kill all other parallel tasks.
109-
if item.validate(parallel_result.output, logger)
110-
logger.debug("Success #{item.description}")
111-
else
112-
logger.warn("Failed #{item.description}")
113-
result[i].status = false
114-
raise ::Parallel::Kill
115-
end
100+
pidmap = {}
101+
102+
task_array.each_with_index do |task, index|
103+
reader, writer = IO.pipe
104+
this_pid = fork do
105+
reader.close
106+
logger.reopen
107+
task_result = execute_task(task, logger)
108+
writer.write YAML.dump(task_result)
109+
exit 0
116110
end
117-
}
111+
pidmap[this_pid] = { reader: reader, index: index, start_time: Time.now }
112+
writer.close
113+
logger.debug "Launched pid=#{this_pid} for index=#{index}"
114+
logger.reopen
115+
end
116+
117+
while pidmap.any?
118+
this_pid, exit_obj = Process.wait2
119+
next unless pidmap.key?(this_pid)
118120

119-
::Parallel.each(task_array, opts) do |ele|
120-
# simplecov does not detect that this code runs because it's forked, but this is
121-
# tested extensively in the parallel_spec.rb spec file.
122-
# :nocov:
121+
exitstatus = exit_obj.exitstatus
122+
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?
123+
logger.debug "PID=#{this_pid} completed task in #{Time.now - pidmap[this_pid][:start_time]} seconds"
124+
125+
index = pidmap[this_pid][:index]
126+
result[index] = YAML.load(pidmap[this_pid][:reader].read)
127+
pidmap[this_pid][:reader].close
128+
pidmap.delete(this_pid)
129+
break unless result[index].status
130+
end
131+
ensure
132+
pidmap.each do |pid, pid_data|
133+
pid_data[:reader].close
123134
begin
124-
logger.debug("Begin #{ele.description}")
125-
output = ele.execute(logger)
126-
logger.debug("Success #{ele.description}")
127-
Result.new(output: output, status: true, args: ele.args)
128-
rescue => exc
129-
logger.debug("Failed #{ele.description}: #{exc.class} #{exc.message}")
130-
Result.new(exception: exc, status: false, args: ele.args)
135+
Process.kill('TERM', pid)
136+
rescue Errno::ESRCH # rubocop:disable Lint/HandleExceptions
137+
# If the process doesn't exist, that's fine.
131138
end
132-
# :nocov:
133139
end
134140
end
135141

octocatalog-diff.gemspec

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ EOF
2626
s.add_runtime_dependency 'diffy', '>= 3.1.0'
2727
s.add_runtime_dependency 'httparty', '>= 0.11.0'
2828
s.add_runtime_dependency 'hashdiff', '>= 0.3.0'
29-
s.add_runtime_dependency 'parallel', '>= 1.11.1'
3029
s.add_runtime_dependency 'rugged', '>= 0.25.0b2'
3130

3231
s.add_development_dependency 'rspec', '~> 3.4.0'

spec/octocatalog-diff/integration/integration_helper.rb

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,6 @@ def self.integration(options = {})
115115
options[:parallel] = false if ENV['COVERAGE']
116116
options[:INTEGRATION] = true
117117

118-
# Disable parallel under travis CI
119-
ENV['OCTOCATALOG_DIFF_TRAVIS_CI_DISABLE_PARALLEL'] = 'true' if ENV['TRAVIS']
120-
121118
# Run octocatalog-diff CLI method. Capture stdout and stderr using 'strio'.
122119
logger, logger_string = OctocatalogDiff::Spec.setup_logger
123120
begin

spec/octocatalog-diff/tests/cli_spec.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@
8080
'--to-catalog', OctocatalogDiff::Spec.fixture_path('catalogs/tiny-catalog-2.json'),
8181
'--fact-file', OctocatalogDiff::Spec.fixture_path('facts/valid-facts.yaml'),
8282
'--cached-master-dir', @cached_master_directory,
83-
'-o', File.join(@cached_master_directory, 'trashfile.txt')
83+
'-o', File.join(@cached_master_directory, 'trashfile.txt'),
84+
'--no-parallel'
8485
]
8586
opts = {
8687
debug: true,

spec/octocatalog-diff/tests/util/parallel_spec.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ def two(_arg, _logger = nil)
5959
break if File.file?(File.join($octocatalog_diff_util_parallel_spec_tempdir, 'one'))
6060
sleep 0.1
6161
end
62+
# Sometimes the system will still handle the second process if it's near-simultaneous
63+
# so sleep for a bit before exiting.
64+
sleep 0.5
6265
raise 'Two failed'
6366
end
6467
end
@@ -106,7 +109,7 @@ def two(_arg, _logger = nil)
106109
one_result = result[0]
107110
expect(one_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
108111
expect(one_result.status).to eq(nil)
109-
expect(one_result.exception).to be_a_kind_of(::Parallel::Kill)
112+
expect(one_result.exception).to be_a_kind_of(OctocatalogDiff::Util::Parallel::IncompleteTask)
110113
expect(one_result.exception.message).to eq('Killed')
111114
expect(one_result.output).to eq(nil)
112115

@@ -345,7 +348,7 @@ def two(arg, _logger = nil)
345348
two_result = result[1]
346349
expect(two_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
347350
expect(two_result.status).to eq(nil)
348-
expect(two_result.exception).to be_a_kind_of(::Parallel::Kill)
351+
expect(two_result.exception).to be_a_kind_of(OctocatalogDiff::Util::Parallel::IncompleteTask)
349352
expect(two_result.exception.message).to eq('Killed')
350353
end
351354

0 commit comments

Comments
 (0)