@@ -1494,14 +1494,13 @@ def wait_interactive(self, jobs=None, interval=1.0, timeout=-1.0):
14941494 # Control methods
14951495 # --------------------------------------------------------------------------
14961496
1497- def clear (self , targets = None , block = None ):
1498- """Clear the namespace in target(s)."""
1499- block = self .block if block is None else block
1500- targets = self ._build_targets (targets )[0 ]
1497+ def _send_control_request (self , targets , msg_type , content , block ):
1498+ """Send a request on the control channel"""
1499+ target_identities = self ._build_targets (targets )[0 ]
15011500 futures = []
1502- for t in targets :
1501+ for ident in target_identities :
15031502 futures .append (
1504- self ._send (self ._control_stream , 'clear_request' , content = {} , ident = t )
1503+ self ._send (self ._control_stream , msg_type , content = content , ident = ident )
15051504 )
15061505 if not block :
15071506 return multi_future (futures )
@@ -1511,6 +1510,34 @@ def clear(self, targets=None, block=None):
15111510 if msg ['content' ]['status' ] != 'ok' :
15121511 raise self ._unwrap_exception (msg ['content' ])
15131512
1513+ def send_signal (self , sig , targets = None , block = None ):
1514+ """Send a signal target(s).
1515+
1516+ Parameters
1517+ ----------
1518+
1519+ sig: int or str
1520+ The signal number or name to send.
1521+ If a str, will evaluate to getattr(signal, sig) on the engine,
1522+ which is useful for sending signals cross-platform.
1523+
1524+ .. versionadded:: 7.0
1525+ """
1526+ block = self .block if block is None else block
1527+ return self ._send_control_request (
1528+ targets = targets ,
1529+ msg_type = 'signal_request' ,
1530+ content = {'sig' : sig },
1531+ block = block ,
1532+ )
1533+
1534+ def clear (self , targets = None , block = None ):
1535+ """Clear the namespace in target(s)."""
1536+ block = self .block if block is None else block
1537+ return self ._send_control_request (
1538+ targets = targets , msg_type = 'clear_request' , content = {}, block = block
1539+ )
1540+
15141541 def abort (self , jobs = None , targets = None , block = None ):
15151542 """Abort specific jobs from the execution queues of target(s).
15161543
@@ -1531,7 +1558,6 @@ def abort(self, jobs=None, targets=None, block=None):
15311558 """
15321559 block = self .block if block is None else block
15331560 jobs = jobs if jobs is not None else list (self .outstanding )
1534- targets = self ._build_targets (targets )[0 ]
15351561
15361562 msg_ids = []
15371563 if isinstance (jobs , string_types + (AsyncResult ,)):
@@ -1549,22 +1575,13 @@ def abort(self, jobs=None, targets=None, block=None):
15491575 else :
15501576 msg_ids .append (j )
15511577 content = dict (msg_ids = msg_ids )
1552- futures = []
1553- for t in targets :
1554- futures .append (
1555- self ._send (
1556- self ._control_stream , 'abort_request' , content = content , ident = t
1557- )
1558- )
15591578
1560- if not block :
1561- return multi_future (futures )
1562- else :
1563- for f in futures :
1564- f .wait ()
1565- msg = f .result ()
1566- if msg ['content' ]['status' ] != 'ok' :
1567- raise self ._unwrap_exception (msg ['content' ])
1579+ return self ._send_control_request (
1580+ targets ,
1581+ msg_type = 'abort_request' ,
1582+ content = content ,
1583+ block = block ,
1584+ )
15681585
15691586 def shutdown (self , targets = 'all' , restart = False , hub = False , block = None ):
15701587 """Terminates one or more engine processes, optionally including the hub.
0 commit comments