Skip to content

Commit 23f8ac0

Browse files
committed
refactor buffered + fix size_hint for TestSource
add tests for buffered, fix buffered `parameters_changed`
1 parent 8b0b1cc commit 23f8ac0

File tree

5 files changed

+245
-133
lines changed

5 files changed

+245
-133
lines changed

src/source/buffered.rs

Lines changed: 124 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,137 +1,45 @@
1+
/// A iterator that stores extracted data in memory while allowing
2+
/// concurrent reading in real time.
13
use std::mem;
24
use std::sync::{Arc, Mutex};
35
use std::time::Duration;
46

57
use super::SeekError;
68
use crate::common::{ChannelCount, SampleRate};
9+
use crate::math::PrevMultipleOf;
710
use crate::Source;
811

9-
/// Internal function that builds a `Buffered` object.
10-
#[inline]
11-
pub fn buffered<I>(input: I) -> Buffered<I>
12-
where
13-
I: Source,
14-
{
15-
let total_duration = input.total_duration();
16-
let first_span = extract(input);
17-
18-
Buffered {
19-
current_span: first_span,
20-
position_in_span: 0,
21-
total_duration,
22-
}
23-
}
24-
25-
/// Iterator that at the same time extracts data from the iterator and stores it in a buffer.
12+
/// Iterator that at the same time extracts data from the iterator and
13+
/// stores it in a buffer.
2614
pub struct Buffered<I>
2715
where
2816
I: Source,
2917
{
3018
/// Immutable reference to the next span of data. Cannot be `Span::Input`.
3119
current_span: Arc<Span<I>>,
3220

21+
parameters_changed: bool,
22+
3323
/// The position in number of samples of this iterator inside `current_span`.
3424
position_in_span: usize,
3525

3626
/// Obtained once at creation and never modified again.
3727
total_duration: Option<Duration>,
3828
}
3929

40-
enum Span<I>
41-
where
42-
I: Source,
43-
{
44-
/// Data that has already been extracted from the iterator. Also contains a pointer to the
45-
/// next span.
46-
Data(SpanData<I>),
47-
48-
/// No more data.
49-
End,
50-
51-
/// Unextracted data. The `Option` should never be `None` and is only here for easier data
52-
/// processing.
53-
Input(Mutex<Option<I>>),
54-
}
55-
56-
struct SpanData<I>
57-
where
58-
I: Source,
59-
{
60-
data: Vec<I::Item>,
61-
channels: ChannelCount,
62-
rate: SampleRate,
63-
next: Mutex<Arc<Span<I>>>,
64-
}
65-
66-
impl<I> Drop for SpanData<I>
67-
where
68-
I: Source,
69-
{
70-
fn drop(&mut self) {
71-
// This is necessary to prevent stack overflows deallocating long chains of the mutually
72-
// recursive `Span` and `SpanData` types. This iteratively traverses as much of the
73-
// chain as needs to be deallocated, and repeatedly "pops" the head off the list. This
74-
// solves the problem, as when the time comes to actually deallocate the `SpanData`,
75-
// the `next` field will contain a `Span::End`, or an `Arc` with additional references,
76-
// so the depth of recursive drops will be bounded.
77-
while let Ok(arc_next) = self.next.get_mut() {
78-
if let Some(next_ref) = Arc::get_mut(arc_next) {
79-
// This allows us to own the next Span.
80-
let next = mem::replace(next_ref, Span::End);
81-
if let Span::Data(next_data) = next {
82-
// Swap the current SpanData with the next one, allowing the current one
83-
// to go out of scope.
84-
*self = next_data;
85-
} else {
86-
break;
87-
}
88-
} else {
89-
break;
90-
}
91-
}
92-
}
93-
}
94-
95-
/// Builds a span from the input iterator.
96-
fn extract<I>(mut input: I) -> Arc<Span<I>>
97-
where
98-
I: Source,
99-
{
100-
if input.parameters_changed() {
101-
return Arc::new(Span::End);
102-
}
103-
104-
let channels = input.channels();
105-
let rate = input.sample_rate();
30+
impl<I: Source> Buffered<I> {
31+
pub(crate) fn new(input: I) -> Buffered<I> {
32+
let total_duration = input.total_duration();
33+
let first_span = extract(input);
10634

107-
let mut data = Vec::new();
108-
loop {
109-
let Some(element) = input.next() else { break };
110-
data.push(element);
111-
if input.parameters_changed() {
112-
break;
113-
}
114-
if data.len() > 32768 {
115-
break;
35+
Buffered {
36+
current_span: first_span,
37+
position_in_span: 0,
38+
total_duration,
39+
parameters_changed: false,
11640
}
11741
}
11842

119-
if data.is_empty() {
120-
return Arc::new(Span::End);
121-
}
122-
123-
Arc::new(Span::Data(SpanData {
124-
data,
125-
channels,
126-
rate,
127-
next: Mutex::new(Arc::new(Span::Input(Mutex::new(Some(input))))),
128-
}))
129-
}
130-
131-
impl<I> Buffered<I>
132-
where
133-
I: Source,
134-
{
13543
/// Advances to the next span.
13644
fn next_span(&mut self) {
13745
let next_span = {
@@ -145,6 +53,7 @@ where
14553
Span::End => next_span_ptr.clone(),
14654
Span::Input(input) => {
14755
let input = input.lock().unwrap().take().unwrap();
56+
dbg!();
14857
extract(input)
14958
}
15059
};
@@ -169,7 +78,7 @@ where
16978
let current_sample;
17079
let advance_span;
17180

172-
match &*self.current_span {
81+
match dbg!(&*self.current_span) {
17382
Span::Data(SpanData { data, .. }) => {
17483
current_sample = Some(data[self.position_in_span]);
17584
self.position_in_span += 1;
@@ -185,39 +94,31 @@ where
18594
};
18695

18796
if advance_span {
97+
dbg!();
98+
self.parameters_changed = true;
18899
self.next_span();
100+
} else {
101+
self.parameters_changed = false;
189102
}
190103

191104
current_sample
192105
}
193-
194-
#[inline]
195-
fn size_hint(&self) -> (usize, Option<usize>) {
196-
// TODO:
197-
(0, None)
198-
}
199106
}
200107

201-
// TODO: implement exactsize iterator when size_hint is done
202-
203108
impl<I> Source for Buffered<I>
204109
where
205110
I: Source,
206111
{
207112
#[inline]
208113
fn parameters_changed(&self) -> bool {
209-
match &*self.current_span {
210-
Span::Data(_) => false,
211-
Span::End => true,
212-
Span::Input(_) => unreachable!(),
213-
}
114+
self.parameters_changed
214115
}
215116

216117
#[inline]
217118
fn channels(&self) -> ChannelCount {
218119
match *self.current_span {
219120
Span::Data(SpanData { channels, .. }) => channels,
220-
Span::End => 1,
121+
Span::End => 0,
221122
Span::Input(_) => unreachable!(),
222123
}
223124
}
@@ -226,7 +127,7 @@ where
226127
fn sample_rate(&self) -> SampleRate {
227128
match *self.current_span {
228129
Span::Data(SpanData { rate, .. }) => rate,
229-
Span::End => 44100,
130+
Span::End => 0,
230131
Span::Input(_) => unreachable!(),
231132
}
232133
}
@@ -256,6 +157,105 @@ where
256157
current_span: self.current_span.clone(),
257158
position_in_span: self.position_in_span,
258159
total_duration: self.total_duration,
160+
parameters_changed: self.parameters_changed,
161+
}
162+
}
163+
}
164+
165+
enum Span<I>
166+
where
167+
I: Source,
168+
{
169+
/// Data that has already been extracted from the iterator.
170+
/// Also contains a pointer to the next span.
171+
Data(SpanData<I>),
172+
173+
/// No more data.
174+
End,
175+
176+
/// Unextracted data. The `Option` should never be `None` and is only here for easier data
177+
/// processing.
178+
Input(Mutex<Option<I>>),
179+
}
180+
181+
impl<I: Source> std::fmt::Debug for Span<I> {
182+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183+
match self {
184+
Span::Data(_) => f.write_str("Span::Data"),
185+
Span::End => f.write_str("Span::End"),
186+
Span::Input(_) => f.write_str("Span::Input"),
259187
}
260188
}
261189
}
190+
191+
struct SpanData<I>
192+
where
193+
I: Source,
194+
{
195+
data: Vec<I::Item>,
196+
channels: ChannelCount,
197+
rate: SampleRate,
198+
next: Mutex<Arc<Span<I>>>,
199+
}
200+
201+
impl<I> Drop for SpanData<I>
202+
where
203+
I: Source,
204+
{
205+
fn drop(&mut self) {
206+
// This is necessary to prevent stack overflows deallocating long chains of the mutually
207+
// recursive `Span` and `SpanData` types. This iteratively traverses as much of the
208+
// chain as needs to be deallocated, and repeatedly "pops" the head off the list. This
209+
// solves the problem, as when the time comes to actually deallocate the `SpanData`,
210+
// the `next` field will contain a `Span::End`, or an `Arc` with additional references,
211+
// so the depth of recursive drops will be bounded.
212+
while let Ok(arc_next) = self.next.get_mut() {
213+
if let Some(next_ref) = Arc::get_mut(arc_next) {
214+
// This allows us to own the next Span.
215+
let next = mem::replace(next_ref, Span::End);
216+
if let Span::Data(next_data) = next {
217+
// Swap the current SpanData with the next one, allowing the current one
218+
// to go out of scope.
219+
*self = next_data;
220+
} else {
221+
break;
222+
}
223+
} else {
224+
break;
225+
}
226+
}
227+
}
228+
}
229+
230+
/// Builds a span from the input iterator.
231+
fn extract<I>(mut input: I) -> Arc<Span<I>>
232+
where
233+
I: Source,
234+
{
235+
let channels = input.channels();
236+
let rate = input.sample_rate();
237+
238+
let mut data = Vec::new();
239+
loop {
240+
let Some(sample) = input.next() else { break };
241+
data.push(sample);
242+
dbg!(sample);
243+
if input.parameters_changed() {
244+
break;
245+
}
246+
if data.len() > 32768.prev_multiple_of(channels) {
247+
break;
248+
}
249+
}
250+
251+
if dbg!(data.is_empty()) {
252+
return Arc::new(Span::End);
253+
}
254+
255+
Arc::new(Span::Data(SpanData {
256+
data,
257+
channels,
258+
rate,
259+
next: Mutex::new(Arc::new(Span::Input(Mutex::new(Some(input))))),
260+
}))
261+
}

src/source/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ pub trait Source: Iterator<Item = Sample> {
179179
where
180180
Self: Sized,
181181
{
182-
buffered::buffered(self)
182+
Buffered::new(self)
183183
}
184184

185185
/// Mixes this source with another one.

0 commit comments

Comments
 (0)