Skip to content

Commit cbd02f5

Browse files
tonyseekjeffwidman
authored andcommitted
refactor: Unify queue factory in various handlers
Then every handlers have queue_impl and queue_empty as their attributes.
1 parent 9b0a793 commit cbd02f5

2 files changed

Lines changed: 9 additions & 7 deletions

File tree

kazoo/handlers/eventlet.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@ class SequentialEventletHandler(object):
7676
7777
"""
7878
name = "sequential_eventlet_handler"
79+
queue_impl = green_queue.LightQueue
80+
queue_empty = green_queue.Empty
7981

8082
def __init__(self):
8183
"""Create a :class:`SequentialEventletHandler` instance"""
82-
self.callback_queue = green_queue.LightQueue()
83-
self.completion_queue = green_queue.LightQueue()
84+
self.callback_queue = self.queue_impl()
85+
self.completion_queue = self.queue_impl()
8486
self._workers = []
8587
self._started = False
8688

kazoo/handlers/gevent.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
from gevent import socket
88
import gevent.event
99
import gevent.queue
10-
from gevent.queue import Empty
11-
from gevent.queue import Queue
1210
import gevent.select
1311
import gevent.thread
1412
try:
@@ -50,11 +48,13 @@ class SequentialGeventHandler(object):
5048
5149
"""
5250
name = "sequential_gevent_handler"
51+
queue_impl = gevent.queue.Queue
52+
queue_empty = gevent.queue.Empty
5353
sleep_func = staticmethod(gevent.sleep)
5454

5555
def __init__(self):
5656
"""Create a :class:`SequentialGeventHandler` instance"""
57-
self.callback_queue = Queue()
57+
self.callback_queue = self.queue_impl()
5858
self._running = False
5959
self._async = None
6060
self._state_change = Semaphore()
@@ -72,7 +72,7 @@ def greenlet_worker():
7272
if func is _STOP:
7373
break
7474
func()
75-
except Empty:
75+
except self.queue_empty:
7676
continue
7777
except Exception as exc:
7878
log.warning("Exception in worker greenlet")
@@ -110,7 +110,7 @@ def stop(self):
110110
worker.join()
111111

112112
# Clear the queues
113-
self.callback_queue = Queue() # pragma: nocover
113+
self.callback_queue = self.queue_impl() # pragma: nocover
114114

115115
python2atexit.unregister(self.stop)
116116

0 commit comments

Comments
 (0)