// code_scanner/common/ipc_wrapper.cpp
#include "ipc_wrapper.h"

#include <stdio.h>
#include <string.h>

#include <atomic>
#include <exception>
#include <string>
#include <thread>
#include <vector>

#include "log_util.h"
#include "ipc_util.h"
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ipc_wrapper_shm
class ipc_wrapper_shm : public ipc_wrapper
{
enum data_ind
{
DATA_INTER_CMD = 0,
};
enum internal_cmd
{
INTER_CMD_NONE = 0,
INTER_CMD_EXIT,
};
typedef struct _shm_pack
{
size_t space; // buffer length
size_t bytes; // data length
char data[4];
}SHMPACK, *LPSHMPACK;
typedef struct _sync_shm // 1-lock(write_lock) --> 2-write content --> 3-notify read --> 4-wait(wait_sent) --> 5-return step 2 if data has not sent finished, or else to 6 --> 6-release(write_lock) --> 7-over
{
sem_t wait_arrive; // read event
sem_t wait_sent; // peer has read, i can re-write now
sem_t notify_read; // notify peer to read
sem_t notify_write; // notify peer to write
char data[8]; // internal data
}SYNCSHM, *LPSYNCSHM;
volatile bool run_;
2022-12-28 02:40:59 +00:00
MUTEX write_lock_;
2022-12-05 08:03:17 +00:00
int32_t id_;
shared_mem* shm_;
LPSYNCSHM sync_shm_;
linux_event* wait_arrive_;
linux_event* wait_sent_;
linux_event* notify_write_;
linux_event* notify_read_;
THREAD_PTR thread_;
LPSHMPACK buf_in_;
LPSHMPACK buf_out_;
bool buf_out_used_;
volatile bool cancel_write_;
void create(const char* file, int32_t id, size_t bytes, unsigned sent_percent)
{
int32_t err = 0;
shm_ = new shared_mem();
err = shm_->open(id, &bytes, file);
if (err)
{
log_cls::log(LOG_LEVEL_FATAL, "ipc_wrapper_shm(%p) open shared memory(%s, %d, %s) failed: %s\n"
, this, file, id, sys_util::format_readable_bytes(bytes).c_str(), strerror(err));
shm_->release();
shm_ = nullptr;
return;
}
SHMPACK pack = { 0 };
std::string desc("ipc_wrapper_shm-");
char buf[80] = { 0 }, *ptr = shm_->get_mem(&pack.space);
int32_t pack_head = ALIGN_INT(sizeof(pack), 16), head = ALIGN_INT(sizeof(SYNCSHM), 16);
sprintf(buf, "%d-", id);
desc += buf;
sync_shm_ = (LPSYNCSHM)ptr;
ptr += head;
pack.space -= head + pack_head * 2;
head = ALIGN_INT(pack.space * sent_percent / 100, 16);
if (shm_->is_first())
{
SYNCSHM ss;
memcpy(sync_shm_, &ss, sizeof(ss));
buf_in_ = (LPSHMPACK)ptr;
buf_in_->space = pack.space - head;
ptr += buf_in_->space + pack_head;
buf_out_ = (LPSHMPACK)ptr;
buf_out_->space = head;
//*
wait_arrive_ = new linux_event(&sync_shm_->wait_arrive, true, (desc + "wait_arrive").c_str());
wait_sent_ = new linux_event(&sync_shm_->wait_sent, true, (desc + "wait_sent").c_str());
notify_read_ = new linux_event(&sync_shm_->notify_read, true, (desc + "notify_read").c_str());
notify_write_ = new linux_event(&sync_shm_->notify_write, true, (desc + "notify_write").c_str());
/*/
wait_arrive_ = new linux_event((desc + "wait_arrive").c_str(), "");
wait_sent_ = new linux_event((desc + "wait_sent").c_str(), "");
notify_write_ = new linux_event((desc + "notify_write").c_str(), "");
notify_read_ = new linux_event((desc + "notify_read").c_str(), "");
///*////////////
memset(sync_shm_->data, 0, sizeof(sync_shm_->data));
}
else
{
//*
wait_sent_ = new linux_event(&sync_shm_->notify_write, false, (desc + "wait_sent").c_str());
wait_arrive_ = new linux_event(&sync_shm_->notify_read, false, (desc + "wait_arrive").c_str());
notify_write_ = new linux_event(&sync_shm_->wait_sent, false, (desc + "notify_write").c_str());
notify_read_ = new linux_event(&sync_shm_->wait_arrive, false, (desc + "notify_read").c_str());
/*/
wait_arrive_ = new linux_event((desc + "notify_read").c_str(), "");
wait_sent_ = new linux_event((desc + "notify_write").c_str(), "");
notify_read_ = new linux_event((desc + "wait_arrive").c_str(), "");
notify_write_ = new linux_event((desc + "wait_sent").c_str(), "");
///*////////////
buf_out_ = (LPSHMPACK)ptr;
buf_in_ = (LPSHMPACK)(ptr + pack_head + buf_out_->space);
}
thread_.reset(new std::thread(&ipc_wrapper_shm::read_thread, this));
// handler_->on_event(event_handler::EVENT_WRITE, buf_out_->data, buf_size_);
}
void read_thread(void)
{
event_handler* handler = handler_;
handler->add_ref();
while (run_)
{
wait_arrive_->wait();
if (!run_)
break;
if (!shm_->is_first() && sync_shm_->data[DATA_INTER_CMD] == INTER_CMD_EXIT)
break;
2022-12-28 02:40:59 +00:00
handler->on_event(SCANNER_EVENT_IPC_DATA_RECEIVED, buf_in_->data, buf_in_->bytes);
2022-12-05 08:03:17 +00:00
notify_write_->trigger();
}
handler->release();
}
public:
ipc_wrapper_shm(const char* file, int32_t id
, size_t bytes
, event_handler* handler
, unsigned sent_percent = 50)
: ipc_wrapper(handler), run_(true), shm_(nullptr), sync_shm_(nullptr)
, wait_arrive_(nullptr), wait_sent_(nullptr), notify_write_(nullptr), notify_read_(nullptr)
, buf_in_(nullptr), buf_out_(nullptr), id_(id), buf_out_used_(false), cancel_write_(false)
{
create(file, id, bytes, sent_percent);
}
protected:
~ipc_wrapper_shm()
{}
public:
virtual int32_t write(const char* pack, size_t * bytes, bool kbuf, unsigned timeout = WAIT_INFINITE) override
{
if (!shm_)
return ENOTCONN;
// invoke in read-thread is not allowed
if(std::this_thread::get_id() == thread_->get_id())
return EDEADLOCK;
2022-12-28 02:40:59 +00:00
LOCKER lock(write_lock_);
2022-12-05 08:03:17 +00:00
size_t rest = *bytes;
int32_t ret = 0;
cancel_write_ = false;
if (kbuf)
{
buf_out_->bytes = rest;
notify_read_->trigger();
if (wait_sent_->wait(timeout))
rest = 0;
else
ret = ETIME;
}
else
{
chronograph timer;
while (rest)
{
buf_out_->bytes = rest > buf_out_->space ? buf_out_->space : rest;
memcpy(buf_out_->data, pack, buf_out_->bytes);
notify_read_->trigger();
pack += buf_out_->space;
if (!wait_sent_->wait(timeout))
break;
if (cancel_write_)
break;
if (rest <= buf_out_->space)
{
rest = 0;
break;
}
rest -= buf_out_->space;
if (timeout)
{
uint64_t t = timer.elapse_ms();
if (t >= timeout)
break;
timeout -= t;
timer.reset();
}
}
if (cancel_write_)
ret = ECANCELED;
else if (rest)
ret = ETIME;
}
*bytes -= rest;
return ret;
}
virtual bool cancel_write(void) override
{
cancel_write_ = true;
if (wait_sent_->is_waiting())
{
wait_sent_->trigger();
wait_sent_->reset(); // sure ?
}
return true;
}
virtual bool is_ok(void) override
{
return shm_ != nullptr;
}
virtual bool is_first(void) override
{
return shm_ && shm_->is_first();
}
virtual void* get_kbuf(size_t* bytes) override
{
if (!bytes)
return nullptr;
if (buf_out_used_)
{
*bytes = 0;
return nullptr;
}
buf_out_used_ = true;
*bytes = buf_out_->space;
return buf_out_->data;
}
virtual void release_kbuf(void* buf) override
{
buf_out_used_ = false;
}
virtual void clear_kernel_objects(void) override
{
std::string desc("ipc_wrapper_shm-");
char id[20] = { 0 };
sprintf(id, "%d-", id_);
desc += id;
linux_event::clear_named_event((desc + "wait_arrive").c_str());
linux_event::clear_named_event((desc + "wait_sent").c_str());
linux_event::clear_named_event((desc + "notify_write").c_str());
linux_event::clear_named_event((desc + "notify_read").c_str());
shm_->clear_kernel_object();
}
virtual int32_t stop(void) override
{
run_ = false;
if (shm_)
{
wait_arrive_->trigger();
if (thread_.get() && thread_->joinable())
thread_->join();
thread_.reset();
if (is_first())
{
sync_shm_->data[DATA_INTER_CMD] = INTER_CMD_EXIT;
notify_read_->trigger();
wait_sent_->wait(100);
}
notify_read_->release();
notify_read_ = nullptr;
notify_write_->release();
notify_write_ = nullptr;
wait_arrive_->release();
wait_arrive_ = nullptr;
wait_sent_->release();
wait_sent_ = nullptr;
sync_shm_ = nullptr;
buf_in_ = buf_out_ = nullptr;
shm_->close();
shm_->release();
}
shm_ = nullptr;
return ipc_wrapper::stop();
}
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ipc_wrapper
// Remembers the event handler and, when one is given, takes a reference on it.
ipc_wrapper::ipc_wrapper(event_handler* handler)
    : handler_(handler)
{
    if (handler_ != nullptr)
    {
        handler_->add_ref();
    }
}
// Nothing is released here; the handler reference is dropped in stop().
ipc_wrapper::~ipc_wrapper()
{
}
// Default implementation: there is no buffer to hand out.
// bytes:  when not nullptr, receives 0
// return: always nullptr
void* ipc_wrapper::get_kbuf(size_t* bytes)
{
    if (bytes != nullptr)
    {
        *bytes = 0;
    }

    return nullptr;
}
// Default implementation: no buffer was handed out, so there is nothing to take back.
void ipc_wrapper::release_kbuf(void* buf)
{
    (void)buf;  // intentionally unused
}
// Default implementation: does nothing.
void ipc_wrapper::clear_kernel_objects(void)
{
}
// Default implementation: does nothing and returns false.
bool ipc_wrapper::cancel_write(void)
{
    const bool cancelled = false;

    return cancelled;
}
// Releases the handler held by this object, if any, and forgets it, so calling
// stop() again does not release it a second time.
// return: always 0
int32_t ipc_wrapper::stop(void)
{
    if (handler_ != nullptr)
    {
        handler_->release();
        handler_ = nullptr;
    }

    return 0;
}
// Factory for ipc_wrapper implementations.
//
// handler: event handler passed on to the created object
// type:    kind of IPC to create; only IPC_SHARED_MEM is handled here
// param:   for IPC_SHARED_MEM:
//          path-file:id:size[:write-ratio-> percent of size of sent buffer, default is 50, only valid in owner]
//
// return:  the new object, or nullptr when the type is not handled, param is
//          nullptr, or fewer than two numeric fields (id and size) could be read.
//          The returned object may still have failed to connect - check is_ok().
ipc_wrapper* ipc_wrapper::create_ipc(event_handler* handler, ipc_type type, const char* param)
{
    if (type != IPC_SHARED_MEM || !param)
        return nullptr;

    std::string file(param);
    std::vector<size_t> params;     // numeric fields, collected from right to left
    size_t pos = file.rfind(':');

    while (pos != std::string::npos && params.size() < 3)
    {
        long double value = 0;

        // A segment that is not a number (e.g. a ':' inside the path) ends the
        // numeric fields; everything to the left stays part of the file name.
        try
        {
            value = std::stold(file.substr(pos + 1));
        }
        catch (const std::exception&)
        {
            break;
        }

        // converting an out-of-range floating value to an integer is undefined
        // (this comparison also rejects NaN)
        if (!(value > -9.0e18L && value < 9.0e18L))
            break;

        // go through a signed type so that a negative field wraps in a defined way
        params.push_back(static_cast<size_t>(static_cast<long long>(value)));
        file.erase(pos);
        pos = file.rfind(':');
    }
    if (params.size() < 2)
        return nullptr;

    // Put the default ratio in front. Counting from the back the fields are
    // id, size, ratio - so an explicitly given ratio takes precedence over it.
    params.insert(params.begin(), 50);

    const size_t count = params.size();

    return new ipc_wrapper_shm(file.c_str()
        , static_cast<int32_t>(params[count - 1])
        , params[count - 2]
        , handler
        , static_cast<unsigned>(params[count - 3]));
}