Revert to declarative macros and single crate structure

main
Adrian Alic 2 weeks ago
parent 25bc304c6d
commit 5f6fe6e138
  1. 4
      Cargo.lock
  2. 8
      Cargo.toml
  3. 2
      benches/alignment.rs
  4. 22
      src/bin/main.rs
  5. 193
      src/lib.rs
  6. 8
      wfmpsc-proc/Cargo.toml
  7. 14
      wfmpsc-proc/src/lib.rs

4
Cargo.lock generated

@ -14,7 +14,3 @@ version = "0.1.0"
dependencies = [
"paste",
]
[[package]]
name = "wfmpsc-proc"
version = "0.1.0"

@ -7,15 +7,15 @@ description = "Wait-free, cache-aware MPSC queue for serial output"
autotests = false
edition = "2021"
[workspace]
members = ["wfmpsc-proc"]
[dependencies]
paste = "1.0.9"
[lib]
doctest = false
[[bin]]
name = "main"
doctest = false
[[bench]]
name = "alignment"

@ -12,7 +12,7 @@ extern crate test;
#[cfg(test)]
mod _t {
use test::{black_box, Bencher};
use wfmpsc::*;
use wfmpsc::{mpscq, TLQ};
#[bench]
fn eval_checked_fill(b: &mut Bencher) {

@ -0,0 +1,22 @@
use wfmpsc;
fn main() {
let mpscq = wfmpsc::mpscq!(
bitsize: 16,
producers: 9,
l1_cache: 128
);
mpscq.get_producer_handle(8);
// let alloc = nostd::FixedAllocStub::<0x2ffff, 0x20000> {};
// let no_std_queue = mpscq_alloc!(
// bitsize: 16,
// producers: 9,
// l1_cache: 128,
// allocator: alloc
// );
//
// eprintln!("0x{:x}", no_std_queue as *const _ as usize);
}

@ -7,21 +7,6 @@
#![feature(allocator_api)]
use core::fmt::Debug;
use core::hash::Hash;
/// This trait allows you to create thread-local queue handles, as well as
/// consumer handles.
pub trait MpscqUnsafe<const T: usize, const C: usize, const S: usize, const L: usize> {
/// Returns the head of queue `qid`. Every thread should know their
/// own qid.
fn get_producer_handle(&self, qid: u8) -> TLQ<C, L>;
/// Returns the consumer handle. This is the main interface from which the
/// consumer reads data and clears the thread-local queues.
/// NOTE: Only one single thread is allowed to use the consumer handle!
/// This is a single-consumer(!) queue.
fn get_consumer_handle(&self) -> ConsumerHandle<T, C, L>;
}
#[derive(Debug)]
pub struct ConsumerHandle<const T: usize, const C: usize, const L: usize> {}
@ -51,12 +36,22 @@ unsafe impl<const C: usize, const S: usize> Send for TLQ<C, S> {}
#[derive(Debug)] // TODO: Add custom debug implementation!
pub struct ThreadLocalBuffer<const L: usize>(*mut [u8; L]);
impl<const L: usize> ThreadLocalBuffer<L> {
pub fn new(ptr: *mut [u8; L]) -> Self {
Self { 0: ptr }
}
}
/// A tail that refers to the queue of a single, specific thread-local queue.
/// The tail may only be modified safely by the consumer!
#[derive(Debug)]
pub struct ThreadLocalTail<const C: usize>(*const u32);
impl<const C: usize> ThreadLocalTail<C> {}
impl<const C: usize> ThreadLocalTail<C> {
pub fn new(ptr: *const u32) -> Self {
Self { 0: ptr }
}
}
/// A head that may only be modified by exactly one thread! Also:
/// the head references exactly 2^C element, which means the queue's
@ -64,6 +59,9 @@ impl<const C: usize> ThreadLocalTail<C> {}
#[derive(Debug)]
pub struct ThreadLocalHead<const C: usize>(*mut u32);
impl<const C: usize> ThreadLocalHead<C> {
pub fn new(ptr: *mut u32) -> Self {
Self { 0: ptr }
}
/// Increments the head pointer, thereby committing the written
/// bytes to the consumer
#[inline]
@ -114,25 +112,24 @@ macro_rules! create_aligned {
impl<'a,
const T: usize, const C: usize, const S: usize, const L: usize>
MpscqUnsafe<T, C, S, L>
for [<__MPSCQ $ALIGN>]<T, C, S, L> {
[<__MPSCQ $ALIGN>]<T, C, S, L> {
/// Returns a TLQ handle. The only threads that are allowed to modify
/// the data behind this TLQ are the consumer thread and the producer
/// thread with the given qid. Any other access is unsafe and may
/// lead to critical failure!
fn get_producer_handle(&self, qid: u8) -> TLQ<C, L> {
pub fn get_producer_handle(&self, qid: u8) -> TLQ<C, L> {
assert!((qid as usize) < T);
TLQ::<C, L> {
tail: ThreadLocalTail(&self.tails.0[qid as usize] as *const u32),
head: ThreadLocalHead(&self.heads[qid as usize].0 as *const u32 as *mut u32),
buffer: ThreadLocalBuffer::<L>(
tail: ThreadLocalTail::new(&self.tails.0[qid as usize] as *const u32),
head: ThreadLocalHead::new(&self.heads[qid as usize].0 as *const u32 as *mut u32),
buffer: ThreadLocalBuffer::<L>::new(
//
(&self.buffer as *const u8 as usize
+ {qid as usize * {1 << C}}) as *mut [u8; L]
),
}
}
fn get_consumer_handle(&self) -> ConsumerHandle<T, C, L> {
pub fn get_consumer_handle(&self) -> ConsumerHandle<T, C, L> {
ConsumerHandle::<T, C, L> {
}
@ -152,113 +149,83 @@ macro_rules! create_aligned {
)*)
}
// Create structs for the following L1 cache alignments:
// Create types for common cache alignments
create_aligned! {32, 64, 128}
// pub static queue: __MPSCQ64<9, 16, 589824, 65536> = __MPSCQ64::<9, 16, 589824, 65536> {
// tails: __Tails64([0u32; 9]),
// heads: [__Head64(0); 9],
// buffer: &[0u8;589824] as *const [u8] as *mut [u8;589824],
// dealloc: |_, _, _| {},
// };
unsafe impl<const T: usize, const C: usize, const S: usize, const L: usize> Sync
for __MPSCQ64<T, C, S, L>
{
}
/// Allocate an mpscq with the default system allocator.
#[macro_export]
macro_rules! mpscq {
(
bitsize: $capacity:expr,
producers: $producers:expr,
l1_cache: $l1_cache:expr
) => {{
let alloc = std::alloc::System {};
mpscq_alloc!(
bitsize: $capacity,
producers: $producers,
l1_cache: $l1_cache,
allocator: alloc
)
}};
}
/// This module contains all #[no_std] compatible code.
pub mod nostd {
use core::{
alloc::{AllocError, Allocator, Layout},
ptr::NonNull,
};
/// Creates a MPSC queue with a custom allocator. The allocator must implement
/// the core::alloc::Allocator interface. If you want to use the default system
/// allocator, use [`mpscq!`].
#[macro_export]
macro_rules! mpscq_alloc {
(
bitsize: $b:expr,
producers: $p:expr,
l1_cache: $l1:expr,
allocator: $alloc:expr
l1_cache: $l1:expr
) => {{
if $p * 4 > $l1 {
panic!("Too many producers! Maximum is L1_CACHE / 4. TODO");
}
use core::alloc::{Allocator, Layout};
let size = $l1 + ($p * $l1) + $p * (1 << $b);
let align = 1 << $b;
let layout = Layout::from_size_align(size, align).unwrap();
let queue = paste::paste! {
$alloc.allocate(layout).unwrap().as_ptr() as *mut
[<__MPSCQ $l1>]::<$p, $b,
if $p * 4 > $l1 {
panic!("Too many producers! Maximum is L1_CACHE / 4. TODO");
}
use core::alloc::Layout;
let size = $l1 + ($p * $l1) + $p * (1 << $b);
let align = 1 << $b;
let layout = Layout::from_size_align(size, align).unwrap();
let queue = unsafe {
paste::paste! {
std::alloc::alloc(layout) as *mut
wfmpsc::[<__MPSCQ $l1>]::<$p, $b,
// S = T * 2^C is the global buffer size
{$p * (1 << $b)},
// L = 2^C is the thread-local capacity
{1 << $b}>
};
unsafe { queue.as_ref().unwrap() }
}};
}
/// A stub allocator that always returns them same given memory region.
/// The region is given by START and SIZE parameter (address and length
/// of the memory region)
pub struct FixedAllocStub<const START: usize, const SIZE: usize>;
unsafe impl<const START: usize, const SIZE: usize> Allocator for FixedAllocStub<START, SIZE> {
fn allocate(&self, _: Layout) -> Result<NonNull<[u8]>, AllocError> {
match NonNull::new(core::ptr::slice_from_raw_parts_mut(START as *mut u8, SIZE)) {
None => Err(AllocError {}),
Some(ptr) => Ok(ptr),
}
};
unsafe { queue.as_ref().unwrap() }
}};
}
/// Creates a MPSC queue with a custom allocator. The allocator must implement
/// the core::alloc::Allocator interface. If you want to use the default system
/// allocator, use [`mpscq!`].
#[macro_export]
macro_rules! mpscq_alloc {
(
bitsize: $b:expr,
producers: $p:expr,
l1_cache: $l1:expr,
allocator: $alloc:expr
) => {{
if $p * 4 > $l1 {
panic!("Too many producers! Maximum is L1_CACHE / 4. TODO");
}
unsafe fn deallocate(&self, _: NonNull<u8>, _: Layout) {}
}
use core::alloc::{Allocator, Layout};
let size = $l1 + ($p * $l1) + $p * (1 << $b);
let align = 1 << $b;
let layout = Layout::from_size_align(size, align).unwrap();
let queue = wfmpsc::paste! {
$alloc.allocate(layout).unwrap().as_ptr() as *mut
wfmpsc::[<__MPSCQ $l1>]::<$p, $b,
// S = T * 2^C is the global buffer size
{$p * (1 << $b)},
// L = 2^C is the thread-local capacity
{1 << $b}>
};
unsafe { queue.as_ref().unwrap() }
}};
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_dev() {
let mpscq = mpscq!(
bitsize: 16,
producers: 9,
l1_cache: 128
);
let alloc = nostd::FixedAllocStub::<0x2ffff, 0x20000> {};
let no_std_queue = mpscq_alloc!(
bitsize: 16,
producers: 9,
l1_cache: 128,
allocator: alloc
);
eprintln!("0x{:x}", no_std_queue as *const _ as usize);
use core::{
alloc::{AllocError, Allocator, Layout},
ptr::NonNull,
};
/// A stub allocator that always returns them same given memory region.
/// The region is given by START and SIZE parameter (address and length
/// of the memory region)
pub struct FixedAllocStub<const START: usize, const SIZE: usize>;
unsafe impl<const START: usize, const SIZE: usize> Allocator for FixedAllocStub<START, SIZE> {
fn allocate(&self, _: Layout) -> Result<NonNull<[u8]>, AllocError> {
match NonNull::new(core::ptr::slice_from_raw_parts_mut(START as *mut u8, SIZE)) {
None => Err(AllocError {}),
Some(ptr) => Ok(ptr),
}
}
unsafe fn deallocate(&self, _: NonNull<u8>, _: Layout) {}
}

@ -1,8 +0,0 @@
[package]
name = "wfmpsc-proc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

@ -1,14 +0,0 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}
Loading…
Cancel
Save