Skip to content

Commit a00f4a7

Browse files
committed
feat(examples): add UDS KV server
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent eb2a536 commit a00f4a7

5 files changed

Lines changed: 240 additions & 70 deletions

File tree

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
[package]
2+
name = "wasi-keyvalue-unix-server"
3+
version = "0.1.0"
4+
5+
authors.workspace = true
6+
categories.workspace = true
7+
edition.workspace = true
8+
homepage.workspace = true
9+
license.workspace = true
10+
repository.workspace = true
11+
12+
[dependencies]
13+
anyhow = { workspace = true, features = ["std"] }
14+
bytes = { workspace = true }
15+
clap = { workspace = true, features = [
16+
"color",
17+
"derive",
18+
"error-context",
19+
"help",
20+
"std",
21+
"suggestions",
22+
"usage",
23+
] }
24+
futures = { workspace = true }
25+
tokio = { workspace = true, features = [
26+
"fs",
27+
"rt-multi-thread",
28+
"net",
29+
"signal",
30+
] }
31+
tracing = { workspace = true }
32+
tracing-subscriber = { workspace = true, features = [
33+
"ansi",
34+
"env-filter",
35+
"fmt",
36+
] }
37+
wrpc-transport = { workspace = true, features = ["net"] }
38+
wrpc-wasi-keyvalue = { workspace = true }
39+
wrpc-wasi-keyvalue-mem = { workspace = true }
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use core::pin::pin;
2+
use std::path::PathBuf;
3+
use std::sync::Arc;
4+
5+
use anyhow::Context as _;
6+
use clap::Parser;
7+
use futures::stream::select_all;
8+
use futures::StreamExt as _;
9+
use tokio::task::JoinSet;
10+
use tokio::{fs, select, signal};
11+
use tracing::{debug, error, info, warn};
12+
13+
#[derive(Parser, Debug)]
14+
#[command(author, version, about, long_about = None)]
15+
struct Args {
16+
/// Path to serve `wasi:keyvalue` on
17+
#[arg(default_value = "/tmp/wrpc/wasi/keyvalue.sock")]
18+
path: PathBuf,
19+
}
20+
21+
#[tokio::main]
22+
async fn main() -> anyhow::Result<()> {
23+
tracing_subscriber::fmt().init();
24+
25+
let Args { path } = Args::parse();
26+
27+
if let Some(dir) = path.parent() {
28+
if !dir.exists() {
29+
fs::create_dir_all(dir)
30+
.await
31+
.with_context(|| format!("failed to create `{}`", dir.display()))?
32+
}
33+
}
34+
let lis = tokio::net::UnixListener::bind(&path)
35+
.with_context(|| format!("failed to bind Unix listener on `{}`", path.display()))?;
36+
let srv = Arc::new(wrpc_transport::Server::default());
37+
let accept = tokio::spawn({
38+
let srv = Arc::clone(&srv);
39+
async move {
40+
loop {
41+
if let Err(err) = srv.accept(&lis).await {
42+
error!(?err, "failed to accept Unix connection");
43+
}
44+
}
45+
}
46+
});
47+
48+
let invocations =
49+
wrpc_wasi_keyvalue::serve(srv.as_ref(), wrpc_wasi_keyvalue_mem::Handler::default())
50+
.await
51+
.context("failed to serve `wasi:keyvalue`")?;
52+
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
53+
// to customize this, iterate over the returned `invocations` and set up custom handling per export
54+
let mut invocations = select_all(
55+
invocations
56+
.into_iter()
57+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
58+
);
59+
let shutdown = signal::ctrl_c();
60+
let mut shutdown = pin!(shutdown);
61+
let mut tasks = JoinSet::new();
62+
loop {
63+
select! {
64+
Some((instance, name, res)) = invocations.next() => {
65+
match res {
66+
Ok(fut) => {
67+
debug!(instance, name, "invocation accepted");
68+
tasks.spawn(async move {
69+
if let Err(err) = fut.await {
70+
warn!(?err, "failed to handle invocation");
71+
} else {
72+
info!(instance, name, "invocation successfully handled");
73+
}
74+
});
75+
}
76+
Err(err) => {
77+
warn!(?err, instance, name, "failed to accept invocation");
78+
}
79+
}
80+
}
81+
Some(res) = tasks.join_next() => {
82+
if let Err(err) = res {
83+
error!(?err, "failed to join task")
84+
}
85+
}
86+
res = &mut shutdown => {
87+
accept.abort();
88+
// wait for all invocations to complete
89+
while let Some(res) = tasks.join_next().await {
90+
if let Err(err) = res {
91+
error!(?err, "failed to join task")
92+
}
93+
}
94+
return res.context("failed to listen for ^C")
95+
}
96+
}
97+
}
98+
}

examples/web/rust/src/main.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
248248
}
249249
}
250250
#[cfg(unix)]
251-
"wrpc+uds" => {
252-
if url.set_scheme("file").is_err() {
253-
return Ok(Err(store::Error::Other("invalid URL".to_string())));
254-
}
251+
"wrpc+unix" => {
255252
let Ok(path) = url.to_file_path() else {
256253
return Ok(Err(store::Error::Other(
257254
"failed to get filesystem path from URL".to_string(),

0 commit comments

Comments
 (0)