Skip to content

Commit 453dbd6

Browse files
committed
feat: add WebTransport keyvalue server example
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 353dfb7 commit 453dbd6

File tree

5 files changed

+194
-9
lines changed

5 files changed

+194
-9
lines changed

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[package]
2+
name = "wasi-keyvalue-web-server"
3+
version = "0.1.0"
4+
5+
authors.workspace = true
6+
categories.workspace = true
7+
edition.workspace = true
8+
homepage.workspace = true
9+
license.workspace = true
10+
repository.workspace = true
11+
12+
[dependencies]
13+
anyhow = { workspace = true, features = ["std"] }
14+
bytes = { workspace = true }
15+
clap = { workspace = true, features = [
16+
"color",
17+
"derive",
18+
"error-context",
19+
"help",
20+
"std",
21+
"suggestions",
22+
"usage",
23+
] }
24+
futures = { workspace = true }
25+
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
26+
tracing = { workspace = true }
27+
tracing-subscriber = { workspace = true, features = [
28+
"ansi",
29+
"env-filter",
30+
"fmt",
31+
] }
32+
wrpc-transport = { workspace = true }
33+
wrpc-transport-web = { workspace = true }
34+
wrpc-wasi-keyvalue = { workspace = true }
35+
wrpc-wasi-keyvalue-mem = { workspace = true }
36+
wtransport = { workspace = true, features = ["self-signed"] }
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use core::net::SocketAddr;
2+
use core::pin::pin;
3+
use core::time::Duration;
4+
5+
use std::sync::Arc;
6+
7+
use anyhow::Context as _;
8+
use clap::Parser;
9+
use futures::stream::select_all;
10+
use futures::StreamExt as _;
11+
use tokio::task::JoinSet;
12+
use tokio::{select, signal};
13+
use tracing::{debug, error, info, warn};
14+
use wtransport::{Endpoint, Identity, ServerConfig};
15+
16+
#[derive(Parser, Debug)]
17+
#[command(author, version, about, long_about = None)]
18+
struct Args {
19+
/// Address to serve `wasi:keyvalue` on
20+
#[arg(default_value = "[::1]:4433")]
21+
addr: SocketAddr,
22+
}
23+
24+
#[tokio::main]
25+
async fn main() -> anyhow::Result<()> {
26+
tracing_subscriber::fmt().init();
27+
28+
let Args { addr } = Args::parse();
29+
30+
let id = Identity::self_signed(["localhost", "127.0.0.1", "::1"])
31+
.context("failed to generate server certificate")?;
32+
33+
let ep = Endpoint::server(
34+
ServerConfig::builder()
35+
.with_bind_address(addr)
36+
.with_identity(id)
37+
.keep_alive_interval(Some(Duration::from_secs(3)))
38+
.build(),
39+
)
40+
.context("failed to create server endpoint")?;
41+
42+
let srv = Arc::new(wrpc_transport_web::Server::new());
43+
let accept = tokio::spawn({
44+
let mut tasks = JoinSet::<anyhow::Result<()>>::new();
45+
let srv = Arc::clone(&srv);
46+
async move {
47+
loop {
48+
select! {
49+
conn = ep.accept() => {
50+
let srv = Arc::clone(&srv);
51+
tasks.spawn(async move {
52+
let req = conn
53+
.await
54+
.context("failed to accept WebTransport connection")?;
55+
let conn = req
56+
.accept()
57+
.await
58+
.context("failed to establish WebTransport connection")?;
59+
let wrpc = wrpc_transport_web::Client::from(conn);
60+
loop {
61+
srv.accept(&wrpc)
62+
.await
63+
.context("failed to accept wRPC connection")?;
64+
}
65+
});
66+
}
67+
Some(res) = tasks.join_next() => {
68+
match res {
69+
Ok(Ok(())) => {}
70+
Ok(Err(err)) => {
71+
warn!(?err, "failed to serve connection")
72+
}
73+
Err(err) => {
74+
error!(?err, "failed to join task")
75+
}
76+
}
77+
}
78+
}
79+
}
80+
}
81+
});
82+
83+
let invocations =
84+
wrpc_wasi_keyvalue::serve(srv.as_ref(), wrpc_wasi_keyvalue_mem::Handler::default())
85+
.await
86+
.context("failed to serve `wasi:keyvalue`")?;
87+
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
88+
// to customize this, iterate over the returned `invocations` and set up custom handling per export
89+
let mut invocations = select_all(
90+
invocations
91+
.into_iter()
92+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
93+
);
94+
let shutdown = signal::ctrl_c();
95+
let mut shutdown = pin!(shutdown);
96+
let mut tasks = JoinSet::new();
97+
loop {
98+
select! {
99+
Some((instance, name, res)) = invocations.next() => {
100+
match res {
101+
Ok(fut) => {
102+
debug!(instance, name, "invocation accepted");
103+
tasks.spawn(async move {
104+
if let Err(err) = fut.await {
105+
warn!(?err, "failed to handle invocation");
106+
} else {
107+
info!(instance, name, "invocation successfully handled");
108+
}
109+
});
110+
}
111+
Err(err) => {
112+
warn!(?err, instance, name, "failed to accept invocation");
113+
}
114+
}
115+
}
116+
Some(res) = tasks.join_next() => {
117+
if let Err(err) = res {
118+
error!(?err, "failed to join task")
119+
}
120+
}
121+
res = &mut shutdown => {
122+
accept.abort();
123+
// wait for all invocations to complete
124+
while let Some(res) = tasks.join_next().await {
125+
if let Err(err) = res {
126+
error!(?err, "failed to join task")
127+
}
128+
}
129+
return res.context("failed to listen for ^C")
130+
}
131+
}
132+
}
133+
}

examples/web/rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ quinn = { workspace = true, features = [
3131
"runtime-tokio",
3232
"rustls",
3333
] }
34-
rustls = { workspace = true }
34+
rustls = { workspace = true, features = ["ring"] }
3535
tokio = { workspace = true, features = [
3636
"macros",
3737
"rt-multi-thread",

examples/web/rust/src/main.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use url::Url;
3030
use uuid::Uuid;
3131
use wrpc_transport::{ResourceBorrow, ResourceOwn};
3232
use wrpc_wasi_keyvalue::exports::wasi::keyvalue::store;
33-
use wtransport::tls::Sha256DigestFmt;
33+
use wtransport::tls::{Sha256DigestFmt, WEBTRANSPORT_ALPN};
3434
use wtransport::{Endpoint, Identity, ServerConfig};
3535

3636
pub type Result<T, E = store::Error> = core::result::Result<T, E>;
@@ -164,7 +164,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
164164
return Ok(Ok(ResourceOwn::from(id)));
165165
}
166166
let (url, suffix) = identifier.split_once(';').unwrap_or((&identifier, ""));
167-
let mut url = match Url::parse(url) {
167+
let url = match Url::parse(url) {
168168
Ok(url) => url,
169169
Err(err) => return Ok(Err(store::Error::Other(err.to_string()))),
170170
};
@@ -260,14 +260,12 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
260260
}
261261
}
262262
"wrpc+web" => {
263-
if url.set_scheme("https").is_err() {
264-
return Ok(Err(store::Error::Other("invalid URL".to_string())));
265-
}
266-
263+
let mut conf = client_tls_config();
264+
conf.alpn_protocols.push(WEBTRANSPORT_ALPN.to_vec());
267265
let ep = match wtransport::Endpoint::client(
268266
wtransport::ClientConfig::builder()
269267
.with_bind_default()
270-
.with_custom_tls(client_tls_config())
268+
.with_custom_tls(conf)
271269
.build(),
272270
)
273271
.context("failed to create WebTransport endpoint")
@@ -276,7 +274,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
276274
Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
277275
};
278276
let conn = match ep
279-
.connect(url)
277+
.connect(format!("https://{}", url.authority()))
280278
.await
281279
.context("failed to establish connection")
282280
{

0 commit comments

Comments
 (0)