code_device/hgdriver/hgdev/BlockingQueue.h

280 lines
4.4 KiB
C++

#ifndef BLOCKING_QUEUE_H
#define BLOCKING_QUEUE_H
//#include <boost/thread/mutex.hpp>
//#include <boost/thread/condition_variable.hpp>
#ifdef WIN32
#include <Windows.h>
#endif
#include <condition_variable>
#include <deque>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>
using namespace std;
/**
 * Mutex-protected FIFO queue with blocking reads.
 *
 * Put() appends an item and wakes waiting readers. Take() and Front() block
 * while the queue is empty and return a value-initialized T when they are
 * released without data, i.e. after ShutDown() or Clear(). The return value
 * alone therefore cannot distinguish a default-valued item from "no item";
 * use IsShutDown() / Size() when that matters.
 *
 * Every member function takes the internal mutex, so one instance may be
 * shared between producer and consumer threads. The object must outlive every
 * thread that is still blocked inside Take() or Front().
 *
 * T must be default-constructible and copy-constructible.
 */
template <typename T>
class BlockingQueue
{
private:
    // Not copyable: the mutex and the condition variable cannot be copied.
    BlockingQueue(const BlockingQueue& rhs) = delete;
    BlockingQueue& operator =(const BlockingQueue& rhs) = delete;

    mutable std::mutex _mutex;          // guards every member below
    std::condition_variable _condvar;   // signalled by Put(), Clear() and ShutDown()
    std::deque<T> _queue;
    bool isShutDown;
    T tRet;                             // value returned when there is no item to hand out
    unsigned long _clearCount;          // incremented by Clear() so blocked readers notice it

    // Blocks until an item is queued, the queue is shut down, or Clear() runs.
    // The predicate makes spurious wake-ups (and wake-ups whose item was taken
    // by another reader) go back to sleep. `lock` must hold _mutex.
    void WaitForWakeReason(std::unique_lock<std::mutex>& lock)
    {
        const unsigned long seen = _clearCount;
        _condvar.wait(lock, [this, seen] {
            return isShutDown || !_queue.empty() || _clearCount != seen;
        });
    }

public:
    BlockingQueue()
        : _mutex()
        , _condvar()
        , _queue()
        , isShutDown(false)
        , tRet()            // value-initialize so scalar T yields 0 instead of garbage
        , _clearCount(0)
    {
    }

    /// Shuts the queue down (releasing blocked readers) and logs the release.
    ~BlockingQueue()
    {
        ShutDown();
        std::cout << "blocking queue release" << std::endl;
    }

    /// Discards all queued items and releases readers currently blocked in
    /// Take()/Front(); those readers return a value-initialized T.
    void Clear()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        ++_clearCount;
        _queue.clear();
        _condvar.notify_all();
    }

    /// Permanently stops the queue: drops queued items, makes Put() a no-op
    /// and makes Take()/Front() return a value-initialized T without blocking.
    void ShutDown()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        isShutDown = true;
        _queue.clear();
        _condvar.notify_all();
    }

    /// @return true once ShutDown() has been called.
    bool IsShutDown() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return isShutDown;
    }

    /// Appends a copy of `task` and wakes waiting readers.
    /// Silently ignored after ShutDown().
    void Put(const T& task)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (isShutDown)
            return;
        _queue.push_back(task);
        _condvar.notify_all();
    }

    /// Removes and returns the oldest item, blocking while the queue is empty.
    /// @return the item, or a value-initialized T if released by ShutDown()
    ///         or Clear() without data.
    T Take()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        WaitForWakeReason(lock);
        if (isShutDown || _queue.empty())
        {
            return tRet;
        }
        T front(_queue.front());
        _queue.pop_front();
        return front;
    }

    /// Returns a copy of the oldest item without removing it, blocking while
    /// the queue is empty.
    /// @return the item, or a value-initialized T if released by ShutDown()
    ///         or Clear() without data.
    T Front()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        WaitForWakeReason(lock);
        if (isShutDown || _queue.empty())
        {
            return tRet;
        }
        return _queue.front();
    }

    /// @return the number of items currently queued.
    size_t Size() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }
};
#include <string.h>
// Describes one image buffer. image_data::put() fills every field from its
// arguments; the comments below name the argument each field receives.
struct _img_header
{
    int width;              // from `w` (presumably pixels - confirm with the caller)
    int height;             // from `h` (presumably pixels - confirm with the caller)
    int bits;               // from `bpp` (presumably bits per pixel - confirm)
    int channels;           // from `channels`
    int line_bytes;         // from `line_bytes` (presumably the row stride in bytes - confirm)
    unsigned int bytes;     // from `bytes`: size of the data buffer in bytes
};
using IMH = _img_header;
// One queued image: its description, a read cursor and the shared byte buffer.
struct _img
{
    IMH header;                                         // description of the buffer
    unsigned int offset;                                // index into *data of the next byte image_data::fetch_front() copies out
    std::shared_ptr<std::vector<unsigned char>> data;   // the image bytes; copies of this struct share one buffer
};
using IMGDT = _img;
class image_data
{
private:
image_data(const image_data& rhs);
image_data& operator =(const image_data& rhs);
mutable std::mutex _mutex;
std::condition_variable _condvar;
deque<IMGDT> _queue;
bool isShutDown;
IMGDT tRet;
public:
image_data()
: _mutex()
, _condvar()
, _queue()
, isShutDown(false)
{
}
~image_data()
{
ShutDown();
std::cout << "blocking queue release" << std::endl;
}
void Clear()
{
lock_guard<mutex> lock(_mutex);
_condvar.notify_all();
_queue.clear();
}
void ShutDown()
{
isShutDown = true;
_condvar.notify_all();
_queue.clear();
}
bool IsShutDown()
{
return isShutDown;
}
void Put(const IMGDT task)
{
lock_guard<mutex> lock(_mutex);
if (!isShutDown)
{
{
_queue.push_back(task);
}
_condvar.notify_all();
}
}
void put(int w, int h, int bpp, int channels, int line_bytes, void* data, unsigned bytes)
{
IMGDT img;
img.header.width = w;
img.header.height = h;
img.header.bits = bpp;
img.header.channels = channels;
img.header.line_bytes = line_bytes;
img.offset = 0;
img.header.bytes = bytes;
img.data.reset(new std::vector<unsigned char>);
img.data->resize(bytes);
memcpy(img.data->data(), data, bytes);
Put(img);
}
IMGDT Take()
{
unique_lock<mutex> lock(_mutex);
if (_queue.size() <= 0)
_condvar.wait(lock);
if (isShutDown || _queue.empty())
{
return tRet;
}
IMGDT front(_queue.front());
_queue.pop_front();
return front;
}
IMGDT Front()
{
unique_lock<mutex> lock(_mutex);
if (_queue.size() <= 0)
_condvar.wait(lock);
if (isShutDown || _queue.empty())
{
return tRet;
}
IMGDT front(_queue.front());
return front;
}
bool front(IMH* header)
{
unique_lock<mutex> lock(_mutex);
if (_queue.size() <= 0)
_condvar.wait(lock);
if (isShutDown || _queue.empty())
{
return false;
}
memcpy(header, &_queue.front().header, sizeof(*header));
return true;
}
size_t Size() const
{
lock_guard<mutex> lock(_mutex);
return _queue.size();
}
void fetch_front(void* buf, int* len, bool* over)
{
if (Size() == 0)
*len = 0;
else
{
lock_guard<mutex> lock(_mutex);
if (_queue.front().data->size() - _queue.front().offset <= *len)
{
*len = _queue.front().data->size() - _queue.front().offset;
memcpy(buf, _queue.front().data->data() + _queue.front().offset, *len);
_queue.pop_front();
if (over)
*over = true;
}
else
{
memcpy(buf, _queue.front().data->data() + _queue.front().offset, *len);
_queue.front().offset += *len;
if (over)
*over = false;
}
}
}
};
#endif