Skip to content

Commit 1c13b2c

Browse files
committed
Merge pull request #275 from xlz/transfer-pool
Fix transfer pool thread safety
2 parents 78ea54f + 229ee33 commit 1c13b2c

File tree

3 files changed

+73
-45
lines changed

3 files changed

+73
-45
lines changed

examples/protonect/include/libfreenect2/usb/transfer_pool.h

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@
2727
#ifndef TRANSFER_POOL_H_
2828
#define TRANSFER_POOL_H_
2929

30-
#include <deque>
30+
#include <vector>
3131
#include <libusb.h>
3232

3333
#include <libfreenect2/data_callback.h>
34+
#include <libfreenect2/threading.h>
3435

3536
namespace libfreenect2
3637
{
@@ -56,6 +57,26 @@ class TransferPool
5657

5758
void setCallback(DataCallback *callback);
5859
protected:
60+
libfreenect2::mutex stopped_mutex;
61+
struct Transfer
62+
{
63+
libusb_transfer *transfer;
64+
TransferPool *pool;
65+
bool stopped;
66+
Transfer(libusb_transfer *transfer, TransferPool *pool):
67+
transfer(transfer), pool(pool), stopped(true) {}
68+
void setStopped(bool value)
69+
{
70+
libfreenect2::lock_guard guard(pool->stopped_mutex);
71+
stopped = value;
72+
}
73+
bool getStopped()
74+
{
75+
libfreenect2::lock_guard guard(pool->stopped_mutex);
76+
return stopped;
77+
}
78+
};
79+
5980
void allocateTransfers(size_t num_transfers, size_t transfer_size);
6081

6182
virtual libusb_transfer *allocateTransfer() = 0;
@@ -65,20 +86,20 @@ class TransferPool
6586

6687
DataCallback *callback_;
6788
private:
68-
typedef std::deque<libusb_transfer *> TransferQueue;
89+
typedef std::vector<Transfer> TransferQueue;
6990

7091
libusb_device_handle *device_handle_;
7192
unsigned char device_endpoint_;
7293

73-
TransferQueue idle_transfers_, pending_transfers_;
94+
TransferQueue transfers_;
7495
unsigned char *buffer_;
7596
size_t buffer_size_;
7697

7798
bool enable_submit_;
7899

79100
static void onTransferCompleteStatic(libusb_transfer *transfer);
80101

81-
void onTransferComplete(libusb_transfer *transfer);
102+
void onTransferComplete(Transfer *transfer);
82103
};
83104

84105
class BulkTransferPool : public TransferPool

examples/protonect/src/libfreenect2.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,6 @@ void Freenect2DeviceImpl::stop()
566566
rgb_transfer_pool_.cancel();
567567
ir_transfer_pool_.cancel();
568568

569-
// wait for completion of transfer cancelation
570-
// TODO: better implementation
571-
libfreenect2::this_thread::sleep_for(libfreenect2::chrono::milliseconds(1500));
572-
573569
usb_control_.setIrInterfaceState(UsbControl::Disabled);
574570

575571
CommandTransaction::Result result;

examples/protonect/src/transfer_pool.cpp

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
#include <libfreenect2/usb/transfer_pool.h>
2828
#include <iostream>
29-
#include <algorithm>
3029

3130
namespace libfreenect2
3231
{
@@ -60,11 +59,11 @@ void TransferPool::disableSubmission()
6059

6160
void TransferPool::deallocate()
6261
{
63-
for(TransferQueue::iterator it = idle_transfers_.begin(); it != idle_transfers_.end(); ++it)
62+
for(TransferQueue::iterator it = transfers_.begin(); it != transfers_.end(); ++it)
6463
{
65-
libusb_free_transfer(*it);
64+
libusb_free_transfer(it->transfer);
6665
}
67-
idle_transfers_.clear();
66+
transfers_.clear();
6867

6968
if(buffer_ != 0)
7069
{
@@ -82,44 +81,49 @@ void TransferPool::submit(size_t num_parallel_transfers)
8281
return;
8382
}
8483

85-
if(idle_transfers_.size() < num_parallel_transfers)
84+
if(transfers_.size() < num_parallel_transfers)
8685
{
8786
std::cerr << "[TransferPool::submit] too few idle transfers!" << std::endl;
8887
}
8988

9089
for(size_t i = 0; i < num_parallel_transfers; ++i)
9190
{
92-
libusb_transfer *transfer = idle_transfers_.front();
93-
idle_transfers_.pop_front();
91+
libusb_transfer *transfer = transfers_[i].transfer;
92+
transfers_[i].setStopped(false);
9493

9594
int r = libusb_submit_transfer(transfer);
9695

97-
// put transfer in pending queue on success otherwise put it back in the idle queue
98-
if(r == LIBUSB_SUCCESS)
99-
{
100-
pending_transfers_.push_back(transfer);
101-
}
102-
else
96+
if(r != LIBUSB_SUCCESS)
10397
{
104-
idle_transfers_.push_back(transfer);
105-
std::cerr << "[TransferPool::submit] failed to submit transfer" << std::endl;
98+
std::cerr << "[TransferPool::submit] failed to submit transfer: " << libusb_error_name(r) << std::endl;
99+
transfers_[i].setStopped(true);
106100
}
107101
}
108102
}
109103

110104
void TransferPool::cancel()
111105
{
112-
for(TransferQueue::iterator it = pending_transfers_.begin(); it != pending_transfers_.end(); ++it)
106+
for(TransferQueue::iterator it = transfers_.begin(); it != transfers_.end(); ++it)
113107
{
114-
int r = libusb_cancel_transfer(*it);
108+
int r = libusb_cancel_transfer(it->transfer);
115109

116-
if(r != LIBUSB_SUCCESS)
110+
if(r != LIBUSB_SUCCESS && r != LIBUSB_ERROR_NOT_FOUND)
117111
{
118-
// TODO: error reporting
112+
std::cerr << "[TransferPool::cancel] failed to cancel transfer: " << libusb_error_name(r) << std::endl;
119113
}
120114
}
121115

122-
//idle_transfers_.insert(idle_transfers_.end(), pending_transfers_.begin(), pending_transfers_.end());
116+
for(;;)
117+
{
118+
libfreenect2::this_thread::sleep_for(libfreenect2::chrono::milliseconds(100));
119+
size_t stopped_transfers = 0;
120+
for(TransferQueue::iterator it = transfers_.begin(); it != transfers_.end(); ++it)
121+
stopped_transfers += it->getStopped();
122+
if (stopped_transfers == transfers_.size())
123+
break;
124+
std::cerr << "[TransferPool::cancel] waiting for transfer cancellation" << std::endl;
125+
libfreenect2::this_thread::sleep_for(libfreenect2::chrono::milliseconds(1000));
126+
}
123127
}
124128

125129
void TransferPool::setCallback(DataCallback *callback)
@@ -131,6 +135,7 @@ void TransferPool::allocateTransfers(size_t num_transfers, size_t transfer_size)
131135
{
132136
buffer_size_ = num_transfers * transfer_size;
133137
buffer_ = new unsigned char[buffer_size_];
138+
transfers_.reserve(num_transfers);
134139

135140
unsigned char *ptr = buffer_;
136141

@@ -139,45 +144,51 @@ void TransferPool::allocateTransfers(size_t num_transfers, size_t transfer_size)
139144
libusb_transfer *transfer = allocateTransfer();
140145
fillTransfer(transfer);
141146

147+
transfers_.push_back(TransferPool::Transfer(transfer, this));
148+
142149
transfer->dev_handle = device_handle_;
143150
transfer->endpoint = device_endpoint_;
144151
transfer->buffer = ptr;
145152
transfer->length = transfer_size;
146153
transfer->timeout = 1000;
147154
transfer->callback = (libusb_transfer_cb_fn) &TransferPool::onTransferCompleteStatic;
148-
transfer->user_data = this;
149-
150-
idle_transfers_.push_back(transfer);
155+
transfer->user_data = &transfers_.back();
151156

152157
ptr += transfer_size;
153158
}
154159
}
155160

156161
void TransferPool::onTransferCompleteStatic(libusb_transfer* transfer)
157162
{
158-
reinterpret_cast<TransferPool*>(transfer->user_data)->onTransferComplete(transfer);
163+
TransferPool::Transfer *t = reinterpret_cast<TransferPool::Transfer*>(transfer->user_data);
164+
t->pool->onTransferComplete(t);
159165
}
160166

161-
void TransferPool::onTransferComplete(libusb_transfer* transfer)
167+
void TransferPool::onTransferComplete(TransferPool::Transfer* t)
162168
{
163-
// remove transfer from pending queue - should be fast as it is somewhere at the front
164-
TransferQueue::iterator it = std::find(pending_transfers_.begin(), pending_transfers_.end(), transfer);
165-
166-
if(it == pending_transfers_.end())
169+
if(t->transfer->status == LIBUSB_TRANSFER_CANCELLED)
167170
{
168-
// TODO: error reporting
171+
t->setStopped(true);
172+
return;
169173
}
170174

171-
pending_transfers_.erase(it);
172-
173175
// process data
174-
processTransfer(transfer);
176+
processTransfer(t->transfer);
175177

176-
// put transfer back in idle queue
177-
idle_transfers_.push_back(transfer);
178+
if(!enable_submit_)
179+
{
180+
t->setStopped(true);
181+
return;
182+
}
183+
184+
// resubmit self
185+
int r = libusb_submit_transfer(t->transfer);
178186

179-
// submit new transfer
180-
submit(1);
187+
if(r != LIBUSB_SUCCESS)
188+
{
189+
std::cerr << "[TransferPool::onTransferComplete] failed to submit transfer: " << libusb_error_name(r) << std::endl;
190+
t->setStopped(true);
191+
}
181192
}
182193

183194
BulkTransferPool::BulkTransferPool(libusb_device_handle* device_handle, unsigned char device_endpoint) :

0 commit comments

Comments
 (0)