Skip to content

Commit 88545cb

Browse files
committed
Use files for IPC, it's much easier
1 parent f21856b commit 88545cb

1 file changed

Lines changed: 13 additions & 41 deletions

File tree

lib/octocatalog-diff/util/parallel.rb

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,13 @@
33
# Parallelize process executation.
44

55
require 'stringio'
6-
require 'yaml'
76

87
module OctocatalogDiff
98
module Util
109
# This is a utility class to execute tasks in parallel, using the 'parallel' gem.
1110
# If parallel processing has been disabled, this instead executes the tasks serially,
1211
# but provides the same API as the parallel tasks.
1312
class Parallel
14-
BLOCK_SIZE = 1024 * 16
15-
1613
# This class is called for a task that didn't complete.
1714
class IncompleteTask < RuntimeError; end
1815

@@ -101,79 +98,54 @@ def self.run_tasks(task_array, logger = nil, parallelized = true, raise_exceptio
10198
# @param logger [Logger] Logger
10299
def self.run_tasks_parallel(result, task_array, logger)
103100
pidmap = {}
101+
ipc_tempdir = Dir.mktmpdir
104102

105103
task_array.each_with_index do |task, index|
106-
reader, writer = IO.pipe
107-
108104
# simplecov doesn't see this because it's forked
109105
# :nocov:
110106
this_pid = fork do
111-
reader.close
112107
logger.reopen if logger.respond_to?(:reopen)
113108
task_result = execute_task(task, logger)
114-
writer.write YAML.dump(task_result)
115-
writer.close
109+
File.open(File.join(ipc_tempdir, "#{Process.pid}.yaml"), 'w') { |f| f.write Marshal.dump(task_result) }
116110
logger.close
117111
exit 0
118112
end
119113
# :nocov:
120114

121-
pidmap[this_pid] = { reader: reader, index: index, start_time: Time.now, result: [] }
122-
writer.close
115+
pidmap[this_pid] = { index: index, start_time: Time.now }
123116
logger.debug "Launched pid=#{this_pid} for index=#{index}"
124-
logger.reopen
117+
logger.reopen if logger.respond_to?(:reopen)
125118
end
126119

127120
while pidmap.any?
128-
# Read from all pipes
129-
pidmap.each do |_this_pid, obj|
130-
begin
131-
buf = obj[:reader].read_nonblock(BLOCK_SIZE, buf)
132-
obj[:result] << buf if buf
133-
rescue IO::EAGAINWaitReadable, EOFError, Errno::EAGAIN # rubocop:disable Lint/ShadowedException
134-
next
135-
end
136-
end
137-
138-
# Any exits?
139-
this_pid, exit_obj = Process.wait2(0, Process::WNOHANG)
140-
unless this_pid && pidmap.key?(this_pid)
141-
sleep 0.1
142-
next
143-
end
144-
121+
# Wait for exits
122+
this_pid, exit_obj = Process.wait2(0)
123+
next unless this_pid && pidmap.key?(this_pid)
145124
index = pidmap[this_pid][:index]
146125
exitstatus = exit_obj.exitstatus
147126
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
148127
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?
149128

150-
begin
151-
buf = pidmap[this_pid][:reader].read_nonblock(BLOCK_SIZE, buf)
152-
pidmap[this_pid][:result] << buf if buf
153-
rescue IO::EAGAINWaitReadable, EOFError, Errno::EAGAIN # rubocop:disable Lint/ShadowedException
154-
pidmap[this_pid][:reader].close
155-
end
156-
157-
input = pidmap[this_pid][:result].join('')
158-
logger.debug "PID=#{this_pid} completed in #{Time.now - pidmap[this_pid][:start_time]} seconds, #{input.length} bytes"
159-
129+
input = File.read(File.join(ipc_tempdir, "#{this_pid}.yaml"))
130+
result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
131+
time_delta = Time.now - pidmap[this_pid][:start_time]
160132
pidmap.delete(this_pid)
161133

162-
result[index] = YAML.load(input)
134+
logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"
163135

164136
next if result[index].status
165137
return result[index].exception
166138
end
167139
nil
168140
ensure
169-
pidmap.each do |pid, pid_data|
170-
pid_data[:reader].close
141+
pidmap.each do |pid, _pid_data|
171142
begin
172143
Process.kill('TERM', pid)
173144
rescue Errno::ESRCH # rubocop:disable Lint/HandleExceptions
174145
# If the process doesn't exist, that's fine.
175146
end
176147
end
148+
FileUtils.remove_entry_secure ipc_tempdir if ipc_tempdir
177149
end
178150

179151
# Perform the tasks in serial.

0 commit comments

Comments
 (0)