Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobs:
run: docker pull ${{ inputs.serviceImage }}

- name: Run test tool
uses: restatedev/e2e/sdk-tests@v1.0
uses: restatedev/e2e/sdk-tests@v2.2
with:
restateContainerImage: ${{ inputs.restateCommit != '' && 'localhost/restatedev/restate-commit-download:latest' || (inputs.restateImage != '' && inputs.restateImage || 'ghcr.io/restatedev/restate:main') }}
serviceContainerImage: ${{ inputs.serviceImage != '' && inputs.serviceImage || 'restatedev/test-services-python' }}
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ doc = false
[dependencies]
pyo3 = { version = "0.25.1", features = ["extension-module"] }
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "5127f0291bff456a515f2b8d572c4090e8ff450e", features = ["request_identity", "sha2_random_seed"] }
restate-sdk-shared-core = { version = "7.0.0", features = ["request_identity", "sha2_random_seed"] }
2 changes: 2 additions & 0 deletions python/restate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
RestateDurableFuture,
RestateDurableCallFuture,
RestateDurableSleepFuture,
ScopedContext,
SendHandle,
RunOptions,
)
Expand Down Expand Up @@ -101,6 +102,7 @@ async def create_client(
"RestateDurableCallFuture",
"RestateDurableSleepFuture",
"SendHandle",
"ScopedContext",
"RunOptions",
"TerminalError",
"app",
Expand Down
148 changes: 140 additions & 8 deletions python/restate/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import typing
from contextlib import asynccontextmanager

from .client_types import RestateClient, RestateClientSendHandle, HttpError
from .client_types import RestateClient, RestateClientSendHandle, RestateScopedClient, HttpError

from .context import HandlerType
from .serde import BytesSerde, JsonSerde, Serde
Expand All @@ -36,6 +36,9 @@ def __init__(self, client: httpx.AsyncClient, headers: typing.Optional[dict] = N
self.headers = headers or {}
self.client = client

def scope(self, scope: str) -> RestateScopedClient:
return ScopedClient(self, scope)

async def do_call(
self,
tpe: HandlerType[I, O],
Expand All @@ -46,6 +49,8 @@ async def do_call(
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
force_json_output: bool = False,
scope: str | None = None,
limit_key: str | None = None,
) -> O:
"""Make an RPC call to the given handler"""
target_handler = handler_from_callable(tpe)
Expand Down Expand Up @@ -77,6 +82,8 @@ async def do_call(
send=send,
idempotency_key=idempotency_key,
headers=headers,
scope=scope,
limit_key=limit_key,
)

async def do_raw_call(
Expand All @@ -91,6 +98,8 @@ async def do_raw_call(
send: bool = False,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
scope: str | None = None,
limit_key: str | None = None,
) -> O:
"""Make an RPC call to the given handler"""
parameter = input_serde.serialize(input_param)
Expand All @@ -112,6 +121,8 @@ async def do_raw_call(
key=key,
delay=ms,
idempotency_key=idempotency_key,
scope=scope,
limit_key=limit_key,
)
return output_serde.deserialize(res) # type: ignore

Expand All @@ -126,21 +137,37 @@ async def post(
key: str | None = None,
delay: int | None = None,
idempotency_key: str | None = None,
scope: str | None = None,
limit_key: str | None = None,
) -> bytes:
"""
Send a POST request to the Restate service.
"""
endpoint = service
if key:
endpoint += f"/{key}"
endpoint += f"/{handler}"
if send:
endpoint += "/send"
if delay is not None:
if scope is not None:
# Scoped invocations use the dedicated ingress path:
# restate/scope/{scope}/call/{service}[/{key}]/{handler}
# restate/scope/{scope}/send/{service}[/{key}]/{handler}
verb = "send" if send else "call"
endpoint = f"restate/scope/{scope}/{verb}/{service}"
if key:
endpoint += f"/{key}"
endpoint += f"/{handler}"
if send and delay is not None:
endpoint = endpoint + f"?delay={delay}"
else:
endpoint = service
if key:
endpoint += f"/{key}"
endpoint += f"/{handler}"
if send:
endpoint += "/send"
if delay is not None:
endpoint = endpoint + f"?delay={delay}"
dict_headers = dict(headers) if headers is not None else {}
if idempotency_key is not None:
dict_headers["Idempotency-Key"] = idempotency_key
if limit_key is not None:
dict_headers["x-restate-limit-key"] = limit_key
res = await self.client.post(endpoint, headers=dict_headers, content=content)
if res.status_code >= 400:
raise HttpError(res.status_code, res.reason_phrase, res.text)
Expand Down Expand Up @@ -250,6 +277,8 @@ async def generic_call(
key: str | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
scope: str | None = None,
limit_key: str | None = None,
) -> bytes:
serde = BytesSerde()
call_handle = await self.do_raw_call(
Expand All @@ -261,6 +290,8 @@ async def generic_call(
key=key,
idempotency_key=idempotency_key,
headers=headers,
scope=scope,
limit_key=limit_key,
)
return call_handle

Expand All @@ -273,6 +304,8 @@ async def generic_send(
send_delay: timedelta | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
scope: str | None = None,
limit_key: str | None = None,
) -> RestateClientSendHandle:
serde = BytesSerde()
output_serde: Serde[dict] = JsonSerde()
Expand All @@ -288,11 +321,110 @@ async def generic_send(
send=True,
idempotency_key=idempotency_key,
headers=headers,
scope=scope,
limit_key=limit_key,
)

return RestateClientSendHandle(send_handle_json.get("invocationId", ""), 200) # TODO: verify


class ScopedClient(RestateScopedClient):
"""
A scoped client returned by ``client.scope(scope_key)``.

Re-dispatches to the underlying :class:`Client` with the captured scope and a
per-call ``limit_key``.
"""

def __init__(self, client: Client, scope_key: str):
self.client = client
self.scope_key = scope_key

async def service_call(
self,
tpe: HandlerType[I, O],
arg: I,
limit_key: str | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
) -> O:
return await self.client.do_call(
tpe,
arg,
idempotency_key=idempotency_key,
headers=headers,
scope=self.scope_key,
limit_key=limit_key,
)

async def service_send(
self,
tpe: HandlerType[I, O],
arg: I,
send_delay: typing.Optional[timedelta] = None,
limit_key: str | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
) -> RestateClientSendHandle:
send_handle = await self.client.do_call(
tpe,
parameter=arg,
send=True,
send_delay=send_delay,
idempotency_key=idempotency_key,
headers=headers,
force_json_output=True,
scope=self.scope_key,
limit_key=limit_key,
)
send = typing.cast(typing.Dict[str, str], send_handle)
return RestateClientSendHandle(send.get("invocationId", ""), 200)

async def workflow_call(
self,
tpe: HandlerType[I, O],
key: str,
arg: I,
limit_key: str | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
) -> O:
return await self.client.do_call(
tpe,
arg,
key,
idempotency_key=idempotency_key,
headers=headers,
scope=self.scope_key,
limit_key=limit_key,
)

async def workflow_send(
self,
tpe: HandlerType[I, O],
key: str,
arg: I,
send_delay: typing.Optional[timedelta] = None,
limit_key: str | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None,
) -> RestateClientSendHandle:
send_handle = await self.client.do_call(
tpe,
parameter=arg,
key=key,
send=True,
send_delay=send_delay,
idempotency_key=idempotency_key,
headers=headers,
force_json_output=True,
scope=self.scope_key,
limit_key=limit_key,
)
send = typing.cast(typing.Dict[str, str], send_handle)
return RestateClientSendHandle(send.get("invocationId", ""), 200)


@asynccontextmanager
async def create_client(
ingress: str, headers: typing.Optional[dict] = None
Expand Down
Loading
Loading