Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kazoo/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Simple asyncio integration of the threaded async executor engine.
"""
73 changes: 73 additions & 0 deletions kazoo/aio/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio

from kazoo.aio.handler import AioSequentialThreadingHandler
from kazoo.client import KazooClient, TransactionRequest


class AioKazooClient(KazooClient):
"""
The asyncio compatibility mostly mimics the behaviour of the base async one. All calls are wrapped in
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
asyncio.shield() to prevent cancellation that is not supported in the base async implementation.
Comment thread
Traktormaster marked this conversation as resolved.
Outdated

The sync and base-async API are still completely functional. Mixing the use of any of the 3 should be okay.
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
"""

def __init__(self, *args, **kwargs):
if not kwargs.get("handler"):
kwargs["handler"] = AioSequentialThreadingHandler()
KazooClient.__init__(self, *args, **kwargs)

# asyncio compatible api wrappers
async def start_aio(self):
return await asyncio.shield(self.start_async().future)

async def add_auth_aio(self, *args, **kwargs):
return await asyncio.shield(self.add_auth_async(*args, **kwargs).future)
Comment thread
Traktormaster marked this conversation as resolved.
Outdated

async def sync_aio(self, *args, **kwargs):
return await asyncio.shield(self.sync_async(*args, **kwargs).future)

async def create_aio(self, *args, **kwargs):
return await asyncio.shield(self.create_async(*args, **kwargs).future)

async def ensure_path_aio(self, *args, **kwargs):
return await asyncio.shield(self.ensure_path_async(*args, **kwargs).future)
Comment thread
Traktormaster marked this conversation as resolved.
Outdated

async def exists_aio(self, *args, **kwargs):
return await asyncio.shield(self.exists_async(*args, **kwargs).future)

async def get_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_async(*args, **kwargs).future)

async def get_children_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_children_async(*args, **kwargs).future)
Comment thread
Traktormaster marked this conversation as resolved.
Outdated

async def get_acls_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_acls_async(*args, **kwargs).future)
Comment thread
Traktormaster marked this conversation as resolved.
Outdated

async def set_acls_aio(self, *args, **kwargs):
return await asyncio.shield(self.set_acls_async(*args, **kwargs).future)
Comment thread
Traktormaster marked this conversation as resolved.
Outdated

async def set_aio(self, *args, **kwargs):
return await asyncio.shield(self.set_async(*args, **kwargs).future)

def transaction_aio(self):
return AioTransactionRequest(self)

async def delete_aio(self, *args, **kwargs):
return await asyncio.shield(self.delete_async(*args, **kwargs).future)

async def reconfig_aio(self, *args, **kwargs):
return await asyncio.shield(self.reconfig_async(*args, **kwargs).future)
Comment thread
Traktormaster marked this conversation as resolved.
Outdated


class AioTransactionRequest(TransactionRequest):
async def commit_aio(self):
return await asyncio.shield(self.commit_async().future)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, exc_tb):
if not exc_type:
await self.commit_aio()
52 changes: 52 additions & 0 deletions kazoo/aio/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
import threading

from kazoo.handlers.threading import AsyncResult, SequentialThreadingHandler


class AioAsyncResult(AsyncResult):
def __init__(self, handler):
self.future = handler.loop.create_future()
AsyncResult.__init__(self, handler)

def set(self, value=None):
"""
The completion of the future has the same guarantees as the notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set(self, value)
self._handler.loop.call_soon_threadsafe(self.future.set_result, value)

def set_exception(self, exception):
"""
The completion of the future has the same guarantees as the notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set_exception(self, exception)
self._handler.loop.call_soon_threadsafe(self.future.set_exception, exception)


class AioSequentialThreadingHandler(SequentialThreadingHandler):
def __init__(self):
"""
Creating the handler must be done on the asyncio-loop's thread.
"""
self.loop = asyncio.get_running_loop()
self._aio_thread = threading.current_thread()
SequentialThreadingHandler.__init__(self)

def async_result(self):
"""
Almost all async-result objects are created by a method that is invoked from the user's thead. The
one exception I'm aware of is in the PatientChildrenWatch utility, that creates an async-result in
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
its worker thread. Just because of that it is imperative to only create asyncio compatible results
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
when the invoking code is from the loop's thread. There is no PEP/API guarantee that implementing
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
the create_future() has to be thread-safe. The default is mostly thread-safe. The only thing that
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
may get synchronization issue is a debug-feature for asyncio development. Quickly looking at the
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
alternate implementation of uvloop, they use the default Future implementation, so no change there.
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
For now, just to be safe, we check the current thread and create an async-result object based on the
Comment thread
Traktormaster marked this conversation as resolved.
Outdated
invoking thread's identity.
"""
if threading.current_thread() is self._aio_thread:
return AioAsyncResult(self)
return AsyncResult(self)
4 changes: 2 additions & 2 deletions kazoo/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kazoo.testing.harness import KazooTestCase, KazooTestHarness
from kazoo.testing.harness import KazooAioTestCase, KazooTestCase, KazooTestHarness
Comment thread
Traktormaster marked this conversation as resolved.
Outdated


__all__ = ('KazooTestHarness', 'KazooTestCase', )
__all__ = ('KazooTestHarness', 'KazooTestCase', 'KazooAioTestCase', )
29 changes: 27 additions & 2 deletions kazoo/testing/harness.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Kazoo testing harnesses"""
import asyncio
import logging
import os
import uuid
import unittest

from kazoo import python2atexit as atexit
from kazoo.aio.client import AioKazooClient
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
Expand Down Expand Up @@ -144,6 +146,7 @@ def test_something_else(self):

"""
DEFAULT_CLIENT_TIMEOUT = 15
CLIENT_CLS = KazooClient

def __init__(self, *args, **kw):
super(KazooTestHarness, self).__init__(*args, **kw)
Expand All @@ -159,14 +162,14 @@ def servers(self):
return ",".join([s.address for s in self.cluster])

def _get_nonchroot_client(self):
c = KazooClient(self.servers)
c = self.CLIENT_CLS(self.servers)
self._clients.append(c)
return c

def _get_client(self, **client_options):
if 'timeout' not in client_options:
client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
c = KazooClient(self.hosts, **client_options)
c = self.CLIENT_CLS(self.hosts, **client_options)
self._clients.append(c)
return c

Expand Down Expand Up @@ -245,3 +248,25 @@ def setUp(self):

def tearDown(self):
self.teardown_zookeeper()


class KazooAioTestCase(KazooTestHarness):
CLIENT_CLS = AioKazooClient

def __init__(self, *args, **kw):
super(KazooAioTestCase, self).__init__(*args, **kw)
self.loop = None

async def setup_zookeeper_aio(self):
self.setup_zookeeper() # NOTE: could enhance this to call start_aio() on the client
Comment thread
Traktormaster marked this conversation as resolved.
Outdated

async def teardown_zookeeper_aio(self):
self.teardown_zookeeper()

def setUp(self):
self.loop = asyncio.get_event_loop_policy().new_event_loop()
self.loop.run_until_complete(self.setup_zookeeper_aio())

def tearDown(self):
self.loop.run_until_complete(self.teardown_zookeeper_aio())
self.loop.close()
32 changes: 32 additions & 0 deletions kazoo/tests/test_aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from kazoo.exceptions import NotEmptyError, NoNodeError
from kazoo.protocol.states import ZnodeStat
from kazoo.testing import KazooAioTestCase


class KazooAioTests(KazooAioTestCase):
def test_basic_aio_functionality(self):
self.loop.run_until_complete(self._test_basic_aio_functionality())

async def _test_basic_aio_functionality(self):
assert await self.client.create_aio("/tmp") == "/tmp"
assert await self.client.get_children_aio("/") == ["tmp"]
assert await self.client.ensure_path_aio("/tmp/x/y") == "/tmp/x/y"
assert await self.client.exists_aio("/tmp/x/y")
assert isinstance(await self.client.set_aio("/tmp/x/y", b"very aio"), ZnodeStat)
data, stat = await self.client.get_aio("/tmp/x/y")
assert data == b"very aio"
assert isinstance(stat, ZnodeStat)
try:
await self.client.delete_aio("/tmp/x")
except NotEmptyError:
pass
await self.client.delete_aio("/tmp/x/y")
try:
await self.client.get_aio("/tmp/x/y")
except NoNodeError:
pass
async with self.client.transaction_aio() as tx:
tx.create("/tmp/z", b"ZZZ")
tx.set_data("/tmp/x", b"XXX")
assert (await self.client.get_aio("/tmp/x"))[0] == b"XXX"
assert (await self.client.get_aio("/tmp/z"))[0] == b"ZZZ"