Skip to content

Commit 86a1843

Browse files
committed
Merge pull request #333 from rgs1/support-reconfig
Add support for reconfig cluster membership operation (issue #234)
2 parents 62bdb34 + 8d2db77 commit 86a1843

5 files changed

Lines changed: 179 additions & 0 deletions

File tree

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ Changelog
77
Features
88
********
99

10+
- Issue #234: Add support for reconfig cluster membership operation
11+
1012
Bug Handling
1113
************
1214

kazoo/client.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
GetACL,
4040
SetACL,
4141
GetData,
42+
Reconfig,
4243
SetData,
4344
Sync,
4445
Transaction
@@ -1326,6 +1327,98 @@ def _delete_recursive(self, path):
13261327
except NoNodeError: # pragma: nocover
13271328
pass
13281329

1330+
def reconfig(self, joining, leaving, new_members, from_config=-1):
1331+
"""Reconfig a cluster.
1332+
1333+
This call will succeed if the cluster was reconfigured accordingly.
1334+
1335+
:param joining: a comma separated list of servers being added
1336+
(see example for format) (incremental reconfiguration)
1337+
:param leaving: a comma separated list of servers being removed
1338+
(see example for format) (incremental reconfiguration)
1339+
:param new_members: a comma separated list of new membership
1340+
(non-incremental reconfiguration)
1341+
:param from_config: version of the current configuration (optional -
1342+
causes reconfiguration to throw an exception if
1343+
configuration is no longer current)
1344+
:type from_config: int
1345+
:returns:
1346+
Tuple (value, :class:`~kazoo.protocol.states.ZnodeStat`) of
1347+
node.
1348+
:rtype: tuple
1349+
1350+
Basic Example:
1351+
1352+
.. code-block:: python
1353+
1354+
zk = KazooClient()
1355+
zk.start()
1356+
1357+
# first add an observer (incremental reconfiguration)
1358+
joining = 'server.100=10.0.0.10:2889:3888:observer;0.0.0.0:2181'
1359+
data, _ = zk.reconfig(
1360+
joining=joining, leaving=None, new_members=None)
1361+
1362+
# wait and then remove it (just by using its id) (incremental)
1363+
data, _ = zk.reconfig(joining=None, leaving='100', new_members=None)
1364+
1365+
# now do a full change of the cluster (non-incremental)
1366+
new = [
1367+
'server.100=10.0.0.10:2889:3888:observer;0.0.0.0:2181',
1368+
'server.100=10.0.0.11:2889:3888:observer;0.0.0.0:2181',
1369+
'server.100=10.0.0.12:2889:3888:observer;0.0.0.0:2181',
1370+
]
1371+
data, _ = zk.reconfig(
1372+
joining=None, leaving=None, new_members=','.join(new))
1373+
1374+
zk.stop()
1375+
1376+
:raises:
1377+
:exc:`~kazoo.exceptions.UnimplementedError` if not supported.
1378+
1379+
:exc:`~kazoo.exceptions.NewConfigNoQuorumError` if no quorum of new
1380+
config is connected and up-to-date with the leader of last
1381+
commmitted config - try invoking reconfiguration after new servers
1382+
are connected and synced.
1383+
1384+
:exc:`~kazoo.exceptions.ReconfigInProcessError` if another
1385+
reconfiguration is in progress.
1386+
1387+
:exc:`~kazoo.exceptions.BadVersionError` if version doesn't
1388+
match.
1389+
1390+
:exc:`~kazoo.exceptions.BadArgumentsError` if any of the given
1391+
lists of servers has a bad format.
1392+
1393+
:exc:`~kazoo.exceptions.ZookeeperError` if the server
1394+
returns a non-zero error code.
1395+
1396+
"""
1397+
result = self.reconfig_async(joining, leaving, new_members, from_config)
1398+
return result.get()
1399+
1400+
def reconfig_async(self, joining, leaving, new_members, from_config):
1401+
"""Asynchronously reconfig a cluster. Takes the same arguments as
1402+
:meth:`reconfig`.
1403+
1404+
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
1405+
1406+
"""
1407+
if joining and not isinstance(joining, basestring):
1408+
raise TypeError("joining must be a string")
1409+
if leaving and not isinstance(leaving, basestring):
1410+
raise TypeError("leaving must be a string")
1411+
if new_members and not isinstance(new_members, basestring):
1412+
raise TypeError("new_members must be a string")
1413+
if not isinstance(from_config, int):
1414+
raise TypeError("from_config must be an int")
1415+
1416+
async_result = self.handler.async_result()
1417+
reconfig = Reconfig(joining, leaving, new_members, from_config)
1418+
self._call(reconfig, async_result)
1419+
1420+
return async_result
1421+
13291422

13301423
class TransactionRequest(object):
13311424
"""A Zookeeper Transaction Request

kazoo/exceptions.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ class BadArgumentsError(ZookeeperError):
107107
pass
108108

109109

110+
@_zookeeper_exception(-13)
111+
class NewConfigNoQuorumError(ZookeeperError):
112+
pass
113+
114+
115+
@_zookeeper_exception(-14)
116+
class ReconfigInProcessError(ZookeeperError):
117+
pass
118+
119+
110120
@_zookeeper_exception(-100)
111121
class APIError(ZookeeperError):
112122
pass

kazoo/protocol/serialization.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# Struct objects with formats compiled
1111
bool_struct = struct.Struct('B')
1212
int_struct = struct.Struct('!i')
13+
long_struct = struct.Struct('!q')
1314
int_int_struct = struct.Struct('!ii')
1415
int_int_long_struct = struct.Struct('!iiq')
1516

@@ -352,6 +353,24 @@ def unchroot(client, response):
352353
return resp
353354

354355

356+
class Reconfig(namedtuple('Reconfig', 'joining leaving new_members config_id')):
357+
type = 16
358+
359+
def serialize(self):
360+
b = bytearray()
361+
b.extend(write_string(self.joining))
362+
b.extend(write_string(self.leaving))
363+
b.extend(write_string(self.new_members))
364+
b.extend(long_struct.pack(self.config_id))
365+
return b
366+
367+
@classmethod
368+
def deserialize(cls, bytes, offset):
369+
data, offset = read_buffer(bytes, offset)
370+
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
371+
return data, stat
372+
373+
355374
class Auth(namedtuple('Auth', 'auth_type scheme auth')):
356375
type = 100
357376

kazoo/tests/test_client.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from kazoo.exceptions import (
1616
AuthFailedError,
1717
BadArgumentsError,
18+
BadVersionError,
1819
ConfigurationError,
1920
ConnectionClosedError,
2021
ConnectionLoss,
@@ -1147,3 +1148,57 @@ def test_unchroot(self):
11471148
client.chroot = '/a'
11481149
self.assertEquals(client.unchroot('/a/b'), '/b')
11491150
self.assertEquals(client.unchroot('/b/c'), '/b/c')
1151+
1152+
1153+
class TestReconfig(KazooTestCase):
1154+
1155+
def setUp(self):
1156+
KazooTestCase.setUp(self)
1157+
if TRAVIS_ZK_VERSION:
1158+
version = TRAVIS_ZK_VERSION
1159+
else:
1160+
version = self.client.server_version()
1161+
if not version or version < (3, 5):
1162+
raise SkipTest("Must use Zookeeper 3.5 or above")
1163+
1164+
def test_add_remove_observer(self):
1165+
def free_sock_port():
1166+
s = socket.socket()
1167+
s.bind(('', 0))
1168+
return s, s.getsockname()[1]
1169+
1170+
# get ports for election, zab and client endpoints. we need to use
1171+
# ports for which we'd immediately get a RST upon connect(); otherwise
1172+
# the cluster could crash if it gets a SocketTimeoutException:
1173+
# https://issues.apache.org/jira/browse/ZOOKEEPER-2202
1174+
s1, port1 = free_sock_port()
1175+
s2, port2 = free_sock_port()
1176+
s3, port3 = free_sock_port()
1177+
1178+
joining = 'server.100=0.0.0.0:%d:%d:observer;0.0.0.0:%d' % (
1179+
port1, port2, port3)
1180+
data, _ = self.client.reconfig(joining=joining,
1181+
leaving=None,
1182+
new_members=None)
1183+
self.assertIn(joining, data)
1184+
1185+
data, _ = self.client.reconfig(joining=None,
1186+
leaving='100',
1187+
new_members=None)
1188+
self.assertNotIn(joining, data)
1189+
1190+
# try to add it again, but a config number in the future
1191+
curver = int(data.split('\n')[-1].split('=')[1], base=16)
1192+
self.assertRaises(BadVersionError,
1193+
self.client.reconfig,
1194+
joining=joining,
1195+
leaving=None,
1196+
new_members=None,
1197+
from_config=curver + 1)
1198+
1199+
def test_bad_input(self):
1200+
self.assertRaises(BadArgumentsError,
1201+
self.client.reconfig,
1202+
joining='some thing',
1203+
leaving=None,
1204+
new_members=None)

0 commit comments

Comments
 (0)