Skip to content

Commit 5332614

Browse files
committed
Implement Reduce operator and examples; enhance platformio.ini for new Scan example
1 parent eec6fca commit 5332614

File tree

7 files changed

+227
-11
lines changed

7 files changed

+227
-11
lines changed

examples/Reduce/Reduce.cpp

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,42 @@ using namespace Reactive;
1313
int values[] = { 0, 1, 4, 6, 2, 5, 7, 3, 5, 8 };
1414
int valuesLength = sizeof(values) / sizeof(values[0]);
1515

16+
// Reduce function: Start with a float value of 10.0 and add 1.5 times the integer value
17+
float accumulateValues(float acc, int value) {
18+
return acc + value * 1.5f;
19+
}
20+
21+
// Action to print the final reduced value
22+
void printFinalResult(float x) {
23+
Serial.print("Final reduced result: ");
24+
Serial.println(x);
25+
}
26+
27+
// Action when reduction is complete
28+
void printComplete() {
29+
Serial.println("Reduction complete!");
30+
}
31+
1632
void setup()
1733
{
1834
Serial.begin(115200);
1935
while (!Serial) delay(1);
36+
37+
Serial.println("=== Reduce Operator Example ===");
38+
Serial.println("Values: 0, 1, 4, 6, 2, 5, 7, 3, 5, 8");
39+
Serial.println("Formula: acc + value * 1.5, starting with 10.0");
40+
Serial.println();
2041
}
2142

22-
// Start with a float value of 10.0 (optional) and add 1.5 times the (integer) value
23-
2443
void loop()
2544
{
45+
Serial.println("Running reduction...");
46+
47+
// Using ReduceToFinal - only emits the final accumulated result
2648
FromArray(values, valuesLength)
27-
.Reduce<float>([](float acc, int value) {
28-
return acc + value * 1.5f;
29-
}, 10.0f)
30-
.DoAndFinally(
31-
[](float x) { Serial.println(x); },
32-
[]() { Serial.println("No more items"); }
33-
);
34-
35-
delay(2000);
49+
.ReduceToFinal<float>(accumulateValues, 10.0f)
50+
.DoAndFinally(printFinalResult, printComplete);
51+
52+
Serial.println();
53+
delay(3000);
3654
}

examples/Scan/Scan.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/***************************************************
2+
Copyright (c) 2019 Luis Llamas
3+
(www.luisllamas.es)
4+
5+
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License
8+
****************************************************/
9+
10+
#include "ReactiveArduinoLib.h"
11+
using namespace Reactive;
12+
13+
int values[] = { 1, 2, 3, 4, 5 };
14+
int valuesLength = sizeof(values) / sizeof(values[0]);
15+
16+
// Accumulator function for sum calculation
17+
int addValues(int acc, int value) {
18+
return acc + value;
19+
}
20+
21+
// Accumulator function for product calculation
22+
int multiplyValues(int acc, int value) {
23+
return acc * value;
24+
}
25+
26+
// Action to print intermediate scan results
27+
void printScanResult(int x) {
28+
Serial.print("Scan result: ");
29+
Serial.println(x);
30+
}
31+
32+
// Action when scan is complete
33+
void printScanComplete() {
34+
Serial.println("Scan complete!");
35+
Serial.println();
36+
}
37+
38+
// Example showing running average calculation
39+
float runningAverageValues[] = { 10.0, 20.0, 30.0, 40.0, 50.0 };
40+
int avgValuesLength = sizeof(runningAverageValues) / sizeof(runningAverageValues[0]);
41+
int valueCount = 0;
42+
43+
// Running average accumulator
44+
float calculateRunningAverage(float acc, float value) {
45+
valueCount++;
46+
return ((acc * (valueCount - 1)) + value) / valueCount;
47+
}
48+
49+
// Action to print running average
50+
void printRunningAverage(float x) {
51+
Serial.print("Running average: ");
52+
Serial.println(x, 2);
53+
}
54+
55+
// Action when running average is complete
56+
void printAverageComplete() {
57+
Serial.println("Running average complete!");
58+
Serial.println();
59+
valueCount = 0; // Reset for next iteration
60+
}
61+
62+
void setup()
63+
{
64+
Serial.begin(115200);
65+
while (!Serial) delay(1);
66+
67+
Serial.println("=== Scan Operator Example ===");
68+
Serial.println("Scan emits intermediate accumulated results");
69+
Serial.println("Unlike Reduce which only emits the final result");
70+
Serial.println();
71+
}
72+
73+
void loop()
74+
{
75+
Serial.println("Example 1: Running Sum with Scan");
76+
Serial.println("Values: 1, 2, 3, 4, 5 (starting with 0)");
77+
78+
// Scan emits: 1, 3, 6, 10, 15
79+
FromArray(values, valuesLength)
80+
.Scan<int>(0, addValues) // Start with 0, add each value
81+
.DoAndFinally(printScanResult, printScanComplete);
82+
83+
delay(1000);
84+
85+
Serial.println("Example 2: Running Product with Scan");
86+
Serial.println("Values: 1, 2, 3, 4, 5 (starting with 1)");
87+
88+
// Scan emits: 1, 2, 6, 24, 120
89+
FromArray(values, valuesLength)
90+
.Scan<int>(1, multiplyValues) // Start with 1, multiply each value
91+
.DoAndFinally(printScanResult, printScanComplete);
92+
93+
delay(1000);
94+
95+
Serial.println("Example 3: Running Average with Scan");
96+
Serial.println("Values: 10.0, 20.0, 30.0, 40.0, 50.0");
97+
98+
// Reset counter for this example
99+
valueCount = 0;
100+
101+
// Scan emits running averages: 10.0, 15.0, 20.0, 25.0, 30.0
102+
FromArray(runningAverageValues, avgValuesLength)
103+
.Scan<float>(0.0f, calculateRunningAverage)
104+
.DoAndFinally(printRunningAverage, printAverageComplete);
105+
106+
delay(3000);
107+
Serial.println("========================================");
108+
}

platformio.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ build_src_filter = +<../examples/Reduce/>
5353
[env:rotary_encoder]
5454
build_src_filter = +<../examples/RotaryEncoder/>
5555

56+
[env:scan]
57+
build_src_filter = +<../examples/Scan/>
58+
5659
[env:throttle]
5760
build_src_filter = +<../examples/Throttle/>
5861

src/Operators/OperatorReduce.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/***************************************************
2+
Copyright (c) 2019 Luis Llamas
3+
(www.luisllamas.es)
4+
5+
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License
8+
****************************************************/
9+
10+
#ifndef _REACTIVEOPERATORREDUCE_h
11+
#define _REACTIVEOPERATORREDUCE_h
12+
13+
template <typename Torig, typename Tdest>
14+
class OperatorReduce : public Operator<Torig, Tdest>
15+
{
16+
public:
17+
typedef Tdest(*ReactiveReduceFunction)(Tdest, Torig);
18+
19+
OperatorReduce(ReactiveReduceFunction function, Tdest initialValue);
20+
21+
void OnNext(Torig value) override;
22+
void OnComplete() override;
23+
void Reset() override;
24+
25+
private:
26+
ReactiveReduceFunction _function;
27+
Tdest _accumulator;
28+
Tdest _initialValue;
29+
bool _hasValue;
30+
};
31+
32+
template <typename Torig, typename Tdest>
33+
OperatorReduce<Torig, Tdest>::OperatorReduce(ReactiveReduceFunction function, Tdest initialValue)
34+
{
35+
_function = function;
36+
_initialValue = initialValue;
37+
_accumulator = initialValue;
38+
_hasValue = false;
39+
}
40+
41+
template <typename Torig, typename Tdest>
42+
void OperatorReduce<Torig, Tdest>::OnNext(Torig value)
43+
{
44+
// Accumulate the value but don't emit yet
45+
_accumulator = _function(_accumulator, value);
46+
_hasValue = true;
47+
}
48+
49+
template <typename Torig, typename Tdest>
50+
void OperatorReduce<Torig, Tdest>::OnComplete()
51+
{
52+
// Only emit the final accumulated result when the observable completes
53+
if (_hasValue || _accumulator != _initialValue)
54+
{
55+
this->_childObservers.OnNext(_accumulator);
56+
}
57+
58+
// Forward the completion signal
59+
Operator<Torig, Tdest>::OnComplete();
60+
}
61+
62+
template <typename Torig, typename Tdest>
63+
void OperatorReduce<Torig, Tdest>::Reset()
64+
{
65+
_accumulator = _initialValue;
66+
_hasValue = false;
67+
}
68+
69+
#endif

src/Operators/Operators.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,6 @@ Unless required by applicable law or agreed to in writing, software distributed
4747
#include "OperatorStartWith.h"
4848
#include "OperatorDebounce.h"
4949
#include "OperatorSelect.h"
50+
#include "OperatorReduce.h"
5051

5152
#endif

src/ReactiveArduinoCore.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ template <typename T> class OperatorWhere;
5757
template <typename T> class OperatorDistinct;
5858
template <typename T> class OperatorDistinctUntilChanged;
5959
template <typename Torig, typename Tdest> class OperatorSelect;
60+
template <typename Torig, typename Tdest> class OperatorReduce;
6061
template <typename T> class OperatorLast;
6162
template <typename T> class OperatorFirst;
6263
template <typename T> class OperatorTake;
@@ -201,6 +202,7 @@ class Observable : IObservable<T>, IResetable<T>
201202
template <class Tdest> OperatorSelect<T, Tdest>& SelectTo(ReactiveMap<T, Tdest> selector);
202203
template <class Tdest> TransformationMap<T, Tdest>& Map(ReactiveMap<T, Tdest> map);
203204
template <class Tdest> TransformationReduce<T, Tdest>& Reduce(ReactiveReduce<T, Tdest> function, Tdest init = Tdest());
205+
template <class Tdest> OperatorReduce<T, Tdest>& ReduceToFinal(ReactiveReduce<T, Tdest> function, Tdest init = Tdest());
204206
TransformationUpperLimit<T>& LimitUpper(T upperLimit);
205207
TransformationLowerLimit<T>& LimitLower(T lowerLimit);
206208
TransformationLimit<T>& Limit(T lowerLimit, T upperLimit);
@@ -528,6 +530,15 @@ auto Observable<T>::Reduce(ReactiveReduce<T, Tdest> function, Tdest init) -> Tra
528530
return *newOp;
529531
}
530532

533+
template <typename T>
534+
template <typename Tdest>
535+
auto Observable<T>::ReduceToFinal(ReactiveReduce<T, Tdest> function, Tdest init) -> OperatorReduce<T, Tdest>&
536+
{
537+
auto newOp = new OperatorReduce<T, Tdest>(function, init);
538+
Compound(*this, *newOp);
539+
return *newOp;
540+
}
541+
531542

532543
template <typename T>
533544
auto Observable<T>::LimitUpper(T upperLimit) -> TransformationUpperLimit<T>&

src/ReactiveArduinoLib.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,12 @@ namespace Reactive
373373
{
374374
return *(new OperatorRepeat<T>(N));
375375
}
376+
377+
template <typename Torig, typename Tdest>
378+
OperatorReduce<Torig, Tdest>& ReduceToFinal(typename OperatorReduce<Torig, Tdest>::ReactiveReduceFunction function, Tdest init)
379+
{
380+
return *(new OperatorReduce<Torig, Tdest>(function, init));
381+
}
376382
// #pragma endregion
377383

378384

0 commit comments

Comments
 (0)