From 229ee33ec5861ea14e5256825f4b50746179bb84 Mon Sep 17 00:00:00 2001 From: Lingzhu Xiang Date: Sat, 2 May 2015 15:33:49 -0400 Subject: [PATCH] Fix transfer pool thread safety Avoid unsafe access during transfer resubmission by refactoring TransferPool using std::vector. Wait for all transfers during cancellation. --- .../include/libfreenect2/usb/transfer_pool.h | 29 ++++++- examples/protonect/src/libfreenect2.cpp | 4 - examples/protonect/src/transfer_pool.cpp | 85 +++++++++++-------- 3 files changed, 73 insertions(+), 45 deletions(-) diff --git a/examples/protonect/include/libfreenect2/usb/transfer_pool.h b/examples/protonect/include/libfreenect2/usb/transfer_pool.h index 3a0dbef56..e4091a97a 100644 --- a/examples/protonect/include/libfreenect2/usb/transfer_pool.h +++ b/examples/protonect/include/libfreenect2/usb/transfer_pool.h @@ -27,10 +27,11 @@ #ifndef TRANSFER_POOL_H_ #define TRANSFER_POOL_H_ -#include +#include #include #include +#include namespace libfreenect2 { @@ -56,6 +57,26 @@ class TransferPool void setCallback(DataCallback *callback); protected: + libfreenect2::mutex stopped_mutex; + struct Transfer + { + libusb_transfer *transfer; + TransferPool *pool; + bool stopped; + Transfer(libusb_transfer *transfer, TransferPool *pool): + transfer(transfer), pool(pool), stopped(true) {} + void setStopped(bool value) + { + libfreenect2::lock_guard guard(pool->stopped_mutex); + stopped = value; + } + bool getStopped() + { + libfreenect2::lock_guard guard(pool->stopped_mutex); + return stopped; + } + }; + void allocateTransfers(size_t num_transfers, size_t transfer_size); virtual libusb_transfer *allocateTransfer() = 0; @@ -65,12 +86,12 @@ class TransferPool DataCallback *callback_; private: - typedef std::deque TransferQueue; + typedef std::vector TransferQueue; libusb_device_handle *device_handle_; unsigned char device_endpoint_; - TransferQueue idle_transfers_, pending_transfers_; + TransferQueue transfers_; unsigned char *buffer_; size_t buffer_size_; @@ -78,7 +99,7 @@ class TransferPool static void onTransferCompleteStatic(libusb_transfer *transfer); - void onTransferComplete(libusb_transfer *transfer); + void onTransferComplete(Transfer *transfer); }; class BulkTransferPool : public TransferPool diff --git a/examples/protonect/src/libfreenect2.cpp b/examples/protonect/src/libfreenect2.cpp index 28ae3b3e4..9524fd6ca 100644 --- a/examples/protonect/src/libfreenect2.cpp +++ b/examples/protonect/src/libfreenect2.cpp @@ -566,10 +566,6 @@ void Freenect2DeviceImpl::stop() rgb_transfer_pool_.cancel(); ir_transfer_pool_.cancel(); - // wait for completion of transfer cancelation - // TODO: better implementation - libfreenect2::this_thread::sleep_for(libfreenect2::chrono::milliseconds(1500)); - usb_control_.setIrInterfaceState(UsbControl::Disabled); CommandTransaction::Result result; diff --git a/examples/protonect/src/transfer_pool.cpp b/examples/protonect/src/transfer_pool.cpp index 60c4358b1..b324659a4 100644 --- a/examples/protonect/src/transfer_pool.cpp +++ b/examples/protonect/src/transfer_pool.cpp @@ -26,7 +26,6 @@ #include #include -#include namespace libfreenect2 { @@ -60,11 +59,11 @@ void TransferPool::disableSubmission() void TransferPool::deallocate() { - for(TransferQueue::iterator it = idle_transfers_.begin(); it != idle_transfers_.end(); ++it) + for(TransferQueue::iterator it = transfers_.begin(); it != transfers_.end(); ++it) { - libusb_free_transfer(*it); + libusb_free_transfer(it->transfer); } - idle_transfers_.clear(); + transfers_.clear(); if(buffer_ != 0) { @@ -82,44 +81,49 @@ void TransferPool::submit(size_t num_parallel_transfers) return; } - if(idle_transfers_.size() < num_parallel_transfers) + if(transfers_.size() < num_parallel_transfers) { std::cerr << "[TransferPool::submit] too few idle transfers!" << std::endl; } for(size_t i = 0; i < num_parallel_transfers; ++i) { - libusb_transfer *transfer = idle_transfers_.front(); - idle_transfers_.pop_front(); + libusb_transfer *transfer = transfers_[i].transfer; + transfers_[i].setStopped(false); int r = libusb_submit_transfer(transfer); - // put transfer in pending queue on success otherwise put it back in the idle queue - if(r == LIBUSB_SUCCESS) - { - pending_transfers_.push_back(transfer); - } - else + if(r != LIBUSB_SUCCESS) { - idle_transfers_.push_back(transfer); - std::cerr << "[TransferPool::submit] failed to submit transfer" << std::endl; + std::cerr << "[TransferPool::submit] failed to submit transfer: " << libusb_error_name(r) << std::endl; + transfers_[i].setStopped(true); } } } void TransferPool::cancel() { - for(TransferQueue::iterator it = pending_transfers_.begin(); it != pending_transfers_.end(); ++it) + for(TransferQueue::iterator it = transfers_.begin(); it != transfers_.end(); ++it) { - int r = libusb_cancel_transfer(*it); + int r = libusb_cancel_transfer(it->transfer); - if(r != LIBUSB_SUCCESS) + if(r != LIBUSB_SUCCESS && r != LIBUSB_ERROR_NOT_FOUND) { - // TODO: error reporting + std::cerr << "[TransferPool::cancel] failed to cancel transfer: " << libusb_error_name(r) << std::endl; } } - //idle_transfers_.insert(idle_transfers_.end(), pending_transfers_.begin(), pending_transfers_.end()); + for(;;) + { + libfreenect2::this_thread::sleep_for(libfreenect2::chrono::milliseconds(100)); + size_t stopped_transfers = 0; + for(TransferQueue::iterator it = transfers_.begin(); it != transfers_.end(); ++it) + stopped_transfers += it->getStopped(); + if (stopped_transfers == transfers_.size()) + break; + std::cerr << "[TransferPool::cancel] waiting for transfer cancellation" << std::endl; + libfreenect2::this_thread::sleep_for(libfreenect2::chrono::milliseconds(1000)); + } } void TransferPool::setCallback(DataCallback *callback) @@ -131,6 +135,7 @@ void TransferPool::allocateTransfers(size_t num_transfers, size_t transfer_size) { buffer_size_ = num_transfers * transfer_size; buffer_ = new unsigned char[buffer_size_]; + transfers_.reserve(num_transfers); unsigned char *ptr = buffer_; @@ -139,15 +144,15 @@ void TransferPool::allocateTransfers(size_t num_transfers, size_t transfer_size) libusb_transfer *transfer = allocateTransfer(); fillTransfer(transfer); + transfers_.push_back(TransferPool::Transfer(transfer, this)); + transfer->dev_handle = device_handle_; transfer->endpoint = device_endpoint_; transfer->buffer = ptr; transfer->length = transfer_size; transfer->timeout = 1000; transfer->callback = (libusb_transfer_cb_fn) &TransferPool::onTransferCompleteStatic; - transfer->user_data = this; - - idle_transfers_.push_back(transfer); + transfer->user_data = &transfers_.back(); ptr += transfer_size; } @@ -155,29 +160,35 @@ void TransferPool::allocateTransfers(size_t num_transfers, size_t transfer_size) void TransferPool::onTransferCompleteStatic(libusb_transfer* transfer) { - reinterpret_cast(transfer->user_data)->onTransferComplete(transfer); + TransferPool::Transfer *t = reinterpret_cast(transfer->user_data); + t->pool->onTransferComplete(t); } -void TransferPool::onTransferComplete(libusb_transfer* transfer) +void TransferPool::onTransferComplete(TransferPool::Transfer* t) { - // remove transfer from pending queue - should be fast as it is somewhere at the front - TransferQueue::iterator it = std::find(pending_transfers_.begin(), pending_transfers_.end(), transfer); - - if(it == pending_transfers_.end()) + if(t->transfer->status == LIBUSB_TRANSFER_CANCELLED) { - // TODO: error reporting + t->setStopped(true); + return; } - pending_transfers_.erase(it); - // process data - processTransfer(transfer); + processTransfer(t->transfer); - // put transfer back in idle queue - idle_transfers_.push_back(transfer); + if(!enable_submit_) + { + t->setStopped(true); + return; + } + + // resubmit self + int r = libusb_submit_transfer(t->transfer); - // submit new transfer - submit(1); + if(r != LIBUSB_SUCCESS) + { + std::cerr << "[TransferPool::onTransferComplete] failed to submit transfer: " << libusb_error_name(r) << std::endl; + t->setStopped(true); + } } BulkTransferPool::BulkTransferPool(libusb_device_handle* device_handle, unsigned char device_endpoint) :