11#include " sweep.h"
22#include " protocol.h"
3+ #include " queue.h"
34#include " serial.h"
45
5- #include < atomic>
66#include < chrono>
7- #include < condition_variable>
8- #include < mutex>
9- #include < queue>
107#include < thread>
118
12- // A threadsafe-queue to store and retrieve scans
13- class ScanQueue {
14- public:
15- ScanQueue (int32_t max) : max_size(max), the_queue(), the_mutex(), the_cond_var() {}
16-
17- // Empty the queue
18- void flush () {
19- std::unique_lock<std::mutex> lock (the_mutex);
20- while (!the_queue.empty ()) {
21- the_queue.pop ();
22- }
23- }
24-
25- // Add an element to the queue.
26- void enqueue (sweep_scan_s scan) {
27- std::lock_guard<std::mutex> lock (the_mutex);
28-
29- // if necessary, remove the oldest scan to make room for new
30- if (static_cast <int32_t >(the_queue.size ()) >= max_size)
31- the_queue.pop ();
32-
33- the_queue.push (scan);
34- the_cond_var.notify_one ();
35- }
36-
37- // If the queue is empty, wait till an element is avaiable.
38- sweep_scan_s dequeue () {
39- std::unique_lock<std::mutex> lock (the_mutex);
40- // wait until queue is not empty
41- while (the_queue.empty ()) {
42- // the_cond_var could wake up the thread spontaneously, even if the queue is still empty...
43- // so put this wakeup inside a while loop, such that the empty check is performed whenever it wakes up
44- the_cond_var.wait (lock); // release lock as long as the wait and reaquire it afterwards.
45- }
46- sweep_scan_s scan = the_queue.front ();
47- the_queue.pop ();
48- return scan;
49- }
50-
51- private:
52- int32_t max_size;
53- std::queue<sweep_scan_s> the_queue;
54- mutable std::mutex the_mutex;
55- std::condition_variable the_cond_var;
56- };
57-
589int32_t sweep_get_version (void ) { return SWEEP_VERSION; }
5910bool sweep_is_abi_compatible (void ) { return sweep_get_version () >> 16u == SWEEP_VERSION_MAJOR; }
6011
@@ -65,7 +16,7 @@ typedef struct sweep_error {
6516typedef struct sweep_device {
6617 sweep::serial::device_s serial; // serial port communication
6718 bool is_scanning;
68- std::unique_ptr<ScanQueue > scan_queue;
19+ sweep::queue::queue<sweep_scan_s > scan_queue;
6920 std::atomic<bool > stop_thread;
7021} sweep_device;
7122
@@ -119,8 +70,7 @@ sweep_device_s sweep_device_construct(const char* port, int32_t bitrate, sweep_e
11970 }
12071
12172 // initialize assuming the device is scanning
122- auto out =
123- new sweep_device{serial, /* is_scanning=*/ true , std::unique_ptr<ScanQueue>(new ScanQueue (20 )), /* stop_thread=*/ {false }};
73+ auto out = new sweep_device{serial, /* is_scanning=*/ true , /* scan_queue=*/ {20 }, /* stop_thread=*/ {false }};
12474
12575 // send a stop scanning command in case the scanner was powered on and scanning
12676 sweep_error_s stoperror = nullptr ;
@@ -192,7 +142,7 @@ void sweep_device_start_scanning(sweep_device_s device, sweep_error_s* error) {
192142 }
193143
194144 // Start SCAN WORKER
195- device->scan_queue -> flush ();
145+ device->scan_queue . clear ();
196146 device->is_scanning = true ;
197147 // START background worker thread
198148 device->stop_thread = false ;
@@ -287,7 +237,7 @@ sweep_scan_s sweep_device_get_scan(sweep_device_s device, sweep_error_s* error)
287237 SWEEP_ASSERT (device->is_scanning );
288238 (void )error;
289239
290- auto out = device->scan_queue -> dequeue ();
240+ auto out = device->scan_queue . dequeue ();
291241 return out;
292242}
293243
@@ -329,7 +279,7 @@ void sweep_device_accumulate_scans(sweep_device_s device) {
329279 }
330280
331281 // place the scan in the queue
332- device->scan_queue -> enqueue (out);
282+ device->scan_queue . enqueue (out);
333283
334284 // place the sync reading at the start for the next scan
335285 responses[0 ] = responses[received - 1 ];
0 commit comments