|
44 | 44 | Sync, |
45 | 45 | Transaction |
46 | 46 | ) |
| 47 | +from kazoo.protocol.states import Callback |
| 48 | +from kazoo.protocol.states import EventType |
47 | 49 | from kazoo.protocol.states import KazooState |
48 | 50 | from kazoo.protocol.states import KeeperState |
| 51 | +from kazoo.protocol.states import WatchedEvent |
49 | 52 | from kazoo.retry import KazooRetry |
50 | 53 | from kazoo.security import ACL |
51 | 54 | from kazoo.security import OPEN_ACL_UNSAFE |
@@ -192,7 +195,8 @@ def __init__(self, hosts='127.0.0.1:2181', |
192 | 195 | self._state = KeeperState.CLOSED |
193 | 196 | self.state = KazooState.LOST |
194 | 197 | self.state_listeners = set() |
195 | | - |
| 198 | + self._child_watchers = defaultdict(set) |
| 199 | + self._data_watchers = defaultdict(set) |
196 | 200 | self._reset() |
197 | 201 | self.read_only = read_only |
198 | 202 |
|
@@ -309,9 +313,20 @@ def _reset(self): |
309 | 313 | self._protocol_version = None |
310 | 314 |
|
311 | 315 | def _reset_watchers(self): |
| 316 | + watchers = [] |
| 317 | + for child_watchers in six.itervalues(self._child_watchers): |
| 318 | + watchers.extend(child_watchers) |
| 319 | + |
| 320 | + for data_watchers in six.itervalues(self._data_watchers): |
| 321 | + watchers.extend(data_watchers) |
| 322 | + |
312 | 323 | self._child_watchers = defaultdict(set) |
313 | 324 | self._data_watchers = defaultdict(set) |
314 | 325 |
|
| 326 | + ev = WatchedEvent(EventType.NONE, self._state, None) |
| 327 | + for watch in watchers: |
| 328 | + self.handler.dispatch_callback(Callback("watch", watch, (ev,))) |
| 329 | + |
315 | 330 | def _reset_session(self): |
316 | 331 | self._session_id = None |
317 | 332 | self._session_passwd = b'\x00' * 16 |
|
0 commit comments