@@ -158,6 +158,7 @@ def __init__(self, client, retry_sleeper, logger=None):
158158 self ._xid = None
159159 self ._rw_server = None
160160 self ._ro_mode = False
161+ self ._ro = False
161162
162163 self ._connection_routine = None
163164
@@ -439,9 +440,9 @@ def _read_socket(self, read_timeout):
439440 self ._send_sasl_request (challenge = response ,
440441 timeout = client ._session_timeout )
441442 else :
442- # authentication is ok, state is CONNECTED
443+ # authentication is ok, state is CONNECTED or CONNECTED_RO
443444 # remove sensible information from the object
444- client . _session_callback ( KeeperState . CONNECTED )
445+ self . _set_connected_ro_or_rw ( client )
445446 self .sasl_cli .dispose ()
446447 else :
447448 self .logger .log (BLATHER , 'Reading for header %r' , header )
@@ -684,55 +685,52 @@ def _connect(self, host, port):
684685 read_timeout )
685686
686687 if connect_result .read_only :
687- client ._session_callback (KeeperState .CONNECTED_RO )
688- self ._ro_mode = iter (self ._server_pinger ())
689- else :
690- self ._ro_mode = None
688+ self ._ro = True
691689
692- # Get a copy of the auth data before iterating, in case it is
693- # changed.
694- client_auth_data_copy = copy .copy (client .auth_data )
695-
696- if client .use_sasl and self .sasl_cli is None :
697- if PURESASL_AVAILABLE :
698- for scheme , auth in client_auth_data_copy :
699- if scheme == 'sasl' :
700- username , password = auth .split (":" )
701- self .sasl_cli = SASLClient (
702- host = client .sasl_server_principal ,
703- service = 'zookeeper' ,
704- mechanism = 'DIGEST-MD5' ,
705- username = username ,
706- password = password
707- )
708- break
709-
710- # As described in rfc
711- # https://tools.ietf.org/html/rfc2831#section-2.1
712- # sending empty challenge
713- self ._send_sasl_request (challenge = b'' ,
714- timeout = connect_timeout )
715- else :
716- self .logger .warn ('Pure-sasl library is missing while sasl'
717- ' authentification is configured. Please'
718- ' install pure-sasl library to connect '
719- 'using sasl. Now falling back '
720- 'connecting WITHOUT any '
721- 'authentification.' )
722- client .use_sasl = False
723- client ._session_callback (KeeperState .CONNECTED )
724- else :
725- client ._session_callback (KeeperState .CONNECTED )
690+ # Get a copy of the auth data before iterating, in case it is
691+ # changed.
692+ client_auth_data_copy = copy .copy (client .auth_data )
693+
694+ if client .use_sasl and self .sasl_cli is None :
695+ if PURESASL_AVAILABLE :
726696 for scheme , auth in client_auth_data_copy :
727- if scheme == "digest" :
728- ap = Auth (0 , scheme , auth )
729- zxid = self ._invoke (
730- connect_timeout / 1000.0 ,
731- ap ,
732- xid = AUTH_XID
697+ if scheme == 'sasl' :
698+ username , password = auth .split (":" )
699+ self .sasl_cli = SASLClient (
700+ host = client .sasl_server_principal ,
701+ service = 'zookeeper' ,
702+ mechanism = 'DIGEST-MD5' ,
703+ username = username ,
704+ password = password
733705 )
734- if zxid :
735- client .last_zxid = zxid
706+ break
707+
708+ # As described in rfc
709+ # https://tools.ietf.org/html/rfc2831#section-2.1
710+ # sending empty challenge
711+ self ._send_sasl_request (challenge = b'' ,
712+ timeout = connect_timeout )
713+ else :
714+ self .logger .warn ('Pure-sasl library is missing while sasl'
715+ ' authentification is configured. Please'
716+ ' install pure-sasl library to connect '
717+ 'using sasl. Now falling back '
718+ 'connecting WITHOUT any '
719+ 'authentification.' )
720+ client .use_sasl = False
721+ self ._set_connected_ro_or_rw (client )
722+ else :
723+ self ._set_connected_ro_or_rw (client )
724+ for scheme , auth in client_auth_data_copy :
725+ if scheme == "digest" :
726+ ap = Auth (0 , scheme , auth )
727+ zxid = self ._invoke (
728+ connect_timeout / 1000.0 ,
729+ ap ,
730+ xid = AUTH_XID
731+ )
732+ if zxid :
733+ client .last_zxid = zxid
736734
737735 return read_timeout , connect_timeout
738736
@@ -742,3 +740,13 @@ def _send_sasl_request(self, challenge, timeout):
742740 self ._xid = (self ._xid % 2147483647 ) + 1
743741 xid = self ._xid
744742 self ._submit (sasl_request , timeout / 1000.0 , xid )
743+
744+ def _set_connected_ro_or_rw (self , client ):
745+ """ Called to decide whether to set the KeeperState to CONNECTED_RO
746+ or CONNECTED"""
747+ if self ._ro :
748+ client ._session_callback (KeeperState .CONNECTED_RO )
749+ self ._ro_mode = iter (self ._server_pinger ())
750+ else :
751+ client ._session_callback (KeeperState .CONNECTED )
752+ self ._ro_mode = None
0 commit comments