Skip to content

Commit 7439ecd

Browse files
author
Joshua Harlow
committed
Do cleanup in a more more robust manner
- Adds checks to make sure that on each stop that is called that the zookeeper process has exited cleanly with a zero error code (and log if this doesn't happen) - Log the started programs - Track all created clients (and stop them on teardown) - Track threads made in lock/sempahore tests and clean them up
1 parent d780c64 commit 7439ecd

4 files changed

Lines changed: 86 additions & 65 deletions

File tree

kazoo/recipe/lock.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,6 @@ def release(self):
522522
def _inner_release(self):
523523
if not self.is_acquired:
524524
return False
525-
526525
try:
527526
self.client.delete(self.create_path)
528527
except NoNodeError: # pragma: nocover

kazoo/testing/common.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222

2323
import code
24+
import logging
2425
import os
2526
import os.path
2627
import shutil
@@ -34,6 +35,9 @@
3435
from glob import glob
3536

3637

38+
log = logging.getLogger(__name__)
39+
40+
3741
def debug(sig, frame):
3842
"""Interrupt running process, and provide a python prompt for
3943
interactive debugging."""
@@ -145,22 +149,26 @@ def run(self):
145149
log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path( # NOQA
146150
self.working_path + os.sep + "zookeeper.log\n"))
147151

148-
self.process = subprocess.Popen(
149-
args=["java",
150-
"-cp", self.classpath,
151-
152-
# "-Dlog4j.debug",
153-
"-Dreadonlymode.enabled=true",
154-
"-Dzookeeper.log.dir=%s" % log_path,
155-
"-Dzookeeper.root.logger=INFO,CONSOLE",
156-
"-Dlog4j.configuration=file:%s" % log4j_path,
157-
158-
# OS X: Prevent java from appearing in menu bar, process dock
159-
# and from activation of the main workspace on run.
160-
"-Djava.awt.headless=true",
161-
162-
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
163-
config_path])
152+
args = [
153+
"java",
154+
"-cp", self.classpath,
155+
156+
# "-Dlog4j.debug",
157+
"-Dreadonlymode.enabled=true",
158+
"-Dzookeeper.log.dir=%s" % log_path,
159+
"-Dzookeeper.root.logger=INFO,CONSOLE",
160+
"-Dlog4j.configuration=file:%s" % log4j_path,
161+
162+
# OS X: Prevent java from appearing in menu bar, process dock
163+
# and from activation of the main workspace on run.
164+
"-Djava.awt.headless=true",
165+
166+
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
167+
config_path,
168+
]
169+
self.process = subprocess.Popen(args=args)
170+
log.info("Started zookeeper process %s using args %s",
171+
self.process.pid, args)
164172
self._running = True
165173

166174
@property
@@ -226,6 +234,11 @@ def stop(self):
226234
return
227235
self.process.terminate()
228236
self.process.wait()
237+
if self.process.returncode != 0:
238+
log.warn("Zookeeper process %s failed to terminate with"
239+
" non-zero return code (it terminated with %s return"
240+
" code instead)", self.process.pid,
241+
self.process.returncode)
229242
self._running = False
230243

231244
def destroy(self):

kazoo/testing/harness.py

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import unittest
77

88
from kazoo.client import KazooClient
9-
from kazoo.exceptions import NotEmptyError
9+
from kazoo.exceptions import KazooException, NotEmptyError
1010
from kazoo.protocol.states import (
1111
KazooState
1212
)
@@ -79,14 +79,13 @@ def servers(self):
7979
return ",".join([s.address for s in self.cluster])
8080

8181
def _get_nonchroot_client(self):
82-
return KazooClient(self.servers)
82+
c = KazooClient(self.servers)
83+
self._clients.append(c)
84+
return c
8385

8486
def _get_client(self, **kwargs):
8587
c = KazooClient(self.hosts, **kwargs)
86-
try:
87-
self._clients.append(c)
88-
except AttributeError:
89-
self._client = [c]
88+
self._clients.append(c)
9089
return c
9190

9291
def lose_connection(self, event_factory):
@@ -103,47 +102,31 @@ def setup_zookeeper(self, **client_options):
103102
The cluster will only be created on the first invocation and won't be
104103
fully torn down until exit.
105104
"""
106-
if not self.cluster[0].running:
105+
do_start = False
106+
for s in self.cluster:
107+
if not s.running:
108+
do_start = True
109+
if do_start:
107110
self.cluster.start()
108111
namespace = "/kazootests" + uuid.uuid4().hex
109112
self.hosts = self.servers + namespace
110-
111113
if 'timeout' not in client_options:
112114
client_options['timeout'] = 0.8
113115
self.client = self._get_client(**client_options)
114116
self.client.start()
115117
self.client.ensure_path("/")
116118

117119
def teardown_zookeeper(self):
118-
"""Clean up any ZNodes created during the test
119-
"""
120-
if not self.cluster[0].running:
121-
self.cluster.start()
122-
123-
tries = 0
124-
if self.client and self.client.connected:
125-
while tries < 3:
126-
try:
127-
self.client.retry(self.client.delete, '/', recursive=True)
128-
break
129-
except NotEmptyError:
130-
pass
131-
tries += 1
132-
self.client.stop()
133-
self.client.close()
134-
del self.client
135-
else:
136-
client = self._get_client()
137-
client.start()
138-
client.retry(client.delete, '/', recursive=True)
139-
client.stop()
140-
client.close()
141-
del client
142-
143-
for client in self._clients:
144-
client.stop()
145-
del client
146-
self._clients = None
120+
"""Reset and cleanup the zookeeper cluster that was started."""
121+
while self._clients:
122+
c = self._clients.pop()
123+
try:
124+
c.stop()
125+
except KazooException:
126+
log.exception("Failed stopping client %s", c)
127+
finally:
128+
c.close()
129+
self.client = None
147130

148131
def __break_connection(self, break_event, expected_state, event_factory):
149132
"""Break ZooKeeper connection using the specified event."""

kazoo/tests/test_lock.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ def wait(self):
3737
class KazooLockTests(KazooTestCase):
3838
thread_count = 20
3939

40+
def __init__(self, *args, **kw):
41+
super(KazooLockTests, self).__init__(*args, **kw)
42+
self.threads_made = []
43+
44+
def tearDown(self):
45+
super(KazooLockTests, self).tearDown()
46+
while self.threads_made:
47+
t = self.threads_made.pop()
48+
t.join()
49+
4050
@staticmethod
4151
def make_condition():
4252
return threading.Condition()
@@ -45,9 +55,11 @@ def make_condition():
4555
def make_event():
4656
return threading.Event()
4757

48-
@staticmethod
49-
def make_thread(*args, **kwargs):
50-
return threading.Thread(*args, **kwargs)
58+
def make_thread(self, *args, **kwargs):
59+
t = threading.Thread(*args, **kwargs)
60+
t.daemon = True
61+
self.threads_made.append(t)
62+
return t
5163

5264
@staticmethod
5365
def make_wait():
@@ -389,6 +401,16 @@ def _thread(lock, event, timeout):
389401

390402
class TestSemaphore(KazooTestCase):
391403

404+
def __init__(self, *args, **kw):
405+
super(TestSemaphore, self).__init__(*args, **kw)
406+
self.threads_made = []
407+
408+
def tearDown(self):
409+
super(TestSemaphore, self).tearDown()
410+
while self.threads_made:
411+
t = self.threads_made.pop()
412+
t.join()
413+
392414
@staticmethod
393415
def make_condition():
394416
return threading.Condition()
@@ -397,9 +419,11 @@ def make_condition():
397419
def make_event():
398420
return threading.Event()
399421

400-
@staticmethod
401-
def make_thread(*args, **kwargs):
402-
return threading.Thread(*args, **kwargs)
422+
def make_thread(self, *args, **kwargs):
423+
t = threading.Thread(*args, **kwargs)
424+
t.daemon = True
425+
self.threads_made.append(t)
426+
return t
403427

404428
def setUp(self):
405429
super(TestSemaphore, self).setUp()
@@ -539,8 +563,8 @@ def sema_one():
539563
event.set()
540564
event2.wait()
541565

542-
thread = self.make_thread(target=sema_one, args=())
543-
thread.start()
566+
thread1 = self.make_thread(target=sema_one, args=())
567+
thread1.start()
544568

545569
started.wait()
546570
eq_(lh_semaphore.lease_holders(), ['george'])
@@ -552,8 +576,8 @@ def expire():
552576
self.expire_session(self.make_event)
553577
expired.set()
554578

555-
thread = self.make_thread(target=expire, args=())
556-
thread.start()
579+
thread2 = self.make_thread(target=expire, args=())
580+
thread2.start()
557581
expire_semaphore.wake_event.wait()
558582
expired.wait()
559583

@@ -563,7 +587,9 @@ def expire():
563587
event.wait(15)
564588
eq_(expire_semaphore.lease_holders(), ['fred'])
565589
event2.set()
566-
thread.join()
590+
591+
for t in (thread1, thread2):
592+
t.join()
567593

568594
def test_inconsistent_max_leases(self):
569595
sem1 = self.client.Semaphore(self.lockpath, max_leases=1)

0 commit comments

Comments
 (0)