#ifndef BLOCKING_QUEUE_H #define BLOCKING_QUEUE_H //#include //#include #ifdef WIN32 #include #endif #include #include #include #include #include using namespace std; template class BlockingQueue { private: BlockingQueue(const BlockingQueue& rhs); BlockingQueue& operator =(const BlockingQueue& rhs); mutable std::mutex _mutex; std::condition_variable _condvar; deque _queue; bool isShutDown; T tRet; public: BlockingQueue() : _mutex() , _condvar() , _queue() , isShutDown(false) { } ~BlockingQueue() { ShutDown(); std::cout << "blocking queue release" << std::endl; } void Clear() { lock_guard lock(_mutex); _condvar.notify_all(); _queue.clear(); } void ShutDown() { isShutDown = true; _condvar.notify_all(); _queue.clear(); } bool IsShutDown() { return isShutDown; } void Put(const T task) { lock_guard lock(_mutex); if (!isShutDown) { { _queue.push_back(task); } _condvar.notify_all(); } } T Take() { unique_lock lock(_mutex); if (_queue.size() <= 0) _condvar.wait(lock); if (isShutDown || _queue.empty()) { return tRet; } T front(_queue.front()); _queue.pop_front(); return front; } T Front() { unique_lock lock(_mutex); if (_queue.size() <= 0) _condvar.wait(lock); if (isShutDown || _queue.empty()) { return tRet; } T front(_queue.front()); return front; } size_t Size() const { lock_guard lock(_mutex); return _queue.size(); } }; #include typedef struct _img_header { int width; int height; int bits; int channels; int line_bytes; unsigned bytes; }IMH; typedef struct _img { IMH header; unsigned offset; std::shared_ptr> data; }IMGDT; class image_data { private: image_data(const image_data& rhs); image_data& operator =(const image_data& rhs); mutable std::mutex _mutex; std::condition_variable _condvar; deque _queue; bool isShutDown; IMGDT tRet; public: image_data() : _mutex() , _condvar() , _queue() , isShutDown(false) { } ~image_data() { ShutDown(); std::cout << "blocking queue release" << std::endl; } void Clear() { lock_guard lock(_mutex); _condvar.notify_all(); _queue.clear(); } void ShutDown() { isShutDown = true; _condvar.notify_all(); _queue.clear(); } bool IsShutDown() { return isShutDown; } void Put(const IMGDT task) { lock_guard lock(_mutex); if (!isShutDown) { { _queue.push_back(task); } _condvar.notify_all(); } } void put(int w, int h, int bpp, int channels, int line_bytes, void* data, unsigned bytes) { IMGDT img; img.header.width = w; img.header.height = h; img.header.bits = bpp; img.header.channels = channels; img.header.line_bytes = line_bytes; img.offset = 0; img.header.bytes = bytes; img.data.reset(new std::vector); img.data->resize(bytes); memcpy(img.data->data(), data, bytes); Put(img); } IMGDT Take() { unique_lock lock(_mutex); if (_queue.size() <= 0) _condvar.wait(lock); if (isShutDown || _queue.empty()) { return tRet; } IMGDT front(_queue.front()); _queue.pop_front(); return front; } IMGDT Front() { unique_lock lock(_mutex); if (_queue.size() <= 0) _condvar.wait(lock); if (isShutDown || _queue.empty()) { return tRet; } IMGDT front(_queue.front()); return front; } bool front(IMH* header) { unique_lock lock(_mutex); if (_queue.size() <= 0) _condvar.wait(lock); if (isShutDown || _queue.empty()) { return false; } memcpy(header, &_queue.front().header, sizeof(*header)); return true; } size_t Size() const { lock_guard lock(_mutex); return _queue.size(); } void fetch_front(void* buf, int* len, bool* over) { if (Size() == 0) *len = 0; else { lock_guard lock(_mutex); if (_queue.front().data->size() - _queue.front().offset <= *len) { *len = _queue.front().data->size() - _queue.front().offset; memcpy(buf, _queue.front().data->data() + _queue.front().offset, *len); _queue.pop_front(); if (over) *over = true; } else { memcpy(buf, _queue.front().data->data() + _queue.front().offset, *len); _queue.front().offset += *len; if (over) *over = false; } } } }; #endif