Skip to content

Commit 4456f18

Browse files
tonyseekjeffwidman
authored andcommitted
perf(recipe): Give TreeCache standalone queue
This commit lets TreeCache do not use queue of connection routine any more.
1 parent cbd02f5 commit 4456f18

1 file changed

Lines changed: 16 additions & 1 deletion

File tree

kazoo/recipe/cache.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class TreeCache(object):
3636
STATE_STARTED = 1
3737
STATE_CLOSED = 2
3838

39+
_STOP = object()
40+
3941
def __init__(self, client, path):
4042
self._client = client
4143
self._root = TreeNode.make_root(self, path)
@@ -44,6 +46,8 @@ def __init__(self, client, path):
4446
self._is_initialized = False
4547
self._error_listeners = []
4648
self._event_listeners = []
49+
self._task_queue = client.handler.queue_impl()
50+
self._task_thread = None
4751

4852
def start(self):
4953
"""Starts the cache.
@@ -66,6 +70,7 @@ def start(self):
6670
else:
6771
raise KazooException('already started')
6872

73+
self._task_thread = self._client.handler.spawn(self._do_background)
6974
self._client.add_listener(self._session_watcher)
7075
self._client.ensure_path(self._root._path)
7176

@@ -87,6 +92,7 @@ def close(self):
8792
"""
8893
if self._state == self.STATE_STARTED:
8994
self._state = self.STATE_CLOSED
95+
self._task_queue.put(self._STOP)
9096
self._client.remove_listener(self._session_watcher)
9197
with handle_exception(self._error_listeners):
9298
self._root.on_deleted()
@@ -168,7 +174,16 @@ def _do_publish_event(self, event):
168174
listener(event)
169175

170176
def _in_background(self, func, *args, **kwargs):
171-
self._client.handler.callback_queue.put(lambda: func(*args, **kwargs))
177+
self._task_queue.put((func, args, kwargs))
178+
179+
def _do_background(self):
180+
while True:
181+
with handle_exception(self._error_listeners):
182+
cb = self._task_queue.get()
183+
if cb is self._STOP:
184+
break
185+
func, args, kwargs = cb
186+
func(*args, **kwargs)
172187

173188
def _session_watcher(self, state):
174189
if state == KazooState.SUSPENDED:

0 commit comments

Comments
 (0)