1414and/or the lease has been lost.
1515
1616"""
17+ import re
1718import sys
19+
1820try :
1921 from time import monotonic as now
2022except ImportError :
2729 CancelledError ,
2830 KazooException ,
2931 LockTimeout ,
30- NoNodeError
32+ NoNodeError ,
3133)
3234from kazoo .protocol .states import KazooState
3335from kazoo .retry import (
3436 ForceRetryError ,
3537 KazooRetry ,
36- RetryFailedError
38+ RetryFailedError ,
3739)
3840
3941
@@ -82,22 +84,38 @@ class Lock(object):
8284 # sequence number. Involved in read/write locks.
8385 _EXCLUDE_NAMES = ["__lock__" ]
8486
85- def __init__ (self , client , path , identifier = None ):
87+ def __init__ (self , client , path , identifier = None , extra_lock_patterns = () ):
8688 """Create a Kazoo lock.
8789
8890 :param client: A :class:`~kazoo.client.KazooClient` instance.
8991 :param path: The lock path to use.
90- :param identifier: Name to use for this lock contender. This
91- can be useful for querying to see who the
92- current lock contenders are.
93-
92+ :param identifier: Name to use for this lock contender. This can be
93+ useful for querying to see who the current lock
94+ contenders are.
95+ :param extra_lock_patterns: Strings that will be used to
96+ identify other znode in the path
97+ that should be considered contenders
98+ for this lock.
99+ Use this for cross-implementation
100+ compatibility.
101+
102+ .. versionadded:: 2.7.1
103+ The extra_lock_patterns option.
94104 """
95105 self .client = client
96106 self .path = path
107+ self ._exclude_names = set (
108+ self ._EXCLUDE_NAMES + list (extra_lock_patterns )
109+ )
110+ self ._contenders_re = re .compile (
111+ r"(?:{patterns})(-?\d{{10}})$" .format (
112+ patterns = "|" .join (self ._exclude_names )
113+ )
114+ )
97115
98116 # some data is written to the node. this can be queried via
99117 # contenders() to see who is contending for the lock
100- self .data = str (identifier or "" ).encode (' utf-8' )
118+ self .data = str (identifier or "" ).encode (" utf-8" )
101119 self .node = None
102120
103121 self .wake_event = client .handler .event_object ()
@@ -113,8 +131,9 @@ def __init__(self, client, path, identifier=None):
113131 self .is_acquired = False
114132 self .assured_path = False
115133 self .cancelled = False
116- self ._retry = KazooRetry (max_tries = None ,
117- sleep_func = client .handler .sleep_func )
134+ self ._retry = KazooRetry (
135+ max_tries = None , sleep_func = client .handler .sleep_func
136+ )
118137 self ._lock = client .handler .lock_object ()
119138
120139 def _ensure_path (self ):
@@ -171,6 +190,7 @@ def _acquire_lock():
171190 return False
172191 if not locked :
173192 # Lock acquire doesn't take a timeout, so simulate it...
193+ # XXX: This is not true in Py3 >= 3.2
174194 try :
175195 locked = retry (_acquire_lock )
176196 except RetryFailedError :
@@ -179,9 +199,12 @@ def _acquire_lock():
179199 try :
180200 gotten = False
181201 try :
182- gotten = retry (self ._inner_acquire ,
183- blocking = blocking , timeout = timeout ,
184- ephemeral = ephemeral )
202+ gotten = retry (
203+ self ._inner_acquire ,
204+ blocking = blocking ,
205+ timeout = timeout ,
206+ ephemeral = ephemeral ,
207+ )
185208 except RetryFailedError :
186209 pass
187210 except KazooException :
@@ -222,8 +245,9 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
222245 self .create_tried = True
223246
224247 if not node :
225- node = self .client .create (self .create_path , self .data ,
226- ephemeral = ephemeral , sequence = True )
248+ node = self .client .create (
249+ self .create_path , self .data , ephemeral = ephemeral , sequence = True
250+ )
227251 # strip off path to node
228252 node = node [len (self .path ) + 1 :]
229253
@@ -236,18 +260,8 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
236260 if self .cancelled :
237261 raise CancelledError ()
238262
239- children = self ._get_sorted_children ()
240-
241- try :
242- our_index = children .index (node )
243- except ValueError : # pragma: nocover
244- # somehow we aren't in the children -- probably we are
245- # recovering from a session failure and our ephemeral
246- # node was removed
247- raise ForceRetryError ()
248-
249- predecessor = self .predecessor (children , our_index )
250- if not predecessor :
263+ predecessor = self ._get_predecessor (node )
264+ if predecessor is None :
251265 return True
252266
253267 if not blocking :
@@ -263,40 +277,51 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
263277 else :
264278 self .wake_event .wait (timeout )
265279 if not self .wake_event .isSet ():
266- raise LockTimeout ("Failed to acquire lock on %s after "
267- "%s seconds" % (self .path , timeout ))
280+ raise LockTimeout (
281+ "Failed to acquire lock on %s after %s seconds"
282+ % (self .path , timeout )
283+ )
268284 finally :
269285 self .client .remove_listener (self ._watch_session )
270286
271- def predecessor (self , children , index ):
272- for c in reversed (children [:index ]):
273- if any (n in c for n in self ._EXCLUDE_NAMES ):
274- return c
275- return None
276-
277287 def _watch_predecessor (self , event ):
278288 self .wake_event .set ()
279289
280- def _get_sorted_children (self ):
290+ def _get_predecessor (self , node ):
291+ """returns `node`'s predecessor or None
292+
293+ Note: This handle the case where the current lock is not a contender
294+ (e.g. rlock), this and also edge cases where the lock's ephemeral node
295+ is gone.
296+ """
281297 children = self .client .get_children (self .path )
298+ found_self = False
299+ # Filter out the contenders using the computed regex
300+ contender_matches = []
301+ for child in children :
302+ match = self ._contenders_re .search (child )
303+ if match is not None :
304+ contender_matches .append (match )
305+ if child == node :
306+ # Remember the node's match object so we can short circuit
307+ # below.
308+ found_self = match
309+
310+ if found_self is False : # pragma: nocover
311+ # somehow we aren't in the childrens -- probably we are
312+ # recovering from a session failure and our ephemeral
313+ # node was removed.
314+ raise ForceRetryError ()
315+
316+ predecessor = None
317+ # Sort the contenders using the sequence number extracted by the regex,
318+ # then extract the original string.
319+ for match in sorted (contender_matches , key = lambda m : m .groups ()):
320+ if match is found_self :
321+ break
322+ predecessor = match .string
282323
283- # Node names are prefixed by a type: strip the prefix first, which may
284- # be one of multiple values in case of a read-write lock, and return
285- # only the sequence number (as a string since it is padded and will
286- # sort correctly anyway).
287- #
288- # In some cases, the lock path may contain nodes with other prefixes
289- # (eg. in case of a lease), just sort them last ('~' sorts after all
290- # ASCII digits).
291- def _seq (c ):
292- for name in ["__lock__" , "__rlock__" ]:
293- idx = c .find (name )
294- if idx != - 1 :
295- return c [idx + len (name ):]
296- # Sort unknown node names eg. "lease_holder" last.
297- return '~'
298- children .sort (key = _seq )
299- return children
324+ return predecessor
300325
301326 def _find_node (self ):
302327 children = self .client .get_children (self .path )
@@ -347,16 +372,37 @@ def contenders(self):
347372 if not self .assured_path :
348373 self ._ensure_path ()
349374
350- children = self ._get_sorted_children ()
351-
352- contenders = []
375+ children = self .client .get_children (self .path )
376+ # We want all contenders, including self (this is especially important
377+ # for r/w locks). This is similar to the logic of `_get_predecessor`
378+ # except we include our own pattern.
379+ all_contenders_re = re .compile (
380+ r"(?:{patterns})(-?\d{{10}})$" .format (
381+ patterns = "|" .join (self ._exclude_names | {self ._NODE_NAME })
382+ )
383+ )
384+ # Filter out the contenders using the computed regex
385+ contender_matches = []
353386 for child in children :
387+ match = all_contenders_re .search (child )
388+ if match is not None :
389+ contender_matches .append (match )
390+ # Sort the contenders using the sequence number extracted by the regex,
391+ # then extract the original string.
392+ contender_nodes = [
393+ match .string
394+ for match in sorted (contender_matches , key = lambda m : m .groups ())
395+ ]
396+ # Retrieve all the contender nodes data (preserving order).
397+ contenders = []
398+ for node in contender_nodes :
354399 try :
355- data , stat = self .client .get (self .path + "/" + child )
400+ data , stat = self .client .get (self .path + "/" + node )
356401 if data is not None :
357- contenders .append (data .decode (' utf-8' ))
402+ contenders .append (data .decode (" utf-8" ))
358403 except NoNodeError : # pragma: nocover
359404 pass
405+
360406 return contenders
361407
362408 def __enter__ (self ):
@@ -391,6 +437,7 @@ class WriteLock(Lock):
391437 shared lock.
392438
393439 """
440+
394441 _NODE_NAME = "__lock__"
395442 _EXCLUDE_NAMES = ["__lock__" , "__rlock__" ]
396443
@@ -420,6 +467,7 @@ class ReadLock(Lock):
420467 shared lock.
421468
422469 """
470+
423471 _NODE_NAME = "__rlock__"
424472 _EXCLUDE_NAMES = ["__lock__" ]
425473
@@ -458,6 +506,7 @@ class Semaphore(object):
458506 The max_leases check.
459507
460508 """
509+
461510 def __init__ (self , client , path , identifier = None , max_leases = 1 ):
462511 """Create a Kazoo Lock
463512
@@ -483,12 +532,12 @@ def __init__(self, client, path, identifier=None, max_leases=1):
483532
484533 # some data is written to the node. this can be queried via
485534 # contenders() to see who is contending for the lock
486- self .data = str (identifier or "" ).encode (' utf-8' )
535+ self .data = str (identifier or "" ).encode (" utf-8" )
487536 self .max_leases = max_leases
488537 self .wake_event = client .handler .event_object ()
489538
490539 self .create_path = self .path + "/" + uuid .uuid4 ().hex
491- self .lock_path = path + '-' + ' __lock__'
540+ self .lock_path = path + "-" + " __lock__"
492541 self .is_acquired = False
493542 self .assured_path = False
494543 self .cancelled = False
@@ -501,19 +550,19 @@ def _ensure_path(self):
501550 # node did already exist
502551 data , _ = self .client .get (self .path )
503552 try :
504- leases = int (data .decode (' utf-8' ))
553+ leases = int (data .decode (" utf-8" ))
505554 except (ValueError , TypeError ):
506555 # ignore non-numeric data, maybe the node data is used
507556 # for other purposes
508557 pass
509558 else :
510559 if leases != self .max_leases :
511560 raise ValueError (
512- "Inconsistent max leases: %s, expected: %s" %
513- (leases , self .max_leases )
561+ "Inconsistent max leases: %s, expected: %s"
562+ % (leases , self .max_leases )
514563 )
515564 else :
516- self .client .set (self .path , str (self .max_leases ).encode (' utf-8' ))
565+ self .client .set (self .path , str (self .max_leases ).encode (" utf-8" ))
517566
518567 def cancel (self ):
519568 """Cancel a pending semaphore acquire."""
@@ -548,7 +597,8 @@ def acquire(self, blocking=True, timeout=None):
548597
549598 try :
550599 self .is_acquired = self .client .retry (
551- self ._inner_acquire , blocking = blocking , timeout = timeout )
600+ self ._inner_acquire , blocking = blocking , timeout = timeout
601+ )
552602 except KazooException :
553603 # if we did ultimately fail, attempt to clean up
554604 self ._best_effort_cleanup ()
@@ -590,8 +640,9 @@ def _inner_acquire(self, blocking, timeout=None):
590640 self .wake_event .wait (w .leftover ())
591641 if not self .wake_event .isSet ():
592642 raise LockTimeout (
593- "Failed to acquire semaphore on %s "
594- "after %s seconds" % (self .path , timeout ))
643+ "Failed to acquire semaphore on %s"
644+ " after %s seconds" % (self .path , timeout )
645+ )
595646 else :
596647 return False
597648 finally :
@@ -612,8 +663,9 @@ def _get_lease(self, data=None):
612663 # Get a list of the current potential lock holders. If they change,
613664 # notify our wake_event object. This is used to unblock a blocking
614665 # self._inner_acquire call.
615- children = self .client .get_children (self .path ,
616- self ._watch_lease_change )
666+ children = self .client .get_children (
667+ self .path , self ._watch_lease_change
668+ )
617669
618670 # If there are leases available, acquire one
619671 if len (children ) < self .max_leases :
@@ -674,7 +726,7 @@ def lease_holders(self):
674726 for child in children :
675727 try :
676728 data , stat = self .client .get (self .path + "/" + child )
677- lease_holders .append (data .decode (' utf-8' ))
729+ lease_holders .append (data .decode (" utf-8" ))
678730 except NoNodeError : # pragma: nocover
679731 pass
680732 return lease_holders
0 commit comments