1212"""
1313from __future__ import absolute_import
1414
15- from collections import defaultdict
16- import errno
17- from itertools import chain
1815import logging
19- import select
2016import socket
2117import threading
2218import time
2521
2622import kazoo .python2atexit as python2atexit
2723from kazoo .handlers import utils
24+ from kazoo .handlers .utils import selector_select
2825
2926try :
3027 import Queue
3128except ImportError : # pragma: nocover
3229 import queue as Queue
3330
34-
3531# sentinel objects
3632_STOP = object ()
3733
3834log = logging .getLogger (__name__ )
3935
40- _HAS_EPOLL = hasattr (select , "epoll" )
41-
4236
4337def _to_fileno (obj ):
4438 if isinstance (obj , six .integer_types ):
@@ -65,6 +59,7 @@ class KazooTimeoutError(Exception):
6559
6660class AsyncResult (utils .AsyncResult ):
6761 """A one-time event that stores a value or an exception"""
62+
6863 def __init__ (self , handler ):
6964 super (AsyncResult , self ).__init__ (handler ,
7065 threading .Condition ,
@@ -133,6 +128,7 @@ def _thread_worker(): # pragma: nocover
133128 del func # release before possible idle
134129 except self .queue_empty :
135130 continue
131+
136132 t = self .spawn (_thread_worker )
137133 return t
138134
@@ -173,82 +169,7 @@ def stop(self):
173169 python2atexit .unregister (self .stop )
174170
175171 def select (self , * args , ** kwargs ):
176- # if we have epoll, and select is not expected to work
177- # use an epoll-based "select". Otherwise don't touch
178- # anything to minimize changes
179- if _HAS_EPOLL :
180- # if the highest fd we've seen is > 1023
181- if max (map (_to_fileno , chain .from_iterable (args [:3 ]))) > 1023 :
182- return self ._epoll_select (* args , ** kwargs )
183- return self ._select (* args , ** kwargs )
184-
185- def _select (self , * args , ** kwargs ):
186- timeout = kwargs .pop ('timeout' , None )
187- # either the time to give up, or None
188- end = (time .time () + timeout ) if timeout else None
189- while end is None or time .time () < end :
190- if end is not None :
191- # make a list, since tuples aren't mutable
192- args = list (args )
193-
194- # set the timeout to the remaining time
195- args [3 ] = end - time .time ()
196- try :
197- return select .select (* args , ** kwargs )
198- except select .error as ex :
199- # if the system call was interrupted, we'll retry until timeout
200- # in Python 3, system call interruptions are a native exception
201- # in Python 2, they are not
202- errnum = ex .errno if isinstance (ex , OSError ) else ex [0 ]
203- if errnum == errno .EINTR :
204- continue
205- raise
206- # if we hit our timeout, lets return as a timeout
207- return ([], [], [])
208-
209- def _epoll_select (self , rlist , wlist , xlist , timeout = None ):
210- """epoll-based drop-in replacement for select to overcome select
211- limitation on a maximum filehandle value
212- """
213- if timeout is None :
214- timeout = - 1
215- eventmasks = defaultdict (int )
216- rfd2obj = defaultdict (list )
217- wfd2obj = defaultdict (list )
218- xfd2obj = defaultdict (list )
219- read_evmask = select .EPOLLIN | select .EPOLLPRI # Just in case
220-
221- def store_evmasks (obj_list , evmask , fd2obj ):
222- for obj in obj_list :
223- fileno = _to_fileno (obj )
224- eventmasks [fileno ] |= evmask
225- fd2obj [fileno ].append (obj )
226-
227- store_evmasks (rlist , read_evmask , rfd2obj )
228- store_evmasks (wlist , select .EPOLLOUT , wfd2obj )
229- store_evmasks (xlist , select .EPOLLERR , xfd2obj )
230-
231- poller = select .epoll ()
232-
233- for fileno in eventmasks :
234- poller .register (fileno , eventmasks [fileno ])
235-
236- try :
237- events = poller .poll (timeout )
238- revents = []
239- wevents = []
240- xevents = []
241- for fileno , event in events :
242- if event & read_evmask :
243- revents += rfd2obj .get (fileno , [])
244- if event & select .EPOLLOUT :
245- wevents += wfd2obj .get (fileno , [])
246- if event & select .EPOLLERR :
247- xevents += xfd2obj .get (fileno , [])
248- finally :
249- poller .close ()
250-
251- return revents , wevents , xevents
172+ return selector_select (* args , ** kwargs )
252173
253174 def socket (self ):
254175 return utils .create_tcp_socket (socket )
0 commit comments