#include "ipc_wrapper.h" #include #include #include "log_util.h" #include "ipc_util.h" ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ipc_wrapper_shm class ipc_wrapper_shm : public ipc_wrapper { enum data_ind { DATA_INTER_CMD = 0, }; enum internal_cmd { INTER_CMD_NONE = 0, INTER_CMD_EXIT, }; typedef struct _shm_pack { size_t space; // buffer length size_t bytes; // data length char data[4]; }SHMPACK, *LPSHMPACK; typedef struct _sync_shm // 1-lock(write_lock) --> 2-write content --> 3-notify read --> 4-wait(wait_sent) --> 5-return step 2 if data has not sent finished, or else to 6 --> 6-release(write_lock) --> 7-over { sem_t wait_arrive; // read event sem_t wait_sent; // peer has read, i can re-write now sem_t notify_read; // notify peer to read sem_t notify_write; // notify peer to write char data[8]; // internal data }SYNCSHM, *LPSYNCSHM; volatile bool run_; MUTEX write_lock_; int32_t id_; shared_mem* shm_; LPSYNCSHM sync_shm_; linux_event* wait_arrive_; linux_event* wait_sent_; linux_event* notify_write_; linux_event* notify_read_; THREAD_PTR thread_; LPSHMPACK buf_in_; LPSHMPACK buf_out_; bool buf_out_used_; volatile bool cancel_write_; void create(const char* file, int32_t id, size_t bytes, unsigned sent_percent) { int32_t err = 0; shm_ = new shared_mem(); err = shm_->open(id, &bytes, file); if (err) { log_cls::log(LOG_LEVEL_FATAL, "ipc_wrapper_shm(%p) open shared memory(%s, %d, %s) failed: %s\n" , this, file, id, sys_util::format_readable_bytes(bytes).c_str(), strerror(err)); shm_->release(); shm_ = nullptr; return; } SHMPACK pack = { 0 }; std::string desc("ipc_wrapper_shm-"); char buf[80] = { 0 }, *ptr = shm_->get_mem(&pack.space); int32_t pack_head = ALIGN_INT(sizeof(pack), 16), head = ALIGN_INT(sizeof(SYNCSHM), 16); sprintf(buf, "%d-", id); desc += buf; sync_shm_ = (LPSYNCSHM)ptr; ptr += head; pack.space -= head + pack_head * 2; head = ALIGN_INT(pack.space * sent_percent / 100, 16); if (shm_->is_first()) { SYNCSHM ss; memcpy(sync_shm_, &ss, sizeof(ss)); buf_in_ = (LPSHMPACK)ptr; buf_in_->space = pack.space - head; ptr += buf_in_->space + pack_head; buf_out_ = (LPSHMPACK)ptr; buf_out_->space = head; //* wait_arrive_ = new linux_event(&sync_shm_->wait_arrive, true, (desc + "wait_arrive").c_str()); wait_sent_ = new linux_event(&sync_shm_->wait_sent, true, (desc + "wait_sent").c_str()); notify_read_ = new linux_event(&sync_shm_->notify_read, true, (desc + "notify_read").c_str()); notify_write_ = new linux_event(&sync_shm_->notify_write, true, (desc + "notify_write").c_str()); /*/ wait_arrive_ = new linux_event((desc + "wait_arrive").c_str(), ""); wait_sent_ = new linux_event((desc + "wait_sent").c_str(), ""); notify_write_ = new linux_event((desc + "notify_write").c_str(), ""); notify_read_ = new linux_event((desc + "notify_read").c_str(), ""); ///*//////////// memset(sync_shm_->data, 0, sizeof(sync_shm_->data)); } else { //* wait_sent_ = new linux_event(&sync_shm_->notify_write, false, (desc + "wait_sent").c_str()); wait_arrive_ = new linux_event(&sync_shm_->notify_read, false, (desc + "wait_arrive").c_str()); notify_write_ = new linux_event(&sync_shm_->wait_sent, false, (desc + "notify_write").c_str()); notify_read_ = new linux_event(&sync_shm_->wait_arrive, false, (desc + "notify_read").c_str()); /*/ wait_arrive_ = new linux_event((desc + "notify_read").c_str(), ""); wait_sent_ = new linux_event((desc + "notify_write").c_str(), ""); notify_read_ = new linux_event((desc + "wait_arrive").c_str(), ""); notify_write_ = new linux_event((desc + "wait_sent").c_str(), ""); ///*//////////// buf_out_ = (LPSHMPACK)ptr; buf_in_ = (LPSHMPACK)(ptr + pack_head + buf_out_->space); } thread_.reset(new std::thread(&ipc_wrapper_shm::read_thread, this)); // handler_->on_event(event_handler::EVENT_WRITE, buf_out_->data, buf_size_); } void read_thread(void) { event_handler* handler = handler_; handler->add_ref(); while (run_) { wait_arrive_->wait(); if (!run_) break; if (!shm_->is_first() && sync_shm_->data[DATA_INTER_CMD] == INTER_CMD_EXIT) break; handler->on_event(SCANNER_EVENT_IPC_DATA_RECEIVED, buf_in_->data, buf_in_->bytes); notify_write_->trigger(); } handler->release(); } public: ipc_wrapper_shm(const char* file, int32_t id , size_t bytes , event_handler* handler , unsigned sent_percent = 50) : ipc_wrapper(handler), run_(true), shm_(nullptr), sync_shm_(nullptr) , wait_arrive_(nullptr), wait_sent_(nullptr), notify_write_(nullptr), notify_read_(nullptr) , buf_in_(nullptr), buf_out_(nullptr), id_(id), buf_out_used_(false), cancel_write_(false) { create(file, id, bytes, sent_percent); } protected: ~ipc_wrapper_shm() {} public: virtual int32_t write(const char* pack, size_t * bytes, bool kbuf, unsigned timeout = WAIT_INFINITE) override { if (!shm_) return ENOTCONN; // invoke in read-thread is not allowed if(std::this_thread::get_id() == thread_->get_id()) return EDEADLOCK; LOCKER lock(write_lock_); size_t rest = *bytes; int32_t ret = 0; cancel_write_ = false; if (kbuf) { buf_out_->bytes = rest; notify_read_->trigger(); if (wait_sent_->wait(timeout)) rest = 0; else ret = ETIME; } else { chronograph timer; while (rest) { buf_out_->bytes = rest > buf_out_->space ? buf_out_->space : rest; memcpy(buf_out_->data, pack, buf_out_->bytes); notify_read_->trigger(); pack += buf_out_->space; if (!wait_sent_->wait(timeout)) break; if (cancel_write_) break; if (rest <= buf_out_->space) { rest = 0; break; } rest -= buf_out_->space; if (timeout) { uint64_t t = timer.elapse_ms(); if (t >= timeout) break; timeout -= t; timer.reset(); } } if (cancel_write_) ret = ECANCELED; else if (rest) ret = ETIME; } *bytes -= rest; return ret; } virtual bool cancel_write(void) override { cancel_write_ = true; if (wait_sent_->is_waiting()) { wait_sent_->trigger(); wait_sent_->reset(); // sure ? } return true; } virtual bool is_ok(void) override { return shm_ != nullptr; } virtual bool is_first(void) override { return shm_ && shm_->is_first(); } virtual void* get_kbuf(size_t* bytes) override { if (!bytes) return nullptr; if (buf_out_used_) { *bytes = 0; return nullptr; } buf_out_used_ = true; *bytes = buf_out_->space; return buf_out_->data; } virtual void release_kbuf(void* buf) override { buf_out_used_ = false; } virtual void clear_kernel_objects(void) override { std::string desc("ipc_wrapper_shm-"); char id[20] = { 0 }; sprintf(id, "%d-", id_); desc += id; linux_event::clear_named_event((desc + "wait_arrive").c_str()); linux_event::clear_named_event((desc + "wait_sent").c_str()); linux_event::clear_named_event((desc + "notify_write").c_str()); linux_event::clear_named_event((desc + "notify_read").c_str()); shm_->clear_kernel_object(); } virtual int32_t stop(void) override { run_ = false; if (shm_) { wait_arrive_->trigger(); if (thread_.get() && thread_->joinable()) thread_->join(); thread_.reset(); if (is_first()) { sync_shm_->data[DATA_INTER_CMD] = INTER_CMD_EXIT; notify_read_->trigger(); wait_sent_->wait(100); } notify_read_->release(); notify_read_ = nullptr; notify_write_->release(); notify_write_ = nullptr; wait_arrive_->release(); wait_arrive_ = nullptr; wait_sent_->release(); wait_sent_ = nullptr; sync_shm_ = nullptr; buf_in_ = buf_out_ = nullptr; shm_->close(); shm_->release(); } shm_ = nullptr; return ipc_wrapper::stop(); } }; ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ipc_wrapper ipc_wrapper::ipc_wrapper(event_handler* handler) : handler_(handler) { if (handler_) handler_->add_ref(); } ipc_wrapper::~ipc_wrapper() {} void* ipc_wrapper::get_kbuf(size_t* bytes) { if (bytes) *bytes = 0; return nullptr; } void ipc_wrapper::release_kbuf(void* buf) {} void ipc_wrapper::clear_kernel_objects(void) {} bool ipc_wrapper::cancel_write(void) { return false; } int32_t ipc_wrapper::stop(void) { if (handler_) handler_->release(); handler_ = nullptr; return 0; } ipc_wrapper* ipc_wrapper::create_ipc(event_handler* handler, ipc_type type, const char* param) { if (type == IPC_SHARED_MEM) { // path-file:id:size[:write-ratio-> percent of size of sent buffer, default is 50, only valid in owner] std::string file(param); size_t pos = file.rfind(':'); std::vector params; while(pos != std::string::npos && params.size() < 3) { params.push_back(std::stold(file.substr(pos + 1))); file.erase(pos); pos = file.rfind(':'); } if(params.size() >= 2) { params.insert(params.begin(), 50); pos = params.size(); return dynamic_cast(new ipc_wrapper_shm(file.c_str(), params[pos - 1], params[pos - 2], handler, params[pos - 3])); } } return nullptr; }