1+ import gc
2+ import importlib
13import uuid
24
35from mock import patch , call , Mock
46from nose .tools import eq_ , ok_ , assert_not_equal , raises
7+ from objgraph import count as count_refs_by_type
58
6- from kazoo .testing import KazooTestCase
9+ from kazoo .testing import KazooTestHarness
710from kazoo .exceptions import KazooException
811from kazoo .recipe .cache import TreeCache , TreeNode , TreeEvent
912
1013
11- class KazooTreeCacheTests (KazooTestCase ):
14+ class KazooAdaptiveHandlerTestCase (KazooTestHarness ):
15+ HANDLERS = (
16+ ('kazoo.handlers.gevent' , 'SequentialGeventHandler' ),
17+ ('kazoo.handlers.eventlet' , 'SequentialEventletHandler' ),
18+ ('kazoo.handlers.threading' , 'SequentialThreadingHandler' ),
19+ )
1220
21+ def setUp (self ):
22+ self .handler = self .choose_an_installed_handler ()
23+ self .setup_zookeeper (handler = self .handler )
24+
25+ def tearDown (self ):
26+ self .handler = None
27+ self .teardown_zookeeper ()
28+
29+ def choose_an_installed_handler (self ):
30+ for handler_module , handler_class in self .HANDLERS :
31+ try :
32+ mod = importlib .import_module (handler_module )
33+ cls = getattr (mod , handler_class )
34+ except ImportError :
35+ continue
36+ else :
37+ return cls ()
38+ raise ImportError ('No available handler' )
39+
40+
41+ class KazooTreeCacheTests (KazooAdaptiveHandlerTestCase ):
1342 def setUp (self ):
1443 super (KazooTreeCacheTests , self ).setUp ()
1544 self ._event_queue = self .client .handler .queue_impl ()
@@ -18,12 +47,15 @@ def setUp(self):
1847 self .cache = None
1948
2049 def tearDown (self ):
21- super (KazooTreeCacheTests , self ).tearDown ()
2250 if not self ._error_queue .empty ():
2351 try :
2452 raise self ._error_queue .get ()
2553 except FakeException :
2654 pass
55+ if self .cache is not None :
56+ self .cache .close ()
57+ self .cache = None
58+ super (KazooTreeCacheTests , self ).tearDown ()
2759
2860 def make_cache (self ):
2961 if self .cache is None :
@@ -51,6 +83,29 @@ def spy_client(self, method_name):
5183 method = getattr (self .client , method_name )
5284 return patch .object (self .client , method_name , wraps = method )
5385
86+ def _wait_gc (self ):
87+ # trigger switching on some coroutine handlers
88+ self .client .handler .sleep_func (0.1 )
89+
90+ completion_queue = getattr (self .handler , 'completion_queue' , None )
91+ if completion_queue is not None :
92+ while not self .client .handler .completion_queue .empty ():
93+ self .client .handler .sleep_func (0.1 )
94+
95+ for gen in range (3 ):
96+ gc .collect (gen )
97+
98+ def count_tree_node (self ):
99+ # inspect GC and count tree nodes for checking memory leak
100+ for retry in range (10 ):
101+ result = set ()
102+ for _ in range (5 ):
103+ self ._wait_gc ()
104+ result .add (count_refs_by_type ('TreeNode' ))
105+ if len (result ) == 1 :
106+ return list (result )[0 ]
107+ raise RuntimeError ('could not count refs exactly' )
108+
54109 def test_start (self ):
55110 self .make_cache ()
56111 self .wait_cache (since = TreeEvent .INITIALIZED )
@@ -74,12 +129,29 @@ def test_start_closed(self):
74129 self .cache .start ()
75130
76131 def test_close (self ):
132+ eq_ (self .count_tree_node (), 0 )
133+
77134 self .make_cache ()
78135 self .wait_cache (since = TreeEvent .INITIALIZED )
79136 self .client .create (self .path + '/foo/bar/baz' , makepath = True )
80137 for _ in range (3 ):
81138 self .wait_cache (TreeEvent .NODE_ADDED )
82139
140+ # setup stub watchers which are outside of tree cache
141+ stub_data_watcher = Mock (spec = lambda event : None )
142+ stub_child_watcher = Mock (spec = lambda event : None )
143+ self .client .get (self .path + '/foo' , stub_data_watcher )
144+ self .client .get_children (self .path + '/foo' , stub_child_watcher )
145+
146+ # watchers inside tree cache should be here
147+ root_path = self .client .chroot + self .path
148+ eq_ (len (self .client ._data_watchers [root_path + '/foo' ]), 2 )
149+ eq_ (len (self .client ._data_watchers [root_path + '/foo/bar' ]), 1 )
150+ eq_ (len (self .client ._data_watchers [root_path + '/foo/bar/baz' ]), 1 )
151+ eq_ (len (self .client ._child_watchers [root_path + '/foo' ]), 2 )
152+ eq_ (len (self .client ._child_watchers [root_path + '/foo/bar' ]), 1 )
153+ eq_ (len (self .client ._child_watchers [root_path + '/foo/bar/baz' ]), 1 )
154+
83155 self .cache .close ()
84156
85157 # nothing should be published since tree closed
@@ -93,6 +165,53 @@ def test_close(self):
93165 # node state should not be changed
94166 assert_not_equal (self .cache ._root ._state , TreeNode .STATE_DEAD )
95167
168+ # watchers should be reset
169+ eq_ (len (self .client ._data_watchers [root_path + '/foo' ]), 1 )
170+ eq_ (len (self .client ._data_watchers [root_path + '/foo/bar' ]), 0 )
171+ eq_ (len (self .client ._data_watchers [root_path + '/foo/bar/baz' ]), 0 )
172+ eq_ (len (self .client ._child_watchers [root_path + '/foo' ]), 1 )
173+ eq_ (len (self .client ._child_watchers [root_path + '/foo/bar' ]), 0 )
174+ eq_ (len (self .client ._child_watchers [root_path + '/foo/bar/baz' ]), 0 )
175+
176+ # outside watchers should not be deleted
177+ eq_ (list (self .client ._data_watchers [root_path + '/foo' ])[0 ],
178+ stub_data_watcher )
179+ eq_ (list (self .client ._child_watchers [root_path + '/foo' ])[0 ],
180+ stub_child_watcher )
181+
182+ # should not be any leaked memory (tree node) here
183+ self .cache = None
184+ eq_ (self .count_tree_node (), 0 )
185+
186+ def test_delete_operation (self ):
187+ self .make_cache ()
188+ self .wait_cache (since = TreeEvent .INITIALIZED )
189+
190+ eq_ (self .count_tree_node (), 1 )
191+
192+ self .client .create (self .path + '/foo/bar/baz' , makepath = True )
193+ for _ in range (3 ):
194+ self .wait_cache (TreeEvent .NODE_ADDED )
195+
196+ self .client .delete (self .path + '/foo' , recursive = True )
197+ for _ in range (3 ):
198+ self .wait_cache (TreeEvent .NODE_REMOVED )
199+
200+ # tree should be empty
201+ eq_ (self .cache ._root ._children , {})
202+
203+ # watchers should be reset
204+ root_path = self .client .chroot + self .path
205+ eq_ (self .client ._data_watchers [root_path + '/foo' ], set ())
206+ eq_ (self .client ._data_watchers [root_path + '/foo/bar' ], set ())
207+ eq_ (self .client ._data_watchers [root_path + '/foo/bar/baz' ], set ())
208+ eq_ (self .client ._child_watchers [root_path + '/foo' ], set ())
209+ eq_ (self .client ._child_watchers [root_path + '/foo/bar' ], set ())
210+ eq_ (self .client ._child_watchers [root_path + '/foo/bar/baz' ], set ())
211+
212+ # should not be any leaked memory (tree node) here
213+ eq_ (self .count_tree_node (), 1 )
214+
96215 def test_children_operation (self ):
97216 self .make_cache ()
98217 self .wait_cache (since = TreeEvent .INITIALIZED )
0 commit comments