11# frozen_string_literal: true
22
3- # Parallelize process executation.
3+ # A class to parallelize process executation.
4+ # This is a utility class to execute tasks in parallel, with our own forking implementation
5+ # that passes through logs and reliably handles errors. If parallel processing has been disabled,
6+ # this instead executes the tasks serially, but provides the same API as the parallel tasks.
47
58require 'stringio'
69
710module OctocatalogDiff
811 module Util
9- # This is a utility class to execute tasks in parallel, using the 'parallel' gem.
10- # If parallel processing has been disabled, this instead executes the tasks serially,
11- # but provides the same API as the parallel tasks.
1212 class Parallel
13- # This class is called for a task that didn't complete.
13+ # This exception is called for a task that didn't complete.
1414 class IncompleteTask < RuntimeError ; end
1515
16+ # --------------------------------------
1617 # This class represents a parallel task. It requires a method reference, which will be executed with
1718 # any supplied arguments. It can optionally take a text description and a validator function.
19+ # --------------------------------------
1820 class Task
1921 attr_reader :description
2022 attr_accessor :args
@@ -37,10 +39,12 @@ def validate(result, logger = Logger.new(StringIO.new))
3739 end
3840 end
3941
42+ # --------------------------------------
4043 # This class represents the result from a parallel task. The status is set to true (success), false (error),
4144 # or nil (task was killed before it could complete). The exception (for failure) and output object (for success)
4245 # are readable attributes. The validity of the results, determined by executing the 'validate' method of the Task,
4346 # is available to be set and fetched.
47+ # --------------------------------------
4448 class Result
4549 attr_reader :output , :args
4650 attr_accessor :status , :exception
@@ -53,65 +57,66 @@ def initialize(opts = {})
5357 end
5458 end
5559
60+ # --------------------------------------
61+ # Static methods in the class
62+ # --------------------------------------
63+
5664 # Entry point for parallel processing. By default this will perform parallel processing,
5765 # but it will also accept an option to do serial processing instead.
5866 # @param task_array [Array<Parallel::Task>] Tasks to run
5967 # @param logger [Logger] Optional logger object
6068 # @param parallelized [Boolean] True for parallel processing, false for serial processing
69+ # @param raise_exception [Boolean] True to raise exception immediately if one occurs; false to return exception in results
6170 # @return [Array<Parallel::Result>] Parallel results (same order as tasks)
62- #
63- # Note: Parallelization throws intermittent errors under travis CI, so it will be disabled by
64- # default for integration tests.
6571 def self . run_tasks ( task_array , logger = nil , parallelized = true , raise_exception = false )
6672 # Create a throwaway logger object if one is not given
6773 logger ||= Logger . new ( StringIO . new )
6874
69- # Validate input - we need an array. If the array is empty then return an empty array right away.
75+ # Validate input - we need an array of OctocatalogDiff::Util::Parallel::Task. If the array is empty then
76+ # return an empty array right away.
7077 raise ArgumentError , "run_tasks() argument must be array, not #{ task_array . class } " unless task_array . is_a? ( Array )
7178 return [ ] if task_array . empty?
7279
73- # Make sure each element in the array is a OctocatalogDiff::Util::Parallel::Task
74- task_array . each do | x |
75- next if x . is_a? ( OctocatalogDiff :: Util :: Parallel :: Task )
76- raise ArgumentError , "Element #{ x . inspect } must be a OctocatalogDiff::Util::Parallel::Task, not a #{ x . class } "
80+ invalid_inputs = task_array . reject { | task | task . is_a? ( OctocatalogDiff ::Util ::Parallel ::Task ) }
81+ if invalid_inputs . any?
82+ ele = invalid_inputs . first
83+ raise ArgumentError , "Element #{ ele . inspect } must be a OctocatalogDiff::Util::Parallel::Task, not a #{ ele . class } "
7784 end
7885
79- result = task_array . map do |x |
80- Result . new ( exception : IncompleteTask . new ( 'Killed' ) , args : x . args )
81- end
86+ # Initialize the result array. For now all entries in the array indicate that the task was killed.
87+ # Actual statuses will replace this initial status. If the initial status wasn't replaced, then indeed,
88+ # the task was killed.
89+ result = task_array . map { |x | Result . new ( exception : IncompleteTask . new ( 'Killed' ) , args : x . args ) }
8290 logger . debug "Initialized parallel task result array: size=#{ result . size } "
8391
84- exception = if parallelized
85- run_tasks_parallel ( result , task_array , logger )
86- else
87- run_tasks_serial ( result , task_array , logger )
88- end
89-
92+ # Execute as per the requested method (serial or parallel) and handle results.
93+ exception = parallelized ? run_tasks_parallel ( result , task_array , logger ) : run_tasks_serial ( result , task_array , logger )
9094 raise exception if exception && raise_exception
9195 result
9296 end
9397
94- # Use the parallel gem to run each task in the task array. Under the hood this is forking a process for
95- # each task, and serializing/deserializing the arguments and the outputs.
98+ # Utility method! Not intended to be called from outside this class.
99+ # ---
100+ # Use a forking strategy to run tasks in parallel. Each task in the array is forked in a child
101+ # process, and when that task completes it writes its result (OctocatalogDiff::Util::Parallel::Result)
102+ # into a serialized data file. Once children are forked this method waits for their return, deserializing
103+ # the output from each data file and updating the `result` array with actual results.
96104 # @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
97105 # @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
98106 # @param logger [Logger] Logger
107+ # @return [Exception] First exception encountered by a child process; returns nil if no exceptions encountered.
99108 def self . run_tasks_parallel ( result , task_array , logger )
100109 pidmap = { }
101110 ipc_tempdir = Dir . mktmpdir
102111
112+ # Child process forking
103113 task_array . each_with_index do |task , index |
104114 # simplecov doesn't see this because it's forked
105- # Kernel.exit! avoids at_exit calls possibly set up by rspec tests
106115 # :nocov:
107116 this_pid = fork do
108- begin
109- task_result = execute_task ( task , logger )
110- File . open ( File . join ( ipc_tempdir , "#{ Process . pid } .yaml" ) , 'w' ) { |f | f . write Marshal . dump ( task_result ) }
111- Kernel . exit! 0
112- rescue
113- Kernel . exit! 255
114- end
117+ task_result = execute_task ( task , logger )
118+ File . open ( File . join ( ipc_tempdir , "#{ Process . pid } .dat" ) , 'w' ) { |f | f . write Marshal . dump ( task_result ) }
119+ Kernel . exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
115120 end
116121 # :nocov:
117122
@@ -120,16 +125,16 @@ def self.run_tasks_parallel(result, task_array, logger)
120125 logger . reopen if logger . respond_to? ( :reopen )
121126 end
122127
128+ # Waiting for children and handling results
123129 while pidmap . any?
124- # Wait for exits
125130 this_pid , exit_obj = Process . wait2 ( 0 )
126131 next unless this_pid && pidmap . key? ( this_pid )
127132 index = pidmap [ this_pid ] [ :index ]
128133 exitstatus = exit_obj . exitstatus
129134 raise "PID=#{ this_pid } exited abnormally: #{ exit_obj . inspect } " if exitstatus . nil?
130135 raise "PID=#{ this_pid } exited with status #{ exitstatus } " unless exitstatus . zero?
131136
132- input = File . read ( File . join ( ipc_tempdir , "#{ this_pid } .yaml " ) )
137+ input = File . read ( File . join ( ipc_tempdir , "#{ this_pid } .dat " ) )
133138 result [ index ] = Marshal . load ( input ) # rubocop:disable Security/MarshalLoad
134139 time_delta = Time . now - pidmap [ this_pid ] [ :start_time ]
135140 pidmap . delete ( this_pid )
@@ -139,7 +144,11 @@ def self.run_tasks_parallel(result, task_array, logger)
139144 next if result [ index ] . status
140145 return result [ index ] . exception
141146 end
142- nil
147+
148+ logger . debug 'All child processes completed with no exceptions raised'
149+
150+ # Cleanup: Kill any child processes that are still running, and clean the temporary directory
151+ # where data files were stored.
143152 ensure
144153 pidmap . each do |pid , _pid_data |
145154 begin
@@ -161,6 +170,8 @@ def self.run_tasks_parallel(result, task_array, logger)
161170 end
162171 end
163172
173+ # Utility method! Not intended to be called from outside this class.
174+ # ---
164175 # Perform the tasks in serial.
165176 # @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
166177 # @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
@@ -177,7 +188,11 @@ def self.run_tasks_serial(result, task_array, logger)
177188 nil
178189 end
179190
180- # Process a single task.
191+ # Utility method! Not intended to be called from outside this class.
192+ # ---
193+ # Process a single task. Called by run_tasks_parallel / run_tasks_serial.
194+ # This method will report all exceptions in the OctocatalogDiff::Util::Parallel::Result object
195+ # itself, and not raise them.
181196 # @param task [OctocatalogDiff::Util::Parallel::Task] Task object
182197 # @param logger [Logger] Logger
183198 # @return [OctocatalogDiff::Util::Parallel::Result] Parallel task result
@@ -188,13 +203,16 @@ def self.execute_task(task, logger)
188203 result = Result . new ( output : output , status : true , args : task . args )
189204 rescue => exc
190205 logger . debug ( "Failed #{ task . description } : #{ exc . class } #{ exc . message } " )
206+ # Immediately return without running the validation, since this already failed.
191207 return Result . new ( exception : exc , status : false , args : task . args )
192208 end
193209
194210 begin
195211 if task . validate ( output , logger )
196212 logger . debug ( "Success #{ task . description } " )
197213 else
214+ # Preferably the validator method raised its own exception. However if it
215+ # simply returned false, raise our own exception here.
198216 raise "Failed #{ task . description } validation (unspecified error)"
199217 end
200218 rescue => exc
0 commit comments