Skip to content

Commit 87c6d0e

Browse files
committed
Merge pull request #322 from harlowja/better-cleanup
Do cleanup in a more more robust manner
2 parents 524fbb5 + 7439ecd commit 87c6d0e

4 files changed

Lines changed: 86 additions & 65 deletions

File tree

kazoo/recipe/lock.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,6 @@ def release(self):
547547
def _inner_release(self):
548548
if not self.is_acquired:
549549
return False
550-
551550
try:
552551
self.client.delete(self.create_path)
553552
except NoNodeError: # pragma: nocover

kazoo/testing/common.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222

2323
import code
24+
import logging
2425
import os
2526
import os.path
2627
import shutil
@@ -34,6 +35,9 @@
3435
from glob import glob
3536

3637

38+
log = logging.getLogger(__name__)
39+
40+
3741
def debug(sig, frame):
3842
"""Interrupt running process, and provide a python prompt for
3943
interactive debugging."""
@@ -145,22 +149,26 @@ def run(self):
145149
log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path( # NOQA
146150
self.working_path + os.sep + "zookeeper.log\n"))
147151

148-
self.process = subprocess.Popen(
149-
args=["java",
150-
"-cp", self.classpath,
151-
152-
# "-Dlog4j.debug",
153-
"-Dreadonlymode.enabled=true",
154-
"-Dzookeeper.log.dir=%s" % log_path,
155-
"-Dzookeeper.root.logger=INFO,CONSOLE",
156-
"-Dlog4j.configuration=file:%s" % log4j_path,
157-
158-
# OS X: Prevent java from appearing in menu bar, process dock
159-
# and from activation of the main workspace on run.
160-
"-Djava.awt.headless=true",
161-
162-
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
163-
config_path])
152+
args = [
153+
"java",
154+
"-cp", self.classpath,
155+
156+
# "-Dlog4j.debug",
157+
"-Dreadonlymode.enabled=true",
158+
"-Dzookeeper.log.dir=%s" % log_path,
159+
"-Dzookeeper.root.logger=INFO,CONSOLE",
160+
"-Dlog4j.configuration=file:%s" % log4j_path,
161+
162+
# OS X: Prevent java from appearing in menu bar, process dock
163+
# and from activation of the main workspace on run.
164+
"-Djava.awt.headless=true",
165+
166+
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
167+
config_path,
168+
]
169+
self.process = subprocess.Popen(args=args)
170+
log.info("Started zookeeper process %s using args %s",
171+
self.process.pid, args)
164172
self._running = True
165173

166174
@property
@@ -226,6 +234,11 @@ def stop(self):
226234
return
227235
self.process.terminate()
228236
self.process.wait()
237+
if self.process.returncode != 0:
238+
log.warn("Zookeeper process %s failed to terminate with"
239+
" non-zero return code (it terminated with %s return"
240+
" code instead)", self.process.pid,
241+
self.process.returncode)
229242
self._running = False
230243

231244
def destroy(self):

kazoo/testing/harness.py

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from kazoo import python2atexit as atexit
99

1010
from kazoo.client import KazooClient
11-
from kazoo.exceptions import NotEmptyError
11+
from kazoo.exceptions import KazooException, NotEmptyError
1212
from kazoo.protocol.states import (
1313
KazooState
1414
)
@@ -81,14 +81,13 @@ def servers(self):
8181
return ",".join([s.address for s in self.cluster])
8282

8383
def _get_nonchroot_client(self):
84-
return KazooClient(self.servers)
84+
c = KazooClient(self.servers)
85+
self._clients.append(c)
86+
return c
8587

8688
def _get_client(self, **kwargs):
8789
c = KazooClient(self.hosts, **kwargs)
88-
try:
89-
self._clients.append(c)
90-
except AttributeError:
91-
self._client = [c]
90+
self._clients.append(c)
9291
return c
9392

9493
def lose_connection(self, event_factory):
@@ -105,47 +104,31 @@ def setup_zookeeper(self, **client_options):
105104
The cluster will only be created on the first invocation and won't be
106105
fully torn down until exit.
107106
"""
108-
if not self.cluster[0].running:
107+
do_start = False
108+
for s in self.cluster:
109+
if not s.running:
110+
do_start = True
111+
if do_start:
109112
self.cluster.start()
110113
namespace = "/kazootests" + uuid.uuid4().hex
111114
self.hosts = self.servers + namespace
112-
113115
if 'timeout' not in client_options:
114116
client_options['timeout'] = 0.8
115117
self.client = self._get_client(**client_options)
116118
self.client.start()
117119
self.client.ensure_path("/")
118120

119121
def teardown_zookeeper(self):
120-
"""Clean up any ZNodes created during the test
121-
"""
122-
if not self.cluster[0].running:
123-
self.cluster.start()
124-
125-
tries = 0
126-
if self.client and self.client.connected:
127-
while tries < 3:
128-
try:
129-
self.client.retry(self.client.delete, '/', recursive=True)
130-
break
131-
except NotEmptyError:
132-
pass
133-
tries += 1
134-
self.client.stop()
135-
self.client.close()
136-
del self.client
137-
else:
138-
client = self._get_client()
139-
client.start()
140-
client.retry(client.delete, '/', recursive=True)
141-
client.stop()
142-
client.close()
143-
del client
144-
145-
for client in self._clients:
146-
client.stop()
147-
del client
148-
self._clients = None
122+
"""Reset and cleanup the zookeeper cluster that was started."""
123+
while self._clients:
124+
c = self._clients.pop()
125+
try:
126+
c.stop()
127+
except KazooException:
128+
log.exception("Failed stopping client %s", c)
129+
finally:
130+
c.close()
131+
self.client = None
149132

150133
def __break_connection(self, break_event, expected_state, event_factory):
151134
"""Break ZooKeeper connection using the specified event."""

kazoo/tests/test_lock.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ def wait(self):
3737
class KazooLockTests(KazooTestCase):
3838
thread_count = 20
3939

40+
def __init__(self, *args, **kw):
41+
super(KazooLockTests, self).__init__(*args, **kw)
42+
self.threads_made = []
43+
44+
def tearDown(self):
45+
super(KazooLockTests, self).tearDown()
46+
while self.threads_made:
47+
t = self.threads_made.pop()
48+
t.join()
49+
4050
@staticmethod
4151
def make_condition():
4252
return threading.Condition()
@@ -45,9 +55,11 @@ def make_condition():
4555
def make_event():
4656
return threading.Event()
4757

48-
@staticmethod
49-
def make_thread(*args, **kwargs):
50-
return threading.Thread(*args, **kwargs)
58+
def make_thread(self, *args, **kwargs):
59+
t = threading.Thread(*args, **kwargs)
60+
t.daemon = True
61+
self.threads_made.append(t)
62+
return t
5163

5264
@staticmethod
5365
def make_wait():
@@ -389,6 +401,16 @@ def _thread(lock, event, timeout):
389401

390402
class TestSemaphore(KazooTestCase):
391403

404+
def __init__(self, *args, **kw):
405+
super(TestSemaphore, self).__init__(*args, **kw)
406+
self.threads_made = []
407+
408+
def tearDown(self):
409+
super(TestSemaphore, self).tearDown()
410+
while self.threads_made:
411+
t = self.threads_made.pop()
412+
t.join()
413+
392414
@staticmethod
393415
def make_condition():
394416
return threading.Condition()
@@ -397,9 +419,11 @@ def make_condition():
397419
def make_event():
398420
return threading.Event()
399421

400-
@staticmethod
401-
def make_thread(*args, **kwargs):
402-
return threading.Thread(*args, **kwargs)
422+
def make_thread(self, *args, **kwargs):
423+
t = threading.Thread(*args, **kwargs)
424+
t.daemon = True
425+
self.threads_made.append(t)
426+
return t
403427

404428
def setUp(self):
405429
super(TestSemaphore, self).setUp()
@@ -539,8 +563,8 @@ def sema_one():
539563
event.set()
540564
event2.wait()
541565

542-
thread = self.make_thread(target=sema_one, args=())
543-
thread.start()
566+
thread1 = self.make_thread(target=sema_one, args=())
567+
thread1.start()
544568

545569
started.wait()
546570
eq_(lh_semaphore.lease_holders(), ['george'])
@@ -552,8 +576,8 @@ def expire():
552576
self.expire_session(self.make_event)
553577
expired.set()
554578

555-
thread = self.make_thread(target=expire, args=())
556-
thread.start()
579+
thread2 = self.make_thread(target=expire, args=())
580+
thread2.start()
557581
expire_semaphore.wake_event.wait()
558582
expired.wait()
559583

@@ -563,7 +587,9 @@ def expire():
563587
event.wait(15)
564588
eq_(expire_semaphore.lease_holders(), ['fred'])
565589
event2.set()
566-
thread.join()
590+
591+
for t in (thread1, thread2):
592+
t.join()
567593

568594
def test_inconsistent_max_leases(self):
569595
sem1 = self.client.Semaphore(self.lockpath, max_leases=1)

0 commit comments

Comments
 (0)