Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions benches/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,24 @@ use futures_executor::block_on;
use futures_signals::signal::{channel, SignalExt};

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("channel", |b| b.iter(|| {
let (sender, receiver) = channel(black_box(0));
c.bench_function("channel", |b| {
b.iter(|| {
let (sender, receiver) = channel(black_box(0));

let handle = std::thread::spawn(move || {
block_on(receiver.for_each(|value| {
assert!(value >= 0);
async move {}
}));
});
let handle = std::thread::spawn(move || {
block_on(receiver.for_each(|value| {
assert!(value >= 0);
async move {}
}));
});

let _ = sender.send(black_box(1));
let _ = sender.send(black_box(1));

drop(sender);
drop(sender);

let _ = handle.join();
}));
let _ = handle.join();
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
7 changes: 3 additions & 4 deletions src/atomic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::atomic::{AtomicPtr, Ordering};


/*#[derive(Debug)]
pub(crate) struct Atomic<A> {
// TODO only box if the value is too big
Expand Down Expand Up @@ -47,7 +46,6 @@ impl<A> Drop for Atomic<A> {
}
}*/


/// The same as `Atomic<Option<A>>` except faster and uses less memory.
///
/// This is because it represents `None` as a null pointer, which avoids boxing.
Expand All @@ -68,7 +66,6 @@ impl<A> AtomicOption<A> {
fn from_ptr(ptr: *mut A) -> Option<A> {
if ptr.is_null() {
None

} else {
// This is safe because we only do this for pointers created with `Box::into_raw`
unsafe { Some(*Box::from_raw(ptr)) }
Expand Down Expand Up @@ -106,7 +103,9 @@ impl<A> Drop for AtomicOption<A> {

if !ptr.is_null() {
// This is safe because we only do this for pointers created with `Box::into_raw`
unsafe { drop(Box::from_raw(ptr)); }
unsafe {
drop(Box::from_raw(ptr));
}
}
}
}
41 changes: 24 additions & 17 deletions src/future.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use std::pin::Pin;
// TODO use parking_lot ?
use std::sync::{Arc, Weak, Mutex};
use std::future::Future;
use std::task::{Poll, Waker, Context};
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll, Waker};
// TODO use parking_lot ?
use std::sync::atomic::{AtomicBool, Ordering};
use discard::{Discard, DiscardOnDrop};
use pin_project::pin_project;

use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug)]
struct CancelableFutureState {
is_cancelled: AtomicBool,
waker: Mutex<Option<Waker>>,
}


#[derive(Debug)]
pub struct CancelableFutureHandle {
state: Weak<CancelableFutureState>,
Expand All @@ -37,7 +35,6 @@ impl Discard for CancelableFutureHandle {
}
}


#[pin_project(project = CancelableFutureProj)]
#[derive(Debug)]
#[must_use = "Futures do nothing unless polled"]
Expand All @@ -49,14 +46,19 @@ pub struct CancelableFuture<A, B> {
}

impl<A, B> Future for CancelableFuture<A, B>
where A: Future,
B: FnOnce() -> A::Output {

where
A: Future,
B: FnOnce() -> A::Output,
{
type Output = A::Output;

// TODO should this inline ?
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let CancelableFutureProj { state, mut future, when_cancelled } = self.project();
let CancelableFutureProj {
state,
mut future,
when_cancelled,
} = self.project();

// TODO is this correct ?
if state.is_cancelled.load(Ordering::SeqCst) {
Expand All @@ -65,27 +67,32 @@ impl<A, B> Future for CancelableFuture<A, B>
let callback = when_cancelled.take().unwrap();
// TODO figure out how to call the callback immediately when discard is called, e.g. using two Arc<Mutex<>>
Poll::Ready(callback())

} else {
match future.as_pin_mut().unwrap().poll(cx) {
Poll::Pending => {
// TODO is this correct ?
*state.waker.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending
},
}
a => a,
}
}
}
}


// TODO figure out a more efficient way to implement this
// TODO replace with futures_util::abortable ?
pub fn cancelable_future<A, B>(future: A, when_cancelled: B) -> (DiscardOnDrop<CancelableFutureHandle>, CancelableFuture<A, B>)
where A: Future,
B: FnOnce() -> A::Output {

pub fn cancelable_future<A, B>(
future: A,
when_cancelled: B,
) -> (
DiscardOnDrop<CancelableFutureHandle>,
CancelableFuture<A, B>,
)
where
A: Future,
B: FnOnce() -> A::Output,
{
let state = Arc::new(CancelableFutureState {
is_cancelled: AtomicBool::new(false),
waker: Mutex::new(None),
Expand Down
Loading