Skip to content

Commit 35ce106

Browse files
StephenSorriauxjeffwidman
authored andcommitted
feat(core): Added SSL support (#513)
* client: Allow SSL use when communicating with Zookeeper, fixes #382 Zookeeper 3.5 supports SSL for client communications, this commit adds support for it on the Kazoo side. Note that you need to give the client the key, certificate and CA files. Co-Authored-By: Monty Taylor <mordred@inaugust.com> * Added keyfile password for ssl connection * Added a way to bypass ssl certification validation * Added a timeout when using SSL connection
1 parent 7a8167d commit 35ce106

3 files changed

Lines changed: 98 additions & 23 deletions

File tree

kazoo/client.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ def __init__(self, hosts='127.0.0.1:2181',
106106
timeout=10.0, client_id=None, handler=None,
107107
default_acl=None, auth_data=None, read_only=None,
108108
randomize_hosts=True, connection_retry=None,
109-
command_retry=None, logger=None, **kwargs):
109+
command_retry=None, logger=None, keyfile=None,
110+
keyfile_password=None, certfile=None, ca=None,
111+
use_ssl=False, verify_certs=True, **kwargs):
110112
"""Create a :class:`KazooClient` instance. All time arguments
111113
are in seconds.
112114
@@ -135,6 +137,13 @@ def __init__(self, hosts='127.0.0.1:2181',
135137
options which will be used for creating one.
136138
:param logger: A custom logger to use instead of the module
137139
global `log` instance.
140+
:param keyfile: SSL keyfile to use for authentication
141+
:param keyfile_password: SSL keyfile password
142+
:param certfile: SSL certfile to use for authentication
143+
:param ca: SSL CA file to use for authentication
144+
:param use_ssl: argument to control whether SSL is used or not
145+
:param verify_certs: when using SSL, argument to bypass
146+
certs verification
138147
139148
Basic Example:
140149
@@ -183,6 +192,12 @@ def __init__(self, hosts='127.0.0.1:2181',
183192
self.chroot = None
184193
self.set_hosts(hosts)
185194

195+
self.use_ssl = use_ssl
196+
self.verify_certs = verify_certs
197+
self.certfile = certfile
198+
self.keyfile = keyfile
199+
self.keyfile_password = keyfile_password
200+
self.ca = ca
186201
# Curator like simplified state tracking, and listeners for
187202
# state transitions
188203
self._state = KeeperState.CLOSED
@@ -648,7 +663,14 @@ def command(self, cmd=b'ruok'):
648663

649664
peer = self._connection._socket.getpeername()[:2]
650665
sock = self.handler.create_connection(
651-
peer, timeout=self._session_timeout / 1000.0)
666+
peer, timeout=self._session_timeout / 1000.0,
667+
use_ssl=self.use_ssl,
668+
ca=self.ca,
669+
certfile=self.certfile,
670+
keyfile=self.keyfile,
671+
keyfile_password=self.keyfile_password,
672+
verify_certs=self.verify_certs,
673+
)
652674
sock.sendall(cmd)
653675
result = sock.recv(8192)
654676
sock.close()

kazoo/handlers/utils.py

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import errno
44
import functools
55
import select
6+
import ssl
7+
import socket
68
import time
79

810
HAS_FNCTL = True
@@ -183,7 +185,10 @@ def create_tcp_socket(module):
183185
return sock
184186

185187

186-
def create_tcp_connection(module, address, timeout=None):
188+
def create_tcp_connection(module, address, timeout=None,
189+
use_ssl=False, ca=None, certfile=None,
190+
keyfile=None, keyfile_password=None,
191+
verify_certs=True):
187192
end = None
188193
if timeout is None:
189194
# thanks to create_connection() developers for
@@ -194,17 +199,48 @@ def create_tcp_connection(module, address, timeout=None):
194199
sock = None
195200

196201
while end is None or time.time() < end:
197-
try:
198-
# if we got a timeout, lets ensure that we decrement the time
199-
# otherwise there is no timeout set and we'll call it as such
200-
timeout_at = end if end is None else end - time.time()
201-
sock = module.create_connection(address, timeout_at)
202-
break
203-
except Exception as ex:
204-
errnum = ex.errno if isinstance(ex, OSError) else ex[0]
205-
if errnum == errno.EINTR:
206-
continue
207-
raise
202+
timeout_at = end if end is None else end - time.time()
203+
if use_ssl:
204+
# Disallow use of SSLv2 and V3 (meaning we require TLSv1.0+)
205+
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
206+
context.options |= ssl.OP_NO_SSLv2
207+
context.options |= ssl.OP_NO_SSLv3
208+
# Load default CA certs
209+
context.load_default_certs(ssl.Purpose.SERVER_AUTH)
210+
context.verify_mode = (
211+
ssl.CERT_OPTIONAL if verify_certs else ssl.CERT_NONE
212+
)
213+
if ca:
214+
context.load_verify_locations(ca)
215+
if certfile and keyfile:
216+
context.verify_mode = (
217+
ssl.CERT_REQUIRED if verify_certs else ssl.CERT_NONE
218+
)
219+
context.load_cert_chain(certfile=certfile,
220+
keyfile=keyfile,
221+
password=keyfile_password)
222+
try:
223+
# Query the address to get back it's address family
224+
addrs = socket.getaddrinfo(address[0], address[1], 0,
225+
socket.SOCK_STREAM)
226+
conn = context.wrap_socket(module.socket(addrs[0][0]))
227+
conn.settimeout(timeout_at)
228+
conn.connect(address)
229+
sock = conn
230+
break
231+
except ssl.SSLError:
232+
raise
233+
else:
234+
try:
235+
# if we got a timeout, lets ensure that we decrement the time
236+
# otherwise there is no timeout set and we'll call it as such
237+
sock = module.create_connection(address, timeout_at)
238+
break
239+
except Exception as ex:
240+
errnum = ex.errno if isinstance(ex, OSError) else ex[0]
241+
if errnum == errno.EINTR:
242+
continue
243+
raise
208244

209245
if sock is None:
210246
raise module.error

kazoo/protocol/connection.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,21 @@ def _read(self, length, timeout):
216216
remaining = length
217217
with self._socket_error_handling():
218218
while remaining > 0:
219-
s = self.handler.select([self._socket], [], [], timeout)[0]
220-
if not s: # pragma: nocover
221-
# If the read list is empty, we got a timeout. We don't
222-
# have to check wlist and xlist as we don't set any
223-
raise self.handler.timeout_exception("socket time-out"
224-
" during read")
225-
219+
# Because of SSL framing, a select may not return when using
220+
# an SSL socket because the underlying physical socket may not
221+
# have anything to select, but the wrapped object may still
222+
# have something to read as it has previously gotten enough
223+
# data from the underlying socket.
224+
if (hasattr(self._socket, "pending")
225+
and self._socket.pending() > 0):
226+
pass
227+
else:
228+
s = self.handler.select([self._socket], [], [], timeout)[0]
229+
if not s: # pragma: nocover
230+
# If the read list is empty, we got a timeout. We don't
231+
# have to check wlist and xlist as we don't set any
232+
raise self.handler.timeout_exception(
233+
"socket time-out during read")
226234
chunk = self._socket.recv(remaining)
227235
if chunk == b'':
228236
raise ConnectionDropped('socket connection broken')
@@ -596,7 +604,8 @@ def _connect_attempt(self, host, port, retry):
596604

597605
def _connect(self, host, port):
598606
client = self.client
599-
self.logger.info('Connecting to %s:%s', host, port)
607+
self.logger.info('Connecting to %s:%s, use_ssl: %r',
608+
host, port, self.client.use_ssl)
600609

601610
self.logger.log(BLATHER,
602611
' Using session_id: %r session_passwd: %s',
@@ -605,7 +614,15 @@ def _connect(self, host, port):
605614

606615
with self._socket_error_handling():
607616
self._socket = self.handler.create_connection(
608-
(host, port), client._session_timeout / 1000.0)
617+
address=(host, port),
618+
timeout=client._session_timeout / 1000.0,
619+
use_ssl=self.client.use_ssl,
620+
keyfile=self.client.keyfile,
621+
certfile=self.client.certfile,
622+
ca=self.client.ca,
623+
keyfile_password=self.client.keyfile_password,
624+
verify_certs=self.client.verify_certs,
625+
)
609626

610627
self._socket.setblocking(0)
611628

0 commit comments

Comments
 (0)