11# frozen_string_literal: true
22
3- # Helper to use the 'parallel' gem to perform tasks
3+ # A class to parallelize process execution.
4+ # This is a utility class to execute tasks in parallel, with our own forking implementation
5+ # that passes through logs and reliably handles errors. If parallel processing has been disabled,
6+ # this instead executes the tasks serially, but provides the same API as the parallel tasks.
47
5- require 'parallel'
require 'fileutils'
require 'logger'
require 'stringio'
require 'tmpdir'
79
810module OctocatalogDiff
911 module Util
10- # This is a utility class to execute tasks in parallel, using the 'parallel' gem.
11- # If parallel processing has been disabled, this instead executes the tasks serially,
12- # but provides the same API as the parallel tasks.
1312 class Parallel
13+ # This exception is recorded in the result of a task that didn't complete.
14+ class IncompleteTask < RuntimeError ; end
15+
16+ # --------------------------------------
1417 # This class represents a parallel task. It requires a method reference, which will be executed with
1518 # any supplied arguments. It can optionally take a text description and a validator function.
19+ # --------------------------------------
1620 class Task
1721 attr_reader :description
1822 attr_accessor :args
@@ -35,10 +39,12 @@ def validate(result, logger = Logger.new(StringIO.new))
3539 end
3640 end
3741
42+ # --------------------------------------
3843 # This class represents the result from a parallel task. The status is set to true (success), false (error),
3944 # or nil (task was killed before it could complete). The exception (for failure) and output object (for success)
4045 # are readable attributes. The validity of the results, determined by executing the 'validate' method of the Task,
4146 # is available to be set and fetched.
47+ # --------------------------------------
4248 class Result
4349 attr_reader :output , :args
4450 attr_accessor :status , :exception
@@ -51,121 +57,170 @@ def initialize(opts = {})
5157 end
5258 end
5359
60+ # --------------------------------------
61+ # Static methods in the class
62+ # --------------------------------------
63+
# Entry point for parallel processing. By default this will perform parallel processing,
# but it will also accept an option to do serial processing instead.
# @param task_array [Array<Parallel::Task>] Tasks to run
# @param logger [Logger] Optional logger object
# @param parallelized [Boolean] True for parallel processing, false for serial processing
# @param raise_exception [Boolean] True to raise exception immediately if one occurs; false to return exception in results
# @return [Array<Parallel::Result>] Parallel results (same order as tasks)
# @raise [ArgumentError] If task_array is not an Array, or contains anything that is not a Parallel::Task
def self.run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false)
  # Create a throwaway logger object if one is not given
  logger ||= Logger.new(StringIO.new)

  # Validate input - we need an array of OctocatalogDiff::Util::Parallel::Task. If the array is empty then
  # return an empty array right away.
  raise ArgumentError, "run_tasks() argument must be array, not #{task_array.class}" unless task_array.is_a?(Array)
  return [] if task_array.empty?

  # Look the offender up by position, not by truthiness: a nil or false element is just as
  # invalid as any other non-Task object and must be reported here.
  bad_index = task_array.index { |task| !task.is_a?(OctocatalogDiff::Util::Parallel::Task) }
  unless bad_index.nil?
    ele = task_array[bad_index]
    raise ArgumentError, "Element #{ele.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{ele.class}"
  end

  # Initialize the result array. For now all entries in the array indicate that the task was killed.
  # Actual statuses will replace this initial status. If the initial status wasn't replaced, then indeed,
  # the task was killed.
  result = task_array.map { |x| Result.new(exception: IncompleteTask.new('Killed'), args: x.args) }
  logger.debug "Initialized parallel task result array: size=#{result.size}"

  # Execute as per the requested method (serial or parallel) and handle results.
  exception = if parallelized
    run_tasks_parallel(result, task_array, logger)
  else
    run_tasks_serial(result, task_array, logger)
  end

  # Only an actual exception object may be raised; any other return value means "no failure".
  raise exception if raise_exception && exception.is_a?(Exception)
  result
end
7797
# Utility method! Not intended to be called from outside this class.
# ---
# Use a forking strategy to run tasks in parallel. Each task in the array is forked in a child
# process, and when that task completes it writes its result (OctocatalogDiff::Util::Parallel::Result)
# into a serialized data file. Once children are forked this method waits for their return, deserializing
# the output from each data file and updating the `result` array with actual results.
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results (updated in place)
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
# @param logger [Logger] Logger
# @return [Exception, nil] First exception encountered by a child process; nil if no exceptions encountered.
# @raise [RuntimeError] If a child process exits abnormally or with a non-zero status
def self.run_tasks_parallel(result, task_array, logger)
  pidmap = {}
  ipc_tempdir = Dir.mktmpdir

  # Child process forking
  task_array.each_with_index do |task, index|
    # simplecov doesn't see this because it's forked
    # :nocov:
    this_pid = fork do
      # Serialize first, then write in binary mode, so the data file holds exactly the marshaled bytes.
      payload = Marshal.dump(execute_task(task, logger))
      File.binwrite(File.join(ipc_tempdir, "#{Process.pid}.dat"), payload)
      Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
    end
    # :nocov:

    pidmap[this_pid] = { index: index, start_time: Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    logger.debug "Launched pid=#{this_pid} for index=#{index}"
    logger.reopen if logger.respond_to?(:reopen)
  end

  # Waiting for children and handling results
  until pidmap.empty?
    this_pid, exit_obj = Process.wait2(0)

    # Forget the child as soon as it has been reaped, so the cleanup below never signals a PID
    # that no longer belongs to one of our children.
    pid_data = this_pid && pidmap.delete(this_pid)
    next unless pid_data

    exitstatus = exit_obj.exitstatus
    raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
    raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?

    index = pid_data[:index]
    input = File.binread(File.join(ipc_tempdir, "#{this_pid}.dat"))
    # The data file was written by our own forked child, not by an external source.
    result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
    time_delta = Process.clock_gettime(Process::CLOCK_MONOTONIC) - pid_data[:start_time]

    logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.bytesize} bytes"

    next if result[index].status
    return result[index].exception
  end

  logger.debug 'All child processes completed with no exceptions raised'

  # Explicit nil: without it the method would return the value of the logger call above.
  nil

# Cleanup: Kill any child processes that are still running, and clean the temporary directory
# where data files were stored.
ensure
  pidmap.each_key do |pid|
    begin
      Process.kill('TERM', pid)
    rescue Errno::ESRCH # rubocop:disable Lint/HandleExceptions
      # If the process doesn't exist, that's fine.
    end
  end

  retries = 0
  # ipc_tempdir is nil if creating the temporary directory itself failed.
  while ipc_tempdir && File.directory?(ipc_tempdir) && retries < 10
    retries += 1
    begin
      FileUtils.remove_entry_secure ipc_tempdir
    rescue Errno::ENOTEMPTY, Errno::ENOENT # rubocop:disable Lint/HandleExceptions
      # Errno::ENOTEMPTY will trigger a retry because the directory exists
      # Errno::ENOENT will break the loop because the directory won't exist next time it's checked
    end
  end
end
130172
# Utility method! Not intended to be called from outside this class.
# ---
# Perform the tasks in serial, stopping at the first task that does not succeed.
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results (updated in place)
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
# @param logger [Logger] Logger
# @return [Exception, nil] Exception from the first failed task; nil if every task succeeded.
def self.run_tasks_serial(result, task_array, logger)
  # Perform the tasks 1 by 1 - each executed task replaces the element at its index in the 'result'
  # array. Once a task fails, no later task is run, so the remaining elements keep whatever
  # placeholder the caller put there.
  task_array.each_with_index do |task, index|
    outcome = execute_task(task, logger)
    result[index] = outcome
    return outcome.exception unless outcome.status
  end
  nil
end

# Utility method! Not intended to be called from outside this class.
# ---
# Process a single task. Called by run_tasks_parallel / run_tasks_serial.
# This method will report all exceptions in the OctocatalogDiff::Util::Parallel::Result object
# itself, and not raise them.
# @param task [OctocatalogDiff::Util::Parallel::Task] Task object
# @param logger [Logger] Logger
# @return [OctocatalogDiff::Util::Parallel::Result] Parallel task result. Status is true only when the
#   task executed and validated; otherwise status is false and the exception attribute is set.
def self.execute_task(task, logger)
  # Step 1: run the task itself.
  begin
    logger.debug("Begin #{task.description}")
    output = task.execute(logger)
    result = Result.new(output: output, status: true, args: task.args)
  rescue => exc
    logger.debug("Failed #{task.description}: #{exc.class} #{exc.message}")
    # Immediately return without running the validation, since this already failed.
    return Result.new(exception: exc, status: false, args: task.args)
  end

  # Step 2: validate the output. A failed validation keeps the output but flips the status.
  begin
    # Preferably the validator method raised its own exception. However if it
    # simply returned false, raise our own exception here.
    raise "Failed #{task.description} validation (unspecified error)" unless task.validate(output, logger)
    logger.debug("Success #{task.description}")
  rescue => exc
    logger.warn("Failed #{task.description} validation: #{exc.class} #{exc.message}")
    result.status = false
    result.exception = exc
  end

  result
end
171226 end
0 commit comments