|
25 | 25 | Ping, |
26 | 26 | PingInstance, |
27 | 27 | ReplyHeader, |
| 28 | + SASL, |
28 | 29 | Transaction, |
29 | 30 | Watch, |
30 | 31 | int_struct |
|
39 | 40 | ForceRetryError, |
40 | 41 | RetryFailedError |
41 | 42 | ) |
| 43 | +try: |
| 44 | + from puresasl.client import SASLClient |
| 45 | + PURESASL_AVAILABLE = True |
| 46 | +except ImportError: |
| 47 | + PURESASL_AVAILABLE = False |
42 | 48 |
|
43 | 49 |
|
44 | 50 | log = logging.getLogger(__name__) |
@@ -154,6 +160,8 @@ def __init__(self, client, retry_sleeper, logger=None): |
154 | 160 |
|
155 | 161 | self._connection_routine = None |
156 | 162 |
|
| 163 | + self.sasl_cli = None |
| 164 | + |
157 | 165 | # This is instance specific to avoid odd thread bug issues in Python |
158 | 166 | # during shutdown global cleanup |
159 | 167 | @contextmanager |
@@ -416,6 +424,24 @@ def _read_socket(self, read_timeout): |
416 | 424 | async_object.set(True) |
417 | 425 | elif header.xid == WATCH_XID: |
418 | 426 | self._read_watch_event(buffer, offset) |
| 427 | + elif self.sasl_cli and not self.sasl_cli.complete: |
| 428 | + # SASL authentication is not yet finished, this can only |
| 429 | + # be a SASL packet |
| 430 | + self.logger.log(BLATHER, 'Received SASL') |
| 431 | + try: |
| 432 | + challenge, _ = SASL.deserialize(buffer, offset) |
| 433 | + except Exception: |
| 434 | + raise ConnectionDropped('error while SASL authentication.') |
| 435 | + response = self.sasl_cli.process(challenge) |
| 436 | + if response: |
| 437 | + # authentication not yet finished, answering the challenge |
| 438 | + self._send_sasl_request(challenge=response, |
| 439 | + timeout=client._session_timeout) |
| 440 | + else: |
| 441 | + # authentication is ok, state is CONNECTED |
| 442 | + # remove sensible information from the object |
| 443 | + client._session_callback(KeeperState.CONNECTED) |
| 444 | + self.sasl_cli.dispose() |
419 | 445 | else: |
420 | 446 | self.logger.log(BLATHER, 'Reading for header %r', header) |
421 | 447 |
|
@@ -544,11 +570,11 @@ def _connect_attempt(self, host, port, retry): |
544 | 570 | client._session_callback(KeeperState.CONNECTING) |
545 | 571 |
|
546 | 572 | try: |
| 573 | + self._xid = 0 |
547 | 574 | read_timeout, connect_timeout = self._connect(host, port) |
548 | 575 | read_timeout = read_timeout / 1000.0 |
549 | 576 | connect_timeout = connect_timeout / 1000.0 |
550 | 577 | retry.reset() |
551 | | - self._xid = 0 |
552 | 578 | self.ping_outstanding.clear() |
553 | 579 | with self._socket_error_handling(): |
554 | 580 | while not close_connection: |
@@ -660,13 +686,53 @@ def _connect(self, host, port): |
660 | 686 | client._session_callback(KeeperState.CONNECTED_RO) |
661 | 687 | self._ro_mode = iter(self._server_pinger()) |
662 | 688 | else: |
663 | | - client._session_callback(KeeperState.CONNECTED) |
664 | 689 | self._ro_mode = None |
665 | | - |
666 | | - for scheme, auth in client.auth_data: |
667 | | - ap = Auth(0, scheme, auth) |
668 | | - zxid = self._invoke(connect_timeout / 1000.0, ap, xid=AUTH_XID) |
669 | | - if zxid: |
670 | | - client.last_zxid = zxid |
| 690 | + if client.use_sasl and self.sasl_cli is None: |
| 691 | + if PURESASL_AVAILABLE: |
| 692 | + for scheme, auth in client.auth_data: |
| 693 | + if scheme == 'sasl': |
| 694 | + username, password = auth.split(":") |
| 695 | + self.sasl_cli = SASLClient( |
| 696 | + host=client.sasl_server_principal, |
| 697 | + service='zookeeper', |
| 698 | + mechanism='DIGEST-MD5', |
| 699 | + username=username, |
| 700 | + password=password |
| 701 | + ) |
| 702 | + break |
| 703 | + |
| 704 | + # As described in rfc |
| 705 | + # https://tools.ietf.org/html/rfc2831#section-2.1 |
| 706 | + # sending empty challenge |
| 707 | + self._send_sasl_request(challenge=b'', |
| 708 | + timeout=connect_timeout) |
| 709 | + else: |
| 710 | + self.logger.warn('Pure-sasl library is missing while sasl' |
| 711 | + ' authentification is configured. Please' |
| 712 | + ' install pure-sasl library to connect ' |
| 713 | + 'using sasl. Now falling back ' |
| 714 | + 'connecting WITHOUT any ' |
| 715 | + 'authentification.') |
| 716 | + client.use_sasl = False |
| 717 | + client._session_callback(KeeperState.CONNECTED) |
| 718 | + else: |
| 719 | + client._session_callback(KeeperState.CONNECTED) |
| 720 | + for scheme, auth in client.auth_data: |
| 721 | + if scheme == "digest": |
| 722 | + ap = Auth(0, scheme, auth) |
| 723 | + zxid = self._invoke( |
| 724 | + connect_timeout / 1000.0, |
| 725 | + ap, |
| 726 | + xid=AUTH_XID |
| 727 | + ) |
| 728 | + if zxid: |
| 729 | + client.last_zxid = zxid |
671 | 730 |
|
672 | 731 | return read_timeout, connect_timeout |
| 732 | + |
| 733 | + def _send_sasl_request(self, challenge, timeout): |
| 734 | + """ Called when sending a SASL request, xid needs be to incremented """ |
| 735 | + sasl_request = SASL(challenge) |
| 736 | + self._xid = (self._xid % 2147483647) + 1 |
| 737 | + xid = self._xid |
| 738 | + self._submit(sasl_request, timeout / 1000.0, xid) |
0 commit comments