Skip to content

Commit 5774e2a

Browse files
committed
chore(examples): use consistent serving method
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent b185523 commit 5774e2a

6 files changed

Lines changed: 147 additions & 56 deletions

File tree

examples/rust/hello-nats-server/src/main.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use core::pin::pin;
33
use anyhow::Context as _;
44
use clap::Parser;
55
use futures::stream::select_all;
6-
use futures::{StreamExt as _, TryStreamExt as _};
6+
use futures::StreamExt as _;
7+
use tokio::task::JoinSet;
78
use tokio::{select, signal};
8-
use tracing::{info, warn};
9+
use tracing::{debug, error, info, warn};
910
use url::Url;
1011

1112
mod bindings {
@@ -60,28 +61,45 @@ async fn main() -> anyhow::Result<()> {
6061
.context("failed to serve `wrpc-examples.hello/handler.hello`")?;
6162
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
6263
// to customize this, iterate over the returned `invocations` and set up custom handling per export
63-
let mut invocations = select_all(invocations.into_iter().map(
64-
|(instance, name, invocations)| {
65-
invocations
66-
.try_buffer_unordered(16) // handle up to 16 invocations concurrently
67-
.map(move |res| (instance, name, res))
68-
},
69-
));
64+
let mut invocations = select_all(
65+
invocations
66+
.into_iter()
67+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
68+
);
7069
let shutdown = signal::ctrl_c();
7170
let mut shutdown = pin!(shutdown);
71+
let mut tasks = JoinSet::new();
7272
loop {
7373
select! {
7474
Some((instance, name, res)) = invocations.next() => {
7575
match res {
76-
Ok(()) => {
77-
info!(instance, name, "invocation successfully handled");
76+
Ok(fut) => {
77+
debug!(instance, name, "invocation accepted");
78+
tasks.spawn(async move {
79+
if let Err(err) = fut.await {
80+
warn!(?err, "failed to handle invocation");
81+
} else {
82+
info!(instance, name, "invocation successfully handled");
83+
}
84+
});
7885
}
7986
Err(err) => {
8087
warn!(?err, instance, name, "failed to accept invocation");
8188
}
8289
}
8390
}
91+
Some(res) = tasks.join_next() => {
92+
if let Err(err) = res {
93+
error!(?err, "failed to join task")
94+
}
95+
}
8496
res = &mut shutdown => {
97+
// wait for all invocations to complete
98+
while let Some(res) = tasks.join_next().await {
99+
if let Err(err) = res {
100+
error!(?err, "failed to join task")
101+
}
102+
}
85103
return res.context("failed to listen for ^C")
86104
}
87105
}

examples/rust/hello-tcp-server/src/main.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ use std::sync::Arc;
66
use anyhow::Context as _;
77
use clap::Parser;
88
use futures::stream::select_all;
9-
use futures::{StreamExt as _, TryStreamExt as _};
9+
use futures::StreamExt as _;
10+
use tokio::task::JoinSet;
1011
use tokio::{select, signal};
11-
use tracing::{error, info, warn};
12+
use tracing::{debug, error, info, warn};
1213

1314
mod bindings {
1415
wit_bindgen_wrpc::generate!({
@@ -61,29 +62,46 @@ async fn main() -> anyhow::Result<()> {
6162
.context("failed to serve `wrpc-examples.hello/handler.hello`")?;
6263
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
6364
// to customize this, iterate over the returned `invocations` and set up custom handling per export
64-
let mut invocations = select_all(invocations.into_iter().map(
65-
|(instance, name, invocations)| {
66-
invocations
67-
.try_buffer_unordered(16) // handle up to 16 invocations concurrently
68-
.map(move |res| (instance, name, res))
69-
},
70-
));
65+
let mut invocations = select_all(
66+
invocations
67+
.into_iter()
68+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
69+
);
7170
let shutdown = signal::ctrl_c();
7271
let mut shutdown = pin!(shutdown);
72+
let mut tasks = JoinSet::new();
7373
loop {
7474
select! {
7575
Some((instance, name, res)) = invocations.next() => {
7676
match res {
77-
Ok(()) => {
78-
info!(instance, name, "invocation successfully handled");
77+
Ok(fut) => {
78+
debug!(instance, name, "invocation accepted");
79+
tasks.spawn(async move {
80+
if let Err(err) = fut.await {
81+
warn!(?err, "failed to handle invocation");
82+
} else {
83+
info!(instance, name, "invocation successfully handled");
84+
}
85+
});
7986
}
8087
Err(err) => {
8188
warn!(?err, instance, name, "failed to accept invocation");
8289
}
8390
}
8491
}
92+
Some(res) = tasks.join_next() => {
93+
if let Err(err) = res {
94+
error!(?err, "failed to join task")
95+
}
96+
}
8597
res = &mut shutdown => {
8698
accept.abort();
99+
// wait for all invocations to complete
100+
while let Some(res) = tasks.join_next().await {
101+
if let Err(err) = res {
102+
error!(?err, "failed to join task")
103+
}
104+
}
87105
return res.context("failed to listen for ^C")
88106
}
89107
}

examples/rust/hello-web-server/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ async fn main() -> anyhow::Result<()> {
105105
.into_iter()
106106
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
107107
);
108-
let mut tasks = JoinSet::new();
109108
let shutdown = signal::ctrl_c();
110109
let mut shutdown = pin!(shutdown);
110+
let mut tasks = JoinSet::new();
111111
loop {
112112
select! {
113113
Some((instance, name, res)) = invocations.next() => {
@@ -134,6 +134,7 @@ async fn main() -> anyhow::Result<()> {
134134
}
135135
res = &mut shutdown => {
136136
accept.abort();
137+
// wait for all invocations to complete
137138
while let Some(res) = tasks.join_next().await {
138139
if let Err(err) = res {
139140
error!(?err, "failed to join task")

examples/rust/streams-nats-server/src/main.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use anyhow::Context as _;
44
use bytes::Bytes;
55
use clap::Parser;
66
use futures::stream::select_all;
7-
use futures::{Stream, StreamExt as _, TryStreamExt as _};
7+
use futures::{Stream, StreamExt as _};
8+
use tokio::task::JoinSet;
89
use tokio::{select, signal};
9-
use tracing::{info, warn};
10+
use tracing::{debug, error, info, warn};
1011
use url::Url;
1112

1213
mod bindings {
@@ -70,28 +71,45 @@ async fn main() -> anyhow::Result<()> {
7071
.context("failed to serve `wrpc-examples:streams/handler.echo`")?;
7172
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
7273
// to customize this, iterate over the returned `invocations` and set up custom handling per export
73-
let mut invocations = select_all(invocations.into_iter().map(
74-
|(instance, name, invocations)| {
75-
invocations
76-
.try_buffer_unordered(16) // handle up to 16 invocations concurrently
77-
.map(move |res| (instance, name, res))
78-
},
79-
));
74+
let mut invocations = select_all(
75+
invocations
76+
.into_iter()
77+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
78+
);
8079
let shutdown = signal::ctrl_c();
8180
let mut shutdown = pin!(shutdown);
81+
let mut tasks = JoinSet::new();
8282
loop {
8383
select! {
8484
Some((instance, name, res)) = invocations.next() => {
8585
match res {
86-
Ok(()) => {
87-
info!(instance, name, "invocation successfully handled");
86+
Ok(fut) => {
87+
debug!(instance, name, "invocation accepted");
88+
tasks.spawn(async move {
89+
if let Err(err) = fut.await {
90+
warn!(?err, "failed to handle invocation");
91+
} else {
92+
info!(instance, name, "invocation successfully handled");
93+
}
94+
});
8895
}
8996
Err(err) => {
9097
warn!(?err, instance, name, "failed to accept invocation");
9198
}
9299
}
93100
}
101+
Some(res) = tasks.join_next() => {
102+
if let Err(err) = res {
103+
error!(?err, "failed to join task")
104+
}
105+
}
94106
res = &mut shutdown => {
107+
// wait for all invocations to complete
108+
while let Some(res) = tasks.join_next().await {
109+
if let Err(err) = res {
110+
error!(?err, "failed to join task")
111+
}
112+
}
95113
return res.context("failed to listen for ^C")
96114
}
97115
}

examples/rust/wasi-keyvalue-nats-server/src/main.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use core::pin::pin;
33
use anyhow::Context as _;
44
use clap::Parser;
55
use futures::stream::select_all;
6-
use futures::{StreamExt as _, TryStreamExt as _};
6+
use futures::StreamExt as _;
7+
use tokio::task::JoinSet;
78
use tokio::{select, signal};
8-
use tracing::{info, warn};
9+
use tracing::{debug, error, info, warn};
910
use url::Url;
1011

1112
#[derive(Parser, Debug)]
@@ -41,28 +42,45 @@ async fn main() -> anyhow::Result<()> {
4142
.context("failed to serve `wasi:keyvalue`")?;
4243
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
4344
// to customize this, iterate over the returned `invocations` and set up custom handling per export
44-
let mut invocations = select_all(invocations.into_iter().map(
45-
|(instance, name, invocations)| {
46-
invocations
47-
.try_buffer_unordered(16) // handle up to 16 invocations concurrently
48-
.map(move |res| (instance, name, res))
49-
},
50-
));
45+
let mut invocations = select_all(
46+
invocations
47+
.into_iter()
48+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
49+
);
5150
let shutdown = signal::ctrl_c();
5251
let mut shutdown = pin!(shutdown);
52+
let mut tasks = JoinSet::new();
5353
loop {
5454
select! {
5555
Some((instance, name, res)) = invocations.next() => {
5656
match res {
57-
Ok(()) => {
58-
info!(instance, name, "invocation successfully handled");
57+
Ok(fut) => {
58+
debug!(instance, name, "invocation accepted");
59+
tasks.spawn(async move {
60+
if let Err(err) = fut.await {
61+
warn!(?err, "failed to handle invocation");
62+
} else {
63+
info!(instance, name, "invocation successfully handled");
64+
}
65+
});
5966
}
6067
Err(err) => {
6168
warn!(?err, instance, name, "failed to accept invocation");
6269
}
6370
}
6471
}
72+
Some(res) = tasks.join_next() => {
73+
if let Err(err) = res {
74+
error!(?err, "failed to join task")
75+
}
76+
}
6577
res = &mut shutdown => {
78+
// wait for all invocations to complete
79+
while let Some(res) = tasks.join_next().await {
80+
if let Err(err) = res {
81+
error!(?err, "failed to join task")
82+
}
83+
}
6684
return res.context("failed to listen for ^C")
6785
}
6886
}

examples/rust/wasi-keyvalue-tcp-server/src/main.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use std::sync::Arc;
44
use anyhow::Context as _;
55
use clap::Parser;
66
use futures::stream::select_all;
7-
use futures::{StreamExt as _, TryStreamExt as _};
7+
use futures::StreamExt as _;
8+
use tokio::task::JoinSet;
89
use tokio::{select, signal};
9-
use tracing::{error, info, warn};
10+
use tracing::{debug, error, info, warn};
1011

1112
#[derive(Parser, Debug)]
1213
#[command(author, version, about, long_about = None)]
@@ -42,29 +43,46 @@ async fn main() -> anyhow::Result<()> {
4243
.context("failed to serve `wasi:keyvalue`")?;
4344
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
4445
// to customize this, iterate over the returned `invocations` and set up custom handling per export
45-
let mut invocations = select_all(invocations.into_iter().map(
46-
|(instance, name, invocations)| {
47-
invocations
48-
.try_buffer_unordered(16) // handle up to 16 invocations concurrently
49-
.map(move |res| (instance, name, res))
50-
},
51-
));
46+
let mut invocations = select_all(
47+
invocations
48+
.into_iter()
49+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
50+
);
5251
let shutdown = signal::ctrl_c();
5352
let mut shutdown = pin!(shutdown);
53+
let mut tasks = JoinSet::new();
5454
loop {
5555
select! {
5656
Some((instance, name, res)) = invocations.next() => {
5757
match res {
58-
Ok(()) => {
59-
info!(instance, name, "invocation successfully handled");
58+
Ok(fut) => {
59+
debug!(instance, name, "invocation accepted");
60+
tasks.spawn(async move {
61+
if let Err(err) = fut.await {
62+
warn!(?err, "failed to handle invocation");
63+
} else {
64+
info!(instance, name, "invocation successfully handled");
65+
}
66+
});
6067
}
6168
Err(err) => {
6269
warn!(?err, instance, name, "failed to accept invocation");
6370
}
6471
}
6572
}
73+
Some(res) = tasks.join_next() => {
74+
if let Err(err) = res {
75+
error!(?err, "failed to join task")
76+
}
77+
}
6678
res = &mut shutdown => {
6779
accept.abort();
80+
// wait for all invocations to complete
81+
while let Some(res) = tasks.join_next().await {
82+
if let Err(err) = res {
83+
error!(?err, "failed to join task")
84+
}
85+
}
6886
return res.context("failed to listen for ^C")
6987
}
7088
}

0 commit comments

Comments
 (0)