Skip to content

Commit cdc4cdd

Browse files
committed
feat(examples): add QUIC KV server
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent a00f4a7 commit cdc4cdd

7 files changed

Lines changed: 257 additions & 27 deletions

File tree

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
[package]
2+
name = "wasi-keyvalue-quic-server"
3+
version = "0.1.0"
4+
5+
authors.workspace = true
6+
categories.workspace = true
7+
edition.workspace = true
8+
homepage.workspace = true
9+
license.workspace = true
10+
repository.workspace = true
11+
12+
[dependencies]
13+
anyhow = { workspace = true, features = ["std"] }
14+
bytes = { workspace = true }
15+
clap = { workspace = true, features = [
16+
"color",
17+
"derive",
18+
"error-context",
19+
"help",
20+
"std",
21+
"suggestions",
22+
"usage",
23+
] }
24+
futures = { workspace = true }
25+
quinn = { workspace = true, features = [
26+
"log",
27+
"platform-verifier",
28+
"ring",
29+
"runtime-tokio",
30+
"rustls",
31+
] }
32+
rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] }
33+
rustls = { workspace = true }
34+
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
35+
tracing = { workspace = true }
36+
tracing-subscriber = { workspace = true, features = [
37+
"ansi",
38+
"env-filter",
39+
"fmt",
40+
] }
41+
wrpc-transport = { workspace = true }
42+
wrpc-transport-quic = { workspace = true }
43+
wrpc-wasi-keyvalue = { workspace = true }
44+
wrpc-wasi-keyvalue-mem = { workspace = true }
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
use core::net::SocketAddr;
2+
use core::pin::pin;
3+
4+
use std::sync::Arc;
5+
6+
use anyhow::Context as _;
7+
use clap::Parser;
8+
use futures::stream::select_all;
9+
use futures::StreamExt as _;
10+
use quinn::crypto::rustls::QuicServerConfig;
11+
use quinn::{Endpoint, ServerConfig};
12+
use rcgen::{generate_simple_self_signed, CertifiedKey};
13+
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
14+
use rustls::version::TLS13;
15+
use tokio::task::JoinSet;
16+
use tokio::{select, signal};
17+
use tracing::{debug, error, info, warn};
18+
19+
#[derive(Parser, Debug)]
20+
#[command(author, version, about, long_about = None)]
21+
struct Args {
22+
/// Address to serve `wasi:keyvalue` on
23+
#[arg(default_value = "[::1]:4433")]
24+
addr: SocketAddr,
25+
}
26+
27+
#[tokio::main]
28+
async fn main() -> anyhow::Result<()> {
29+
tracing_subscriber::fmt().init();
30+
31+
let Args { addr } = Args::parse();
32+
33+
let CertifiedKey { cert, key_pair } = generate_simple_self_signed([
34+
"localhost".to_string(),
35+
"::1".to_string(),
36+
"127.0.0.1".to_string(),
37+
])
38+
.context("failed to generate server certificate")?;
39+
let cert = CertificateDer::from(cert);
40+
41+
let conf = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13])
42+
.with_no_client_auth() // TODO: verify client cert
43+
.with_single_cert(
44+
vec![cert],
45+
PrivatePkcs8KeyDer::from(key_pair.serialize_der()).into(),
46+
)
47+
.context("failed to create server config")?;
48+
49+
let conf: QuicServerConfig = conf
50+
.try_into()
51+
.context("failed to convert rustls client config to QUIC server config")?;
52+
53+
let ep = Endpoint::server(ServerConfig::with_crypto(Arc::new(conf)), addr)
54+
.context("failed to create server endpoint")?;
55+
56+
let srv = Arc::new(wrpc_transport_quic::Server::new());
57+
let accept = tokio::spawn({
58+
let mut tasks = JoinSet::<anyhow::Result<()>>::new();
59+
let srv = Arc::clone(&srv);
60+
async move {
61+
loop {
62+
select! {
63+
Some(conn) = ep.accept() => {
64+
let srv = Arc::clone(&srv);
65+
tasks.spawn(async move {
66+
let conn = conn
67+
.accept()
68+
.context("failed to accept QUIC connection")?;
69+
let conn = conn.await.context("failed to establish QUIC connection")?;
70+
let wrpc = wrpc_transport_quic::Client::from(conn);
71+
loop {
72+
srv.accept(&wrpc)
73+
.await
74+
.context("failed to accept wRPC connection")?;
75+
}
76+
});
77+
}
78+
Some(res) = tasks.join_next() => {
79+
match res {
80+
Ok(Ok(())) => {}
81+
Ok(Err(err)) => {
82+
warn!(?err, "failed to serve connection")
83+
}
84+
Err(err) => {
85+
error!(?err, "failed to join task")
86+
}
87+
}
88+
}
89+
else => {
90+
return;
91+
}
92+
}
93+
}
94+
}
95+
});
96+
97+
let invocations =
98+
wrpc_wasi_keyvalue::serve(srv.as_ref(), wrpc_wasi_keyvalue_mem::Handler::default())
99+
.await
100+
.context("failed to serve `wasi:keyvalue`")?;
101+
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
102+
// to customize this, iterate over the returned `invocations` and set up custom handling per export
103+
let mut invocations = select_all(
104+
invocations
105+
.into_iter()
106+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
107+
);
108+
let shutdown = signal::ctrl_c();
109+
let mut shutdown = pin!(shutdown);
110+
let mut tasks = JoinSet::new();
111+
loop {
112+
select! {
113+
Some((instance, name, res)) = invocations.next() => {
114+
match res {
115+
Ok(fut) => {
116+
debug!(instance, name, "invocation accepted");
117+
tasks.spawn(async move {
118+
if let Err(err) = fut.await {
119+
warn!(?err, "failed to handle invocation");
120+
} else {
121+
info!(instance, name, "invocation successfully handled");
122+
}
123+
});
124+
}
125+
Err(err) => {
126+
warn!(?err, instance, name, "failed to accept invocation");
127+
}
128+
}
129+
}
130+
Some(res) = tasks.join_next() => {
131+
if let Err(err) = res {
132+
error!(?err, "failed to join task")
133+
}
134+
}
135+
res = &mut shutdown => {
136+
accept.abort();
137+
// wait for all invocations to complete
138+
while let Some(res) = tasks.join_next().await {
139+
if let Err(err) = res {
140+
error!(?err, "failed to join task")
141+
}
142+
}
143+
return res.context("failed to listen for ^C")
144+
}
145+
}
146+
}
147+
}

examples/rust/wasi-keyvalue-tcp-server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use core::pin::pin;
2+
23
use std::sync::Arc;
34

45
use anyhow::Context as _;

examples/rust/wasi-keyvalue-unix-server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use core::pin::pin;
2+
23
use std::path::PathBuf;
34
use std::sync::Arc;
45

examples/web/rust/src/main.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,8 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
212212
Ok(ep) => ep,
213213
Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
214214
};
215-
let Some(mut san) = url.path().strip_prefix('/') else {
216-
return Ok(Err(store::Error::Other("invalid URL".to_string())));
217-
};
215+
let san = url.path();
216+
let mut san = san.strip_prefix('/').unwrap_or(san);
218217
if san.is_empty() {
219218
san = "localhost"
220219
}

examples/web/ui/index.html

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,14 @@
112112
}
113113
identifier = url;
114114
} else if (conn === 'quic') {
115-
let addr = obj.addr;
115+
let addr = obj['quic-addr'];
116116
if (addr == null || addr === '') {
117117
dbg.error('QUIC address must be set');
118118
return
119119
}
120-
identifier = 'wrpc+quic://' + identifier;
120+
identifier = 'wrpc+quic://' + addr;
121121

122-
let bucket = obj.bucket;
122+
let bucket = obj['quic-bucket'];
123123
if (bucket != null && bucket != '') {
124124
identifier = identifier + ';' + bucket;
125125
}
@@ -131,7 +131,7 @@
131131
}
132132
identifier = 'wrpc+tcp://' + addr;
133133

134-
let bucket = obj.bucket;
134+
let bucket = obj['tcp-bucket'];
135135
if (bucket != null && bucket != '') {
136136
identifier = identifier + ';' + bucket;
137137
}
@@ -148,14 +148,14 @@
148148
identifier = identifier + ';' + bucket;
149149
}
150150
} else if (conn === 'web') {
151-
let addr = obj.addr;
151+
let addr = obj['web-addr'];
152152
if (addr == null || addr === '') {
153153
dbg.error('WebTransport address must be set');
154154
return
155155
}
156156
identifier = 'wrpc+web://' + addr;
157157

158-
let bucket = obj.bucket;
158+
let bucket = obj['web-bucket'];
159159
if (bucket != null && bucket != '') {
160160
identifier = identifier + ';' + bucket;
161161
}
@@ -205,6 +205,10 @@
205205
}
206206

207207
async function handleGet() {
208+
/** @type {HTMLInputElement | null} */
209+
const getValue = document.querySelector('#get input[name="get-value"]');
210+
if (getValue) getValue.value = null;
211+
208212
if (!transport) {
209213
dbg.error('transport not connected')
210214
return
@@ -260,8 +264,6 @@
260264

261265
dbg.info(`got value from bucket: ${bucketName}`, JSON.stringify(value, null, 2));
262266

263-
/** @type {HTMLInputElement | null} */
264-
const getValue = document.querySelector('#get input[name="get-value"]');
265267
if (getValue) getValue.value = value;
266268
}
267269

@@ -423,22 +425,7 @@ <h1 class="title">wRPC Transports</h1>
423425
<form id="settings">
424426
<div id="template-output"></div>
425427

426-
<template class="form-fields" data-option="default">
427-
<div class="field">
428-
<label class="label">Bucket identifier</label>
429-
<div class="control">
430-
<input class="input" type="text" name="bucket" />
431-
</div>
432-
</div>
433-
<div class="field">
434-
<label class="label">Target address</label>
435-
<div class="control">
436-
<input class="input" type="text" name="addr" placeholder="localhost:1234" />
437-
</div>
438-
</div>
439-
</template>
440-
441-
<template class="form-fields" data-option="mem"></template>
428+
<template class="form-fields" data-option="default"></template>
442429

443430
<template class="form-fields" data-option="redis">
444431
<div class="field">
@@ -472,6 +459,21 @@ <h1 class="title">wRPC Transports</h1>
472459
</div>
473460
</template>
474461

462+
<template class="form-fields" data-option="quic">
463+
<div class="field">
464+
<label class="label">Bucket identifier</label>
465+
<div class="control">
466+
<input class="input" type="text" name="quic-bucket" />
467+
</div>
468+
</div>
469+
<div class="field">
470+
<label class="label">QUIC socket address</label>
471+
<div class="control">
472+
<input class="input" type="text" name="quic-addr" placeholder="[::1]:4433" value="[::1]:4433" />
473+
</div>
474+
</div>
475+
</template>
476+
475477
<template class="form-fields" data-option="tcp">
476478
<div class="field">
477479
<label class="label">Bucket identifier</label>
@@ -502,6 +504,22 @@ <h1 class="title">wRPC Transports</h1>
502504
</div>
503505
</div>
504506
</template>
507+
508+
<template class="form-fields" data-option="web">
509+
<div class="field">
510+
<label class="label">Bucket identifier</label>
511+
<div class="control">
512+
<input class="input" type="text" name="web-bucket" />
513+
</div>
514+
</div>
515+
<div class="field">
516+
<label class="label">WebTransport address</label>
517+
<div class="control">
518+
<input class="input" type="text" name="web-addr" placeholder="localhost:4433"
519+
value="localhost:4433" />
520+
</div>
521+
</div>
522+
</template>
505523
</form>
506524
</section>
507525

0 commit comments

Comments
 (0)