Skip to content

Commit 6f08e35

Browse files
committed
Implement non-blocking read and PID checking
1 parent fe7ede1 commit 6f08e35

1 file changed

Lines changed: 33 additions & 8 deletions

File tree

lib/octocatalog-diff/util/parallel.rb

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ module Util
1111
# If parallel processing has been disabled, this instead executes the tasks serially,
1212
# but provides the same API as the parallel tasks.
1313
class Parallel
14+
BLOCK_SIZE = 1024 * 16
15+
1416
# This class is called for a task that didn't complete.
1517
class IncompleteTask < RuntimeError; end
1618

@@ -107,32 +109,55 @@ def self.run_tasks_parallel(result, task_array, logger)
107109
# :nocov:
108110
this_pid = fork do
109111
reader.close
110-
logger.reopen
112+
logger.reopen if logger.respond_to?(:reopen)
111113
task_result = execute_task(task, logger)
112114
writer.write YAML.dump(task_result)
115+
writer.close
116+
logger.close
113117
exit 0
114118
end
115119
# :nocov:
116120

117-
pidmap[this_pid] = { reader: reader, index: index, start_time: Time.now }
121+
pidmap[this_pid] = { reader: reader, index: index, start_time: Time.now, result: [] }
118122
writer.close
119123
logger.debug "Launched pid=#{this_pid} for index=#{index}"
120124
logger.reopen
121125
end
122126

123127
while pidmap.any?
124-
this_pid, exit_obj = Process.wait2
125-
next unless pidmap.key?(this_pid)
128+
# Read from all pipes
129+
pidmap.each do |_this_pid, obj|
130+
begin
131+
buf = obj[:reader].read_nonblock(BLOCK_SIZE, buf)
132+
obj[:result] << buf if buf
133+
rescue IO::EAGAINWaitReadable, EOFError, Errno::EAGAIN # rubocop:disable Lint/ShadowedException
134+
next
135+
end
136+
end
137+
138+
# Any exits?
139+
this_pid, exit_obj = Process.wait2(0, Process::WNOHANG)
140+
next unless this_pid && pidmap.key?(this_pid)
126141

142+
index = pidmap[this_pid][:index]
127143
exitstatus = exit_obj.exitstatus
144+
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
128145
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?
129-
logger.debug "PID=#{this_pid} completed task in #{Time.now - pidmap[this_pid][:start_time]} seconds"
130146

131-
index = pidmap[this_pid][:index]
132-
result[index] = YAML.load(pidmap[this_pid][:reader].read)
133-
pidmap[this_pid][:reader].close
147+
begin
148+
buf = pidmap[this_pid][:reader].read_nonblock(BLOCK_SIZE, buf)
149+
pidmap[this_pid][:result] << buf if buf
150+
rescue IO::EAGAINWaitReadable, EOFError, Errno::EAGAIN # rubocop:disable Lint/ShadowedException
151+
pidmap[this_pid][:reader].close
152+
end
153+
154+
input = pidmap[this_pid][:result].join('')
155+
logger.debug "PID=#{this_pid} completed in #{Time.now - pidmap[this_pid][:start_time]} seconds, #{input.length} bytes"
156+
134157
pidmap.delete(this_pid)
135158

159+
result[index] = YAML.load(input)
160+
136161
next if result[index].status
137162
return result[index].exception
138163
end

0 commit comments

Comments
 (0)