Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg_attr(not(feature = "std"), no_std)]
#![allow(stable_features, unused_features)]
#![feature(allocator_api)]
#![feature(new_range_api)]
#![feature(fn_traits)]
#![cfg_attr(feature = "std", feature(layout_for_ptr))]
#![feature(negative_impls)]
Expand Down
8 changes: 7 additions & 1 deletion common/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ pub use ring::{RingData, RingHeader};
pub use ring_consumer::RingConsumer;
pub use ring_producer::RingProducer;
pub use shared_memory_region::{RawSharedMemoryRegion, SharedMemoryRegion};
pub use traits::{Read, Write};
pub use traits::{DoorBell, Read, Write};

#[cfg(feature = "std")]
pub use traits::DoorBellWaiter;

#[cfg(test)]
pub(crate) use bidirectional_pipe::test_utils;
140 changes: 109 additions & 31 deletions common/src/pipe/bidirectional_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::pipe::ring::{RingData, RingHeader};
use crate::pipe::ring_consumer::RingConsumer;
use crate::pipe::ring_producer::RingProducer;
use crate::pipe::shared_memory_region::SharedMemoryRegion;
use crate::pipe::traits::{self, OwnedSplit};
use crate::pipe::traits::{self, DoorBell, OwnedSplit};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
Expand All @@ -22,24 +22,28 @@ pub enum Side {
/// whenever `R: Send`.
///
/// Memory layout: `[HeaderA][Ring A->B data][HeaderB][Ring B->A data]`.
pub struct BidirectionalPipe<R: SharedMemoryRegion> {
pub struct BidirectionalPipe<R: SharedMemoryRegion, D: DoorBell> {
writer: RingProducer<R>,
reader: RingConsumer<R>,
read_available_doorbell: D,
write_available_doorbell: D,
}

#[allow(unused)]
pub struct BidirectionalPipeWriteEnd<R: SharedMemoryRegion> {
pub struct BidirectionalPipeWriteEnd<R: SharedMemoryRegion, D: DoorBell> {
writer: RingProducer<R>,
read_available_doorbell: D,
}

#[allow(unused)]
pub struct BidirectionalPipeReadEnd<R: SharedMemoryRegion> {
pub struct BidirectionalPipeReadEnd<R: SharedMemoryRegion, D: DoorBell> {
reader: RingConsumer<R>,
write_available_doorbell: D,
}

const HEADER_SIZE: u64 = core::mem::size_of::<RingHeader>() as u64;

impl<R: SharedMemoryRegion> BidirectionalPipe<R> {
impl<R: SharedMemoryRegion, D: DoorBell> BidirectionalPipe<R, D> {
/// Total bytes of shared memory needed for a given `ring_size`.
pub const fn required_size(ring_size: u64) -> u64 {
2 * (HEADER_SIZE + ring_size)
Expand All @@ -51,7 +55,13 @@ impl<R: SharedMemoryRegion> BidirectionalPipe<R> {
/// each keep the mapping alive on their own. Caller must ensure the region
/// is zero-initialized before the first side is constructed, and that
/// exactly one `Side::A` and one `Side::B` are created per region.
pub fn new(region: R, ring_size: u64, side: Side) -> Self {
pub fn new(
region: R,
ring_size: u64,
side: Side,
read_available_doorbell: D,
write_available_doorbell: D,
) -> Self {
assert!(region.len() >= Self::required_size(ring_size));
assert!(
ring_size.is_multiple_of(core::mem::align_of::<RingHeader>() as u64),
Expand Down Expand Up @@ -82,7 +92,12 @@ impl<R: SharedMemoryRegion> BidirectionalPipe<R> {
let reader = RingConsumer::new(region, reader_header, unsafe {
RingData::new(reader_data, ring_size)
});
Self { writer, reader }
Self {
writer,
reader,
read_available_doorbell,
write_available_doorbell,
}
}

/// Consume the pipe and split it into independent, owned read and write
Expand All @@ -91,13 +106,20 @@ impl<R: SharedMemoryRegion> BidirectionalPipe<R> {
/// Each end already owns its own clone of the region handle, so the two can
/// be moved to separate threads / async tasks and each independently keeps
/// the shared mapping alive.
pub fn split(self) -> (BidirectionalPipeReadEnd<R>, BidirectionalPipeWriteEnd<R>) {
pub fn split(
self,
) -> (
BidirectionalPipeReadEnd<R, D>,
BidirectionalPipeWriteEnd<R, D>,
) {
(
BidirectionalPipeReadEnd {
reader: self.reader,
write_available_doorbell: self.write_available_doorbell,
},
BidirectionalPipeWriteEnd {
writer: self.writer,
read_available_doorbell: self.read_available_doorbell,
},
)
}
Expand Down Expand Up @@ -128,31 +150,49 @@ impl<R: SharedMemoryRegion> BidirectionalPipe<R> {
}
}

impl<R: SharedMemoryRegion> traits::Read for BidirectionalPipe<R> {
impl<R: SharedMemoryRegion, D: DoorBell> traits::Read for BidirectionalPipe<R, D> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, PipeError> {
self.reader.read(buf)
let res = self.reader.read(buf);
if let Ok(s) = res {
self.write_available_doorbell.ring();
if s > 0 {}
}
res
}
}

impl<R: SharedMemoryRegion> traits::Read for BidirectionalPipeReadEnd<R> {
impl<R: SharedMemoryRegion, D: DoorBell> traits::Read for BidirectionalPipeReadEnd<R, D> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, PipeError> {
self.reader.read(buf)
let res = self.reader.read(buf);
if let Ok(s) = res {
self.write_available_doorbell.ring();
if s > 0 {}
}
res
}
}

impl<R: SharedMemoryRegion> traits::Write for BidirectionalPipe<R> {
impl<R: SharedMemoryRegion, D: traits::DoorBell> traits::Write for BidirectionalPipe<R, D> {
fn write(&mut self, buf: &[u8]) -> Result<usize, PipeError> {
self.writer.write(buf)
let res = self.writer.write(buf);
if let Ok(s) = res {
if s > 0 {
self.read_available_doorbell.ring();
}
}
res
}
}

impl<R: SharedMemoryRegion> traits::Write for BidirectionalPipeWriteEnd<R> {
impl<R: SharedMemoryRegion, D: DoorBell> traits::Write for BidirectionalPipeWriteEnd<R, D> {
fn write(&mut self, buf: &[u8]) -> Result<usize, PipeError> {
self.writer.write(buf)
}
}

impl<R: SharedMemoryRegion + Send + 'static> OwnedSplit for BidirectionalPipe<R> {
impl<R: SharedMemoryRegion + Send + 'static, D: DoorBell + Send + 'static> OwnedSplit
for BidirectionalPipe<R, D>
{
fn split(
self,
) -> (
Expand All @@ -164,37 +204,75 @@ impl<R: SharedMemoryRegion + Send + 'static> OwnedSplit for BidirectionalPipe<R>
}

#[cfg(test)]
mod tests {
use super::*;
use crate::pipe::shared_memory_region::RawSharedMemoryRegion;
use crate::pipe::traits::{Read, Write};
pub(crate) mod test_utils {
use crate::pipe::DoorBell;
use core::sync::atomic::{AtomicUsize, Ordering};

#[repr(align(8))]
struct Aligned<const N: usize>([u8; N]);
pub struct TestDoorBell {
count: AtomicUsize,
}

impl DoorBell for TestDoorBell {
fn ring(&self) {
self.count.fetch_add(1, Ordering::Release);
}
}

impl TestDoorBell {
pub fn new() -> Self {
Self {
count: AtomicUsize::new(0),
}
}
}

#[macro_export]
macro_rules! pipe_pair {
($ring:expr, $mem:ident, $a:ident, $b:ident) => {
let mut $mem = Aligned(
[0u8; BidirectionalPipe::<RawSharedMemoryRegion>::required_size($ring as u64)
as usize],
[0u8; BidirectionalPipe::<RawSharedMemoryRegion, TestDoorBell>::required_size(
$ring as u64,
) as usize],
);
// A `RawSharedMemoryRegion` is a `Copy` handle, so each side gets its
// own owned clone pointing at the same backing bytes.
let region = unsafe {
RawSharedMemoryRegion::from_raw($mem.0.as_mut_ptr(), $mem.0.len() as u64)
};
// `mut` is needed by tests that read/write; split-only tests don't.
#[allow(unused_mut)]
let mut $a = BidirectionalPipe::new(region, $ring, Side::A);
let mut $a = BidirectionalPipe::new(
region,
$ring,
Side::A,
TestDoorBell::new(),
TestDoorBell::new(),
);
#[allow(unused_mut)]
let mut $b = BidirectionalPipe::new(region, $ring, Side::B);
let mut $b = BidirectionalPipe::new(
region,
$ring,
Side::B,
TestDoorBell::new(),
TestDoorBell::new(),
);
};
}

pub(crate) use pipe_pair;
}

#[cfg(test)]
mod tests {
use super::*;
use crate::pipe::traits::{Read, Write};
use crate::pipe::{RawSharedMemoryRegion, RingConsumer, RingProducer};
use test_utils::{pipe_pair, TestDoorBell};

#[repr(align(8))]
struct Aligned<const N: usize>([u8; N]);

#[test]
fn required_size_matches_layout() {
assert_eq!(
BidirectionalPipe::<RawSharedMemoryRegion>::required_size(64),
BidirectionalPipe::<RawSharedMemoryRegion, TestDoorBell>::required_size(64),
2 * (24 + 64)
);
}
Expand All @@ -220,7 +298,7 @@ mod tests {
// The whole point of dropping the lifetime: owned, `Send` endpoints
// that can move to other threads / async tasks.
fn assert_send<T: Send>() {}
assert_send::<BidirectionalPipe<RawSharedMemoryRegion>>();
assert_send::<BidirectionalPipe<RawSharedMemoryRegion, TestDoorBell>>();
assert_send::<RingConsumer<RawSharedMemoryRegion>>();
assert_send::<RingProducer<RawSharedMemoryRegion>>();
}
Expand Down
15 changes: 15 additions & 0 deletions common/src/pipe/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,18 @@ pub trait Write {
pub trait OwnedSplit {
fn split(self) -> (impl Read + Send + 'static, impl Write + Send + 'static);
}

/// Notify the waiter on newly available event (readable/writable)
///
/// ring blocks until it's possible to ring
pub trait DoorBell {
fn ring(&self);
}

/// Wait for an event.
///
/// DoorBellWaiter on the arca-side is handled by kthreads wfi
#[cfg(feature = "std")]
pub trait DoorBellWaiter {
fn wait(&mut self);
}
30 changes: 13 additions & 17 deletions common/src/sync_stream.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use crate::pipe::{BidirectionalPipe, PipeError, Read, SharedMemoryRegion, Write};
use crate::pipe::{BidirectionalPipe, DoorBell, PipeError, Read, SharedMemoryRegion, Write};

#[derive(Debug)]
pub enum StreamError {
WriteClosed,
}

pub struct SyncStream<R: SharedMemoryRegion> {
pipe: BidirectionalPipe<R>,
pub struct SyncStream<R: SharedMemoryRegion, D: DoorBell> {
/// BuddyAllocator offset of the SHM region backing this pipe.
pipe: BidirectionalPipe<R, D>,
}

impl<R: SharedMemoryRegion> SyncStream<R> {
pub fn from_pipe(pipe: BidirectionalPipe<R>) -> Self {
impl<R: SharedMemoryRegion, D: DoorBell> SyncStream<R, D> {
pub fn from_pipe(pipe: BidirectionalPipe<R, D>) -> Self {
Self { pipe }
}

Expand Down Expand Up @@ -53,7 +54,10 @@ impl<R: SharedMemoryRegion> SyncStream<R> {
}
}

fn read_exact<R: SharedMemoryRegion>(pipe: &mut BidirectionalPipe<R>, buf: &mut [u8]) -> usize {
fn read_exact<R: SharedMemoryRegion, D: DoorBell>(
pipe: &mut BidirectionalPipe<R, D>,
buf: &mut [u8],
) -> usize {
let mut filled = 0;
while filled < buf.len() {
match pipe.read(&mut buf[filled..]) {
Expand All @@ -72,24 +76,16 @@ fn read_exact<R: SharedMemoryRegion>(pipe: &mut BidirectionalPipe<R>, buf: &mut
#[cfg(test)]
mod tests {
use super::*;
use crate::pipe::test_utils::pipe_pair;
use crate::pipe::test_utils::TestDoorBell;
use crate::pipe::{BidirectionalPipe, RawSharedMemoryRegion, Side};

#[repr(align(8))]
struct Aligned<const N: usize>([u8; N]);

macro_rules! stream_pair {
($ring:expr, $mem:ident, $a:ident, $b:ident) => {
let mut $mem = Aligned(
[0u8; BidirectionalPipe::<RawSharedMemoryRegion>::required_size($ring as u64)
as usize],
);
// A `RawSharedMemoryRegion` is a `Copy` handle, so each side gets its
// own owned clone pointing at the same backing bytes.
let region = unsafe {
RawSharedMemoryRegion::from_raw($mem.0.as_mut_ptr(), $mem.0.len() as u64)
};
let pipe_a = BidirectionalPipe::new(region, $ring, Side::A);
let pipe_b = BidirectionalPipe::new(region, $ring, Side::B);
pipe_pair!($ring, $mem, pipe_a, pipe_b);
let mut $a = SyncStream::from_pipe(pipe_a);
let mut $b = SyncStream::from_pipe(pipe_b);
};
Expand Down
27 changes: 27 additions & 0 deletions kernel/src/doorbell.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#![allow(unused)]

use common::{
pipe::{DoorBell, PipeError},
BuddyAllocator,
};

pub struct VMToHostDoorBell {
addr: *mut u64,
datamatch: u64,
}

impl VMToHostDoorBell {
fn from_raw_parts(addr: u64, datamatch: u64) -> Self {
let addr: *const u64 = BuddyAllocator.from_offset(addr as usize);
let addr: *mut u64 = addr as *mut u64;
Self { addr, datamatch }
}
}

impl DoorBell for VMToHostDoorBell {
fn ring(&self) {
unsafe {
core::ptr::write_volatile(self.addr, self.datamatch);
}
}
}
1 change: 1 addition & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod tsc;
pub mod types;
pub mod vm;

mod doorbell;
mod gdt;
mod idt;
mod interrupts;
Expand Down
Loading
Loading