1616"""
1717
1818import sys
19+ try :
20+ from time import monotonic as now
21+ except ImportError :
22+ from time import time as now
1923import uuid
2024
2125import six
3236from kazoo .protocol .states import KazooState
3337
3438
39+ class _Watch (object ):
40+ def __init__ (self , duration = None ):
41+ self .duration = duration
42+ self .started_at = None
43+
44+ def start (self ):
45+ self .started_at = now ()
46+
47+ def leftover (self ):
48+ if self .duration is None :
49+ return None
50+ else :
51+ elapsed = now () - self .started_at
52+ return max (0 , self .duration - elapsed )
53+
54+
3555class Lock (object ):
3656 """Kazoo Lock
3757
@@ -448,7 +468,13 @@ def _inner_acquire(self, blocking, timeout=None):
448468 if self .client .exists (self .create_path ):
449469 return True
450470
451- with self .client .Lock (self .lock_path , self .data ):
471+ w = _Watch (duration = timeout )
472+ w .start ()
473+ lock = self .client .Lock (self .lock_path , self .data )
474+ gotten = lock .acquire (blocking = blocking , timeout = w .leftover ())
475+ if not gotten :
476+ return False
477+ try :
452478 while True :
453479 self .wake_event .clear ()
454480
@@ -459,16 +485,15 @@ def _inner_acquire(self, blocking, timeout=None):
459485 if blocking :
460486 # If blocking, wait until self._watch_lease_change() is
461487 # called before returning
462- self .wake_event .wait (timeout )
488+ self .wake_event .wait (w . leftover () )
463489 if not self .wake_event .isSet ():
464490 raise LockTimeout (
465491 "Failed to acquire semaphore on %s "
466492 "after %s seconds" % (self .path , timeout ))
467493 else :
468- # If not blocking, register another watch that will trigger
469- # self._get_lease() as soon as the children change again.
470- self .client .get_children (self .path , self ._get_lease )
471494 return False
495+ finally :
496+ lock .release ()
472497
473498 def _watch_lease_change (self , event ):
474499 self .wake_event .set ()
0 commit comments