Skip to content

Commit 1960796

Browse files
authored
Merge pull request #448 from packysauce/epoll
Use epoll when available to support fds > 1023
2 parents 95b2185 + 267e61b commit 1960796

2 files changed

Lines changed: 105 additions & 2 deletions

File tree

kazoo/handlers/threading.py

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
import socket
1919
import threading
2020
import time
21+
import six
22+
23+
from collections import defaultdict
24+
from itertools import chain
2125

2226
import kazoo.python2atexit as python2atexit
2327

@@ -33,6 +37,27 @@
3337

3438
log = logging.getLogger(__name__)
3539

40+
_HAS_EPOLL = hasattr(select, "epoll")
41+
42+
43+
def _to_fileno(obj):
44+
if isinstance(obj, six.integer_types):
45+
fd = int(obj)
46+
elif hasattr(obj, "fileno"):
47+
fd = obj.fileno()
48+
if not isinstance(fd, six.integer_types):
49+
raise TypeError("fileno() returned a non-integer")
50+
fd = int(fd)
51+
else:
52+
raise TypeError("argument must be an int, or have a fileno() method.")
53+
54+
if fd < 0:
55+
raise ValueError(
56+
"file descriptor cannot be a negative integer (%d)" % (fd,)
57+
)
58+
59+
return fd
60+
3661

3762
class KazooTimeoutError(Exception):
3863
pass
@@ -143,8 +168,17 @@ def stop(self):
143168
python2atexit.unregister(self.stop)
144169

145170
def select(self, *args, **kwargs):
146-
# select() takes no kwargs, so it will be in args
147-
timeout = args[3] if len(args) == 4 else None
171+
# if we have epoll, and select is not expected to work
172+
# use an epoll-based "select". Otherwise don't touch
173+
# anything to minimize changes
174+
if _HAS_EPOLL:
175+
# if the highest fd we've seen is > 1023
176+
if max(map(_to_fileno, chain(*args[:3]))) > 1023:
177+
return self._epoll_select(*args, **kwargs)
178+
return self._select(*args, **kwargs)
179+
180+
def _select(self, *args, **kwargs):
181+
timeout = kwargs.pop('timeout', None)
148182
# either the time to give up, or None
149183
end = (time.time() + timeout) if timeout else None
150184
while end is None or time.time() < end:
@@ -167,6 +201,50 @@ def select(self, *args, **kwargs):
167201
# if we hit our timeout, lets return as a timeout
168202
return ([], [], [])
169203

204+
def _epoll_select(self, rlist, wlist, xlist, timeout=None):
205+
"""epoll-based drop-in replacement for select to overcome select
206+
limitation on a maximum filehandle value
207+
"""
208+
if timeout is None:
209+
timeout = -1
210+
eventmasks = defaultdict(int)
211+
rfd2obj = defaultdict(list)
212+
wfd2obj = defaultdict(list)
213+
xfd2obj = defaultdict(list)
214+
read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case
215+
216+
def store_evmasks(obj_list, evmask, fd2obj):
217+
for obj in obj_list:
218+
fileno = _to_fileno(obj)
219+
eventmasks[fileno] |= evmask
220+
fd2obj[fileno].append(obj)
221+
222+
store_evmasks(rlist, read_evmask, rfd2obj)
223+
store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
224+
store_evmasks(xlist, select.EPOLLERR, xfd2obj)
225+
226+
poller = select.epoll()
227+
228+
for fileno in eventmasks:
229+
poller.register(fileno, eventmasks[fileno])
230+
231+
try:
232+
events = poller.poll(timeout)
233+
revents = []
234+
wevents = []
235+
xevents = []
236+
for fileno, event in events:
237+
if event & read_evmask:
238+
revents += rfd2obj.get(fileno, [])
239+
if event & select.EPOLLOUT:
240+
wevents += wfd2obj.get(fileno, [])
241+
if event & select.EPOLLERR:
242+
xevents += xfd2obj.get(fileno, [])
243+
finally:
244+
poller.close()
245+
246+
return revents, wevents, xevents
247+
170248
def socket(self):
171249
return utils.create_tcp_socket(socket)
172250

kazoo/tests/test_threading_handler.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,31 @@ def test_double_start_stop(self):
4646
h.stop()
4747
self.assertFalse(h._running)
4848

49+
def test_huge_file_descriptor(self):
50+
from kazoo.handlers.threading import _HAS_EPOLL
51+
if not _HAS_EPOLL:
52+
self.skipTest('only run on systems with epoll()')
53+
import resource
54+
import socket
55+
from kazoo.handlers.utils import create_tcp_socket
56+
try:
57+
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
58+
except (ValueError, resource.error):
59+
self.skipTest('couldnt raise fd limit high enough')
60+
fd = 0
61+
socks = []
62+
while fd < 4000:
63+
sock = create_tcp_socket(socket)
64+
fd = sock.fileno()
65+
socks.append(sock)
66+
h = self._makeOne()
67+
h.start()
68+
h.select(socks, [], [])
69+
with self.assertRaises(ValueError):
70+
h._select(socks, [], [])
71+
h._epoll_select(socks, [], [])
72+
h.stop()
73+
4974

5075
class TestThreadingAsync(unittest.TestCase):
5176
def _makeOne(self, *args):

0 commit comments

Comments
 (0)