From 6d1f12e55a8b69b0f46bca5a8b48565a64c59f06 Mon Sep 17 00:00:00 2001 From: gb <741021719@qq.com> Date: Mon, 5 Dec 2022 16:03:17 +0800 Subject: [PATCH] base, shared memory IPC --- common/event_monitor.cpp | 255 ++++++++++++++++++++++ common/event_monitor.h | 70 ++++++ common/ipc_util.cpp | 417 ++++++++++++++++++++++++++++++++++++ common/ipc_util.h | 134 ++++++++++++ common/ipc_wrapper.cpp | 373 ++++++++++++++++++++++++++++++++ common/ipc_wrapper.h | 73 +++++++ common/log_util.cpp | 112 ++++++++++ common/log_util.h | 67 ++++++ common/referer.cpp | 355 ++++++++++++++++++++++++++++++ common/referer.h | 120 +++++++++++ scanner/CMakeLists.txt | 22 ++ scanner/main/scanner.cpp | 201 +++++++++++++++++ sln/scanner.vcxproj | 166 ++++++++++++++ sln/scanner.vcxproj.filters | 63 ++++++ 14 files changed, 2428 insertions(+) create mode 100644 common/event_monitor.cpp create mode 100644 common/event_monitor.h create mode 100644 common/ipc_util.cpp create mode 100644 common/ipc_util.h create mode 100644 common/ipc_wrapper.cpp create mode 100644 common/ipc_wrapper.h create mode 100644 common/log_util.cpp create mode 100644 common/log_util.h create mode 100644 common/referer.cpp create mode 100644 common/referer.h create mode 100644 scanner/CMakeLists.txt create mode 100644 scanner/main/scanner.cpp create mode 100644 sln/scanner.vcxproj create mode 100644 sln/scanner.vcxproj.filters diff --git a/common/event_monitor.cpp b/common/event_monitor.cpp new file mode 100644 index 0000000..f75fddf --- /dev/null +++ b/common/event_monitor.cpp @@ -0,0 +1,255 @@ +#include "event_monitor.h" + + +#include /* nonblocking */ +#include /*setrlimit */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "log_util.h" + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// event_handler +event_handler::event_handler() +{} +event_handler::~event_handler() +{} + + + + + + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// parent_holder +parent_holder::parent_holder() +{} +parent_holder::~parent_holder() +{} + + + + + + + + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// linux - epoll wrapper ... +class epoll_wrapper : public event_monitor +{ + int32_t epoll_fd_; + int32_t quit_fd_[2]; + volatile bool run_; + std::vector threads_; + + static int32_t epoll_max; + + void clear_threads(void) + { + if(threads_.size()) + { + for (size_t i = 0; i < threads_.size(); ++i) + { + if (threads_[i]->joinable()) + threads_[i]->join(); + threads_[i].reset(); + } + threads_.clear(); + } + } + void close_monitor_fd(void) + { + if (epoll_fd_ != -1) + close(epoll_fd_); + epoll_fd_ = -1; + } + void clear(void) + { + clear_threads(); + close_monitor_fd(); + } + void monitor_thread(void) + { + log_cls::log(LOG_LEVEL_DEBUG, "monitor thread(%p) of object(%p) is working ...\n", gettid(), this); + while (run_) + { + struct epoll_event evs; + + memset(&evs, 0, sizeof(evs)); + if (epoll_wait(epoll_fd_, &evs, 1, -1) == -1) + continue; + if (evs.events == EPOLLOUT && evs.data.fd == quit_fd_[1]) + break; + + if (evs.events == EPOLLIN) + ((event_handler*)evs.data.ptr)->on_event(event_handler::EVENT_READ, nullptr, 0); + else if (evs.events == EPOLLOUT) + ((event_handler*)evs.data.ptr)->on_event(event_handler::EVENT_WRITE, nullptr, 0); + else + ; + } + log_cls::log(LOG_LEVEL_DEBUG, "monitor thread(%p) of object(%p) finished working.\n", gettid(), this); + } + +protected: + virtual ~epoll_wrapper() + { + stop(); + } + +public: + epoll_wrapper(const char* desc) : event_monitor(desc), epoll_fd_(-1), run_(true) + { + memset(quit_fd_, -1, sizeof(quit_fd_)); + } + +public: + virtual int32_t start(int32_t threads = 1) override + { + int32_t ret = stop(); + struct rlimit rt; + + rt.rlim_max = rt.rlim_cur = epoll_wrapper::epoll_max; + if (ret == 0 && setrlimit(RLIMIT_NOFILE, &rt) == 0) + { + epoll_fd_ = epoll_create(epoll_wrapper::epoll_max); + if (epoll_fd_ == -1) + { + ret = errno; + log_cls::log(LOG_LEVEL_FATAL, "epoll_create for '%s' failed: %s\n", desc_.c_str(), strerror(ret)); + } + else + { + run_ = true; + for (size_t i = 0; i < (size_t)threads; ++i) + { + THREAD_PTR t;//(new std::thread(&epoll_wrapper::monitor_thread, this)); + t.reset(new std::thread(&epoll_wrapper::monitor_thread, this)); + threads_.push_back(t); + } + } + } + else + { + log_cls::log(LOG_LEVEL_FATAL, "setrlimit for '%s-epoll' failed: %s\n", desc_.c_str(), strerror(ret)); + } + + return ret; + } + virtual int32_t stop(void) override + { + if(threads_.size()) + { + struct epoll_event ev; + #ifdef __USE_GNU + pipe2(quit_fd_, O_NONBLOCK); + #else + pipe(quit_fd_); + #endif + + log_cls::log(LOG_LEVEL_DEBUG, "quit fd[0] = %p, fd[1] = %p\n", quit_fd_[0], quit_fd_[1]); + + run_ = false; + ev.data.fd = quit_fd_[1]; + ev.events = EPOLLOUT | EPOLLET | EPOLLONESHOT; + for(size_t i = 0; i < threads_.size(); ++i) + epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, ev.data.fd, &ev); + clear_threads(); + close(quit_fd_[0]); + close(quit_fd_[1]); + memset(quit_fd_, -1, sizeof(quit_fd_)); + } + + close_monitor_fd(); + + return 0; + } + + virtual int32_t add_fd(event_handler* handler) override + { + struct epoll_event ev; + int32_t ret = -1; + + if (!handler || handler->get_fd() == -1) + return EINVAL; + + if (epoll_fd_ == -1) + return EFAULT; + + handler->add_ref(); // add ref for epoll_event holder ... + ev.data.ptr = handler; + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; // EPOLLONESHOT | EPOLLHUP + ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, handler->get_fd(), &ev); + if (ret == -1) + { + ret = errno; + log_cls::log(LOG_LEVEL_FATAL, "add fd(%d) to %s-epoll failed: %s\n", handler->get_fd(), desc_.c_str(), strerror(ret)); + handler->release(); + } + + return ret; + } + virtual int32_t remove_fd(event_handler* handler) override + { + struct epoll_event ev; + int32_t ret = -1; + + if (!handler || handler->get_fd() == -1) + return EINVAL; + + if (epoll_fd_ == -1) + return EFAULT; + + ev.data.ptr = handler; + ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLHUP; // EPOLLONESHOT + ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, handler->get_fd(), &ev); + + if (ret == 0) + { + // ENOENT returned if object 'handler' has not registered, so we can free it here when success in EPOLL_CTL_DEL ... + handler->release(); + } + else + { + ret = errno; + log_cls::log(LOG_LEVEL_FATAL, "remove fd(%d) from %s-epoll failed: %s\n", handler->get_fd(), desc_.c_str(), strerror(ret)); + } + + return ret; + } +}; +int32_t epoll_wrapper::epoll_max = 10; + + + + + + + + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// event_handler +event_monitor::event_monitor(const char* desc) : desc_(desc ? desc : "") +{ + log_cls::log(LOG_LEVEL_DEBUG, "+event_monitor(%p) of '%s' contructing ...\n", this, desc_.c_str()); +} +event_monitor::~event_monitor() +{ + log_cls::log(LOG_LEVEL_DEBUG, "-event_monitor(%p) of '%s' destroyed\n", this, desc_.c_str()); +} + +event_monitor* event_monitor::create(const char* desc, int32_t type) +{ + if (type == EV_TYPE_EPOLL) + return dynamic_cast(new epoll_wrapper(desc)); + else + return nullptr; +} diff --git a/common/event_monitor.h b/common/event_monitor.h new file mode 100644 index 0000000..94ead54 --- /dev/null +++ b/common/event_monitor.h @@ -0,0 +1,70 @@ +#pragma once + +// event monitor +// +// created on 2022-11-29 +// + +#include "referer.h" +#include +#include + +typedef std::shared_ptr THREAD_PTR; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// object event_handler +// +// derived from 'event_handler' if your class will handle some events of drived by events +// +class event_handler : public refer +{ +protected: + event_handler(); + virtual ~event_handler(); + +public: + enum + { + EVENT_NONE = 0, + EVENT_READ, // indicates that the 'fd' is readable, you should perform 'read' if data was null + EVENT_WRITE, // indicates that the 'fd' is writeable, data in 'data' has sent 'data_len' bytes, in async-write only + }; + virtual int32_t on_event(int32_t ev, void* data, size_t data_len) = 0; + virtual int32_t get_fd(void) = 0; +}; + +class parent_holder : public refer +{ +protected: + parent_holder(); + virtual ~parent_holder(); + +public: + virtual int32_t stop(void) = 0; // stop work and release parent ptr +}; + +// event_monitor to manage an event-driven model, this will trigger EVENT_READ/EVENT_WRITE events to 'handler' +// +class event_monitor : public refer +{ +protected: + std::string desc_; + +protected: + event_monitor(const char* desc); + virtual ~event_monitor(); + +public: + virtual int32_t start(int32_t threads = 1) = 0; + virtual int32_t stop(void) = 0; + + virtual int32_t add_fd(event_handler* handler) = 0; + virtual int32_t remove_fd(event_handler* handler) = 0; + + enum + { + EV_TYPE_POLL = 1, + EV_TYPE_EPOLL, + }; + static event_monitor* create(const char* desc, int32_t type = EV_TYPE_EPOLL); +}; diff --git a/common/ipc_util.cpp b/common/ipc_util.cpp new file mode 100644 index 0000000..e52e9f2 --- /dev/null +++ b/common/ipc_util.cpp @@ -0,0 +1,417 @@ +#include "ipc_util.h" + +#include +#include +#include +#include +#include +#include + +#include "log_util.h" + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// linux_event +unsigned long linux_event::to_abs_time_us = 0; + +linux_event::linux_event(const char* desc) : waiting_(false), sem_(nullptr), desc_(desc ? desc : ""), first_(true), multi_proc_(false) +{ + log_cls::log(LOG_LEVEL_ALL, "+linux_event(%p) unamed for '%s' constructing ...\n", this, desc_.c_str()); + + err_ = sem_init(&local_sem_, 0, 0); + + if (err_ == -1) + { + err_ = errno; + log_cls::log(LOG_LEVEL_FATAL, " %p: sem_init = %s\n", this, strerror(err_)); + } + else + { + sem_ = &local_sem_; + } +} +linux_event::linux_event(const char* name, const char* desc) : waiting_(false), sem_(nullptr), desc_(desc ? desc : ""), first_(true), multi_proc_(true) +{ + log_cls::log(LOG_LEVEL_ALL, "+linux_event(%p) of named '%s' for '%s' constructing ...\n", this, name, desc_.c_str()); + sem_ = sem_open(name, O_CREAT | O_EXCL, 0777, 0); + if (sem_ == (sem_t*)SEM_FAILED) + { + sem_ = nullptr; + err_ = errno; + log_cls::log(LOG_LEVEL_FATAL, " %p: sem_open(O_CREAT | O_EXCL) = %s\n", this, strerror(err_)); + if(err_ = EEXIST) + { + sem_ = sem_open(name, 0666); + if (sem_ == (sem_t*)SEM_FAILED) + { + err_ = errno; + sem_ = nullptr; + log_cls::log(LOG_LEVEL_FATAL, " %p: sem_open = %s\n", this, strerror(err_)); + } + else + { + err_ = 0; + first_ = false; + } + } + } + else + { + name_ = name; + log_cls::log(LOG_LEVEL_DEBUG, " %p: created named sem OK.\n", this); + err_ = sem_init(sem_, 1, 0); // this is used to initialize the event count, whether named or unamed + if (err_ == -1) + { + err_ = errno; + log_cls::log(LOG_LEVEL_FATAL, " %p: sem_init = %s\n", this, strerror(err_)); + sem_close(sem_); + sem_ = nullptr; + sem_unlink(name); + } + } +} +linux_event::linux_event(sem_t* mem_sem, bool first, const char* desc) : waiting_(false), sem_(mem_sem), desc_(desc ? desc : ""), first_(first), multi_proc_(true) +{ + log_cls::log(LOG_LEVEL_ALL, "+linux_event(%p) at mem(%p) for '%s' constructing ...\n", this, mem_sem, desc_.c_str()); + + if(first) + { + err_ = sem_init(sem_, 1, 0); + + if (err_ == -1) + { + err_ = errno; + log_cls::log(LOG_LEVEL_FATAL, " %p: sem_init = %s\n", this, strerror(err_)); + } + } +} +linux_event::~linux_event() +{ + if (sem_) + { + char ptr[40] = {0}; + std::string tips(""); + + sprintf(ptr, " ~%p: ", this); + tips = ptr; + if(sem_ == &local_sem_ || (first_ && name_.empty())) + { + err_ = log_cls::log_when_err(sem_destroy(sem_), (tips + "sem_destroy").c_str()); + } + else + { + err_ = log_cls::log_when_err(sem_close(sem_), (tips + "sem_close").c_str()); + + // else // why not else ? we should ensure delete the kernel object when unused. + if(!name_.empty()) // i am the named object owner ! + { + err_ = log_cls::log_when_err(sem_unlink(name_.c_str()), (tips + "sem_unlink").c_str(), LOG_LEVEL_FATAL); // This will cause previously opened objects to never receive events, even if you reopen it. + } + } + } + log_cls::log(LOG_LEVEL_ALL, "-linux_event(%p) destroyed.\n", this); +} + +int32_t linux_event::clear_named_event(const char* name) +{ + sem_t* sem = sem_open(name, O_CREAT | O_EXCL, 0777, 0); + int32_t err = 0; + + if(sem == (sem_t*)SEM_FAILED) + { + if(errno == EEXIST) + { + err = sem_unlink(name); + if(err == -1) + err = errno; + } + else + { + return 0; + } + } + else + { + sem_close(sem); + sem_unlink(name); + } + + return err; +} + void linux_event::reset_calc_abs_time(unsigned us) + { + if(us == -1) + { + struct timeval now = {0}, after = {0}; + struct timespec abst = {0}; + + linux_event::to_abs_time_us = 10; + if(chronograph::now(&now)) + { + abst.tv_sec = now.tv_sec; + abst.tv_nsec = USEC_2_NS(now.tv_usec); + abst.tv_nsec += MSEC_2_NS(1) + linux_event::to_abs_time_us; + + // overflow ... + abst.tv_sec += abst.tv_nsec / SEC_2_NS(1); + abst.tv_nsec %= SEC_2_NS(1); + if(chronograph::now(&after)) + { + if(after.tv_usec > now.tv_usec) + linux_event::to_abs_time_us = after.tv_usec - now.tv_usec; + else + linux_event::to_abs_time_us = SEC_2_US(1) + after.tv_usec - now.tv_usec; + } + } + } + else + { + linux_event::to_abs_time_us = us; + } + } +bool linux_event::abs_time_after(struct timespec* abstm, unsigned ms) +{ + struct timeval now = {0}; + + if(!chronograph::now(&now)) + { + log_cls::log(LOG_LEVEL_FATAL, "gettimeofday faied: %s\n!", strerror(errno)); + time(&now.tv_sec); + } + abstm->tv_sec = now.tv_sec; + abstm->tv_nsec = USEC_2_NS(now.tv_usec); + abstm->tv_nsec += MSEC_2_NS(ms) + USEC_2_NS(linux_event::to_abs_time_us); + + // overflow ... + abstm->tv_sec += abstm->tv_nsec / SEC_2_NS(1); + abstm->tv_nsec %= SEC_2_NS(1); + + return true; +} + +bool linux_event::is_ready(void) +{ + return sem_ != nullptr; +} +bool linux_event::is_named_first(void) +{ + return !name_.empty(); +} +bool linux_event::wait_try(void) +{ + return sem_trywait(sem_) == 0; +} +bool linux_event::wait(unsigned timeout) +{ + bool waited = true; + + log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): waiting(%u) ...\n", this, timeout); + waiting_ = true; + if (timeout == WAIT_INFINITE) + { + sem_wait(sem_); + } + else + { + struct timespec to = {0}; + + linux_event::abs_time_after(&to, timeout); + waited = sem_timedwait(sem_, &to) == 0; + } + waiting_ = false; + log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): waited(%u) = %d\n", this, timeout, waited); + + return waited; +} +void linux_event::trigger(void) +{ + err_ = sem_post(sem_); + + if(err_) + err_ = errno; + + log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): trigger = %s\n", this, err_ == 0 ? "OK" : strerror(errno)); +} +void linux_event::reset(void) +{ + err_ = sem_init(sem_, multi_proc_, 0); + + if(err_) + err_ = errno; + + log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): reset = %s\n", this, err_ == 0 ? "OK" : strerror(errno)); +} +bool linux_event::is_waiting(void) +{ + return waiting_; +} + + + + + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// shared_mem +uint64_t shared_mem::mem_total_ = 0; +uint64_t shared_mem::page_unit_ = 0; +uint64_t shared_mem::huge_page_unit_ = 0; + + +shared_mem::shared_mem() : name_(""), id_(0), size_(0), shm_buf_(shared_mem::invalid_map_addr()), first_(false), shm_id_(-1) +{ + if (shared_mem::page_unit_ == 0) + shared_mem::init_page_info(); + + log_cls::log(LOG_LEVEL_ALL, "+shared_mem(%p) constructed, page size = %ld\n", this, page_unit_); +} +shared_mem::~shared_mem() +{ + close(); + log_cls::log(LOG_LEVEL_ALL, "-shared_mem(%p) destroyed.\n", this); +} + +void shared_mem::init_page_info(void) +{ + shared_mem::page_unit_ = sys_util::get_page_size(); + + // Hugepagesize: 2048 kB + if(sys_util::get_inf_file_data("/proc/meminfo", 80, "MemTotal: %ld", &shared_mem::mem_total_)) + shared_mem::mem_total_ *= 1024; + if(sys_util::get_inf_file_data("/proc/meminfo", 80, "Hugepagesize: %ld", &shared_mem::huge_page_unit_)) + shared_mem::huge_page_unit_ *= 1024; + + log_cls::log(LOG_LEVEL_DEBUG, "TotalMemory: %s, system page size: %s, huge page size %s\n" + , sys_util::format_readable_bytes(shared_mem::mem_total_).c_str() + , sys_util::format_readable_bytes(shared_mem::page_unit_).c_str() + , sys_util::format_readable_bytes(shared_mem::huge_page_unit_).c_str()); +} +char* shared_mem::invalid_map_addr(void) +{ + return (char*)-1; +} + +int32_t shared_mem::open(int32_t id, size_t* bytes, const char* name) +{ + key_t key = (key_t)-1; // ftok(name, id); + int32_t ret = close(); + size_t size = bytes ? *bytes : 0; + std::string pe(""); + + if (ret) + return ret; + + if(!name || *name == 0) + { + pe = sys_util::get_module_path(); + name = pe.c_str(); + } + key = ftok(name, id); + if (key == (key_t)-1) + { + log_cls::log(LOG_LEVEL_FATAL, "shared_memory(%p): ftok('%s', %d) = %s\n", this, name, id, strerror(errno)); + + return errno; + } + + size = ALIGN_INT(size, shared_mem::page_unit_); + if (!bytes || *bytes != size) + { + log_cls::log(LOG_LEVEL_DEBUG, "add %ld upto multiple of page size: %ld\n", bytes ? *bytes : 0, size); + if (bytes) + *bytes = size; + } + + shm_id_ = shmget(key, size, IPC_EXCL | IPC_CREAT | 0600); + if (shm_id_ == -1) + { + ret = errno; + log_cls::log(LOG_LEVEL_WARNING, "%p: create shared memory('%s', %d) failed: %s\n", this, name, id, strerror(ret)); + if (ret == EEXIST) + { + shm_id_ = shmget(key, size, 0600); + if (shm_id_ == -1) + { + ret = errno; + log_cls::log(LOG_LEVEL_WARNING, "%p: open shared memory('%s', %d) failed: %s\n", this, name, id, strerror(ret)); + } + else + { + ret = 0; + first_ = false; + } + } + } + else // i created the shared memory ... + { + first_ = true; + } + + if (ret == 0) + { + shm_buf_ = (char*)shmat(shm_id_, nullptr, 0); + if (shm_buf_ == shared_mem::invalid_map_addr()) + { + ret = errno; + log_cls::log(LOG_LEVEL_WARNING, "%p: shmat failed: %s\n", this, strerror(ret)); + close(); + } + else + { + log_cls::log(LOG_LEVEL_DEBUG, "%p: %s shared memory('%s', %d) at %p(+%s) OK.\n", this, first_ ? "create" : "open", name, id, shm_buf_, sys_util::format_readable_bytes(size).c_str()); + name_ = name; + id_ = id; + size_ = size; + } + } + + return ret; +} +int32_t shared_mem::close(void) +{ + int32_t ret = 0; + + if (shm_buf_ != shared_mem::invalid_map_addr()) + { + ret = log_cls::log_when_err(shmdt(shm_buf_), "shmdt"); + if (ret) + return ret; + + shm_buf_ = shared_mem::invalid_map_addr(); + } + if (first_ && shm_id_ >= 0) + { + ret = log_cls::log_when_err(shmctl(shm_id_, IPC_RMID, nullptr), "shmctrl(IPC_RMID)"); + if (ret) + { + // re-map buffer ... + shm_buf_ = (char*)shmat(shm_id_, nullptr, 0); + + return ret; + } + + shm_id_ = -1; + } + + name_ = ""; + id_ = 0; + first_ = false; + size_ = 0; + + return ret; +} + +char* shared_mem::get_mem(size_t* size) +{ + if (size) + *size = size_; + + return shm_buf_ == shared_mem::invalid_map_addr() ? nullptr : shm_buf_; +} +bool shared_mem::is_first(void) +{ + return first_; +} + +void shared_mem::clear_kernel_object(void) +{ + log_cls::log_when_err(shmctl(shm_id_, IPC_RMID, nullptr), "shmctrl(IPC_RMID)"); +} diff --git a/common/ipc_util.h b/common/ipc_util.h new file mode 100644 index 0000000..a9691e1 --- /dev/null +++ b/common/ipc_util.h @@ -0,0 +1,134 @@ +#pragma once + +// IPC utility +// +// created on 2022-11-29 +// + + +#include "referer.h" +#include +#include + + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// object event +// +class linux_event : public refer +{ + int32_t err_; + sem_t local_sem_; + sem_t *sem_; + bool first_; + bool multi_proc_; + volatile bool waiting_; + std::string desc_; + std::string name_; + + static unsigned long to_abs_time_us; // used in sem_timedwait to calculate absolute time from elapse + +public: + linux_event(const char* desc); // to initialize a in-process object + linux_event(const char* name, const char* desc); // to initialize a multi-process object + linux_event(sem_t* mem_sem, bool first/*invoke sem_init when true*/, const char* desc); // to initialize an event at given memory (shared memory) + + static int32_t clear_named_event(const char* name); + static void reset_calc_abs_time(unsigned us = -1); // reset 'to_abs_time_us' value, -1 is for re-calc + static bool abs_time_after(struct timespec* abstm, unsigned ms); + +protected: + ~linux_event(); + +public: + bool is_ready(void); + bool is_named_first(void); // whether I created the named event + bool wait_try(void); + bool wait(unsigned timeout = WAIT_INFINITE/*ms*/); // WAIT_INFINITE is waiting unfinite, true when watied and false for wait timeout + void trigger(void); + void reset(void); // re-initialize. DANGEROUS !!! all wait operation before will not receive any event after this !!! + bool is_waiting(void); +}; + + +template +class safe_fifo +{ + MUTEX lock_; + std::deque que_; + +public: + safe_fifo() + {} + ~safe_fifo() + {} + +public: + void save(const T& t) + { + LOCKER lock(lock_); + + que_.push_back(t); + } + bool take(T& t) + { + LOCKER lock(lock_); + + if (que_.size()) + { + t = que_.front(); + que_.pop_front(); + + return true; + } + else + { + return false; + } + } + size_t size(void) + { + LOCKER lock(lock_); + + return que_.size(); + } + void clear(void) + { + LOCKER lock(lock_); + + que_.clear(); + } +}; + +class shared_mem : public refer +{ + std::string name_; + int32_t id_; + int32_t shm_id_; + char *shm_buf_; + size_t size_; + bool first_; + +public: + shared_mem(); + + static void init_page_info(void); + static char* invalid_map_addr(void); + + static uint64_t mem_total_; + static uint64_t page_unit_; + static uint64_t huge_page_unit_; + +protected: + ~shared_mem(); + +public: + int32_t open(int32_t id, size_t* bytes, const char* name = nullptr); + int32_t close(void); + + char* get_mem(size_t* size = nullptr); + bool is_first(void); + + void clear_kernel_object(void); +}; + diff --git a/common/ipc_wrapper.cpp b/common/ipc_wrapper.cpp new file mode 100644 index 0000000..5180c48 --- /dev/null +++ b/common/ipc_wrapper.cpp @@ -0,0 +1,373 @@ +#include "ipc_wrapper.h" + +#include +#include + +#include "log_util.h" +#include "ipc_util.h" + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// ipc_wrapper_shm +class ipc_wrapper_shm : public ipc_wrapper +{ + enum data_ind + { + DATA_INTER_CMD = 0, + }; + enum internal_cmd + { + INTER_CMD_NONE = 0, + INTER_CMD_EXIT, + }; + typedef struct _shm_pack + { + size_t space; // buffer length + size_t bytes; // data length + char data[4]; + }SHMPACK, *LPSHMPACK; + typedef struct _sync_shm // 1-lock(write_lock) --> 2-write content --> 3-notify read --> 4-wait(wait_sent) --> 5-return step 2 if data has not sent finished, or else to 6 --> 6-release(write_lock) --> 7-over + { + sem_t wait_arrive; // read event + sem_t wait_sent; // peer has read, i can re-write now + sem_t notify_read; // notify peer to read + sem_t notify_write; // notify peer to write + char data[8]; // internal data + }SYNCSHM, *LPSYNCSHM; + volatile bool run_; + int32_t id_; + shared_mem* shm_; + LPSYNCSHM sync_shm_; + linux_event* wait_arrive_; + linux_event* wait_sent_; + linux_event* notify_write_; + linux_event* notify_read_; + THREAD_PTR thread_; + LPSHMPACK buf_in_; + LPSHMPACK buf_out_; + bool buf_out_used_; + volatile bool cancel_write_; + + void create(const char* file, int32_t id, size_t bytes, unsigned sent_percent) + { + int32_t err = 0; + + shm_ = new shared_mem(); + err = shm_->open(id, &bytes, file); + if (err) + { + log_cls::log(LOG_LEVEL_FATAL, "ipc_wrapper_shm(%p) open shared memory(%s, %d, %s) failed: %s\n" + , this, file, id, sys_util::format_readable_bytes(bytes).c_str(), strerror(err)); + shm_->release(); + shm_ = nullptr; + + return; + } + + SHMPACK pack = { 0 }; + std::string desc("ipc_wrapper_shm-"); + char buf[80] = { 0 }, *ptr = shm_->get_mem(&pack.space); + int32_t pack_head = ALIGN_INT(sizeof(pack), 16), head = ALIGN_INT(sizeof(SYNCSHM), 16); + + sprintf(buf, "%d-", id); + desc += buf; + + printf("Total size: %ld, Head size: %d, pack-head: %d\n", pack.space, head, pack_head); + sync_shm_ = (LPSYNCSHM)ptr; + ptr += head; + pack.space -= head + pack_head * 2; + head = ALIGN_INT(pack.space * sent_percent / 100, 16); + printf("%ld * %d%% = %d\n", pack.space, sent_percent, head); + if (shm_->is_first()) + { + SYNCSHM ss; + + memcpy(sync_shm_, &ss, sizeof(ss)); + buf_in_ = (LPSHMPACK)ptr; + buf_in_->space = pack.space - head; + ptr += buf_in_->space + pack_head; + buf_out_ = (LPSHMPACK)ptr; + buf_out_->space = head; + //* + wait_arrive_ = new linux_event(&sync_shm_->wait_arrive, true, (desc + "wait_arrive").c_str()); + wait_sent_ = new linux_event(&sync_shm_->wait_sent, true, (desc + "wait_sent").c_str()); + notify_read_ = new linux_event(&sync_shm_->notify_read, true, (desc + "notify_read").c_str()); + notify_write_ = new linux_event(&sync_shm_->notify_write, true, (desc + "notify_write").c_str()); + /*/ + wait_arrive_ = new linux_event((desc + "wait_arrive").c_str(), ""); + wait_sent_ = new linux_event((desc + "wait_sent").c_str(), ""); + notify_write_ = new linux_event((desc + "notify_write").c_str(), ""); + notify_read_ = new linux_event((desc + "notify_read").c_str(), ""); + ///*//////////// + memset(sync_shm_->data, 0, sizeof(sync_shm_->data)); + } + else + { + //* + wait_sent_ = new linux_event(&sync_shm_->notify_write, false, (desc + "wait_sent").c_str()); + wait_arrive_ = new linux_event(&sync_shm_->notify_read, false, (desc + "wait_arrive").c_str()); + notify_write_ = new linux_event(&sync_shm_->wait_sent, false, (desc + "notify_write").c_str()); + notify_read_ = new linux_event(&sync_shm_->wait_arrive, false, (desc + "notify_read").c_str()); + /*/ + wait_arrive_ = new linux_event((desc + "notify_read").c_str(), ""); + wait_sent_ = new linux_event((desc + "notify_write").c_str(), ""); + notify_read_ = new linux_event((desc + "wait_arrive").c_str(), ""); + notify_write_ = new linux_event((desc + "wait_sent").c_str(), ""); + ///*//////////// + buf_out_ = (LPSHMPACK)ptr; + buf_in_ = (LPSHMPACK)(ptr + pack_head + buf_out_->space); + } + printf("buf in = %p + %ld, buf out = %p + %ld\n", buf_in_, buf_in_->space, buf_out_, buf_out_->space); + + thread_.reset(new std::thread(&ipc_wrapper_shm::read_thread, this)); + // handler_->on_event(event_handler::EVENT_WRITE, buf_out_->data, buf_size_); + } + void read_thread(void) + { + event_handler* handler = handler_; + + handler->add_ref(); + while (run_) + { + wait_arrive_->wait(); + if (!run_) + break; + if (!shm_->is_first() && sync_shm_->data[DATA_INTER_CMD] == INTER_CMD_EXIT) + break; + + handler->on_event(event_handler::EVENT_READ, buf_in_->data, buf_in_->bytes); + notify_write_->trigger(); + } + handler->release(); + } + +public: + ipc_wrapper_shm(const char* file, int32_t id + , size_t bytes + , event_handler* handler + , unsigned sent_percent = 50) + : ipc_wrapper(handler), run_(true), shm_(nullptr), sync_shm_(nullptr) + , wait_arrive_(nullptr), wait_sent_(nullptr), notify_write_(nullptr), notify_read_(nullptr) + , buf_in_(nullptr), buf_out_(nullptr), id_(id), buf_out_used_(false), cancel_write_(false) + { + create(file, id, bytes, sent_percent); + } + +protected: + ~ipc_wrapper_shm() + {} + +public: + virtual int32_t write(const char* pack, size_t * bytes, bool kbuf, unsigned timeout = WAIT_INFINITE) override + { + if (!shm_) + return ENOTCONN; + + // invoke in read-thread is not allowed + if(std::this_thread::get_id() == thread_->get_id()) + return EDEADLOCK; + + size_t rest = *bytes; + int32_t ret = 0; + + cancel_write_ = false; + if (kbuf) + { + buf_out_->bytes = rest; + notify_read_->trigger(); + if (wait_sent_->wait(timeout)) + rest = 0; + else + ret = ETIME; + } + else + { + chronograph timer; + while (rest) + { + buf_out_->bytes = rest > buf_out_->space ? buf_out_->space : rest; + memcpy(buf_out_->data, pack, buf_out_->bytes); + notify_read_->trigger(); + + pack += buf_out_->space; + if (!wait_sent_->wait(timeout)) + break; + if (cancel_write_) + break; + + if (rest <= buf_out_->space) + { + rest = 0; + break; + } + rest -= buf_out_->space; + + if (timeout) + { + uint64_t t = timer.elapse_ms(); + if (t >= timeout) + break; + + timeout -= t; + timer.reset(); + } + } + if (cancel_write_) + ret = ECANCELED; + else if (rest) + ret = ETIME; + } + *bytes -= rest; + + return ret; + } + virtual bool cancel_write(void) override + { + cancel_write_ = true; + if (wait_sent_->is_waiting()) + { + wait_sent_->trigger(); + wait_sent_->reset(); // sure ? + } + + return true; + } + virtual bool is_ok(void) override + { + return shm_ != nullptr; + } + virtual bool is_first(void) override + { + return shm_ && shm_->is_first(); + } + + virtual void* get_kbuf(size_t* bytes) override + { + if (!bytes) + return nullptr; + + if (buf_out_used_) + { + *bytes = 0; + + return nullptr; + } + + buf_out_used_ = true; + *bytes = buf_out_->space; + + return buf_out_->data; + } + virtual void release_kbuf(void* buf) override + { + buf_out_used_ = false; + } + virtual void clear_kernel_objects(void) override + { + std::string desc("ipc_wrapper_shm-"); + char id[20] = { 0 }; + + sprintf(id, "%d-", id_); + desc += id; + linux_event::clear_named_event((desc + "wait_arrive").c_str()); + linux_event::clear_named_event((desc + "wait_sent").c_str()); + linux_event::clear_named_event((desc + "notify_write").c_str()); + linux_event::clear_named_event((desc + "notify_read").c_str()); + + shm_->clear_kernel_object(); + } + virtual int32_t stop(void) override + { + run_ = false; + if (shm_) + { + wait_arrive_->trigger(); + if (thread_.get() && thread_->joinable()) + thread_->join(); + thread_.reset(); + + if (is_first()) + { + sync_shm_->data[DATA_INTER_CMD] = INTER_CMD_EXIT; + notify_read_->trigger(); + wait_sent_->wait(100); + } + notify_read_->release(); + notify_read_ = nullptr; + notify_write_->release(); + notify_write_ = nullptr; + wait_arrive_->release(); + wait_arrive_ = nullptr; + wait_sent_->release(); + wait_sent_ = nullptr; + sync_shm_ = nullptr; + buf_in_ = buf_out_ = nullptr; + + shm_->close(); + shm_->release(); + } + shm_ = nullptr; + + return ipc_wrapper::stop(); + } + +}; + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// ipc_wrapper +ipc_wrapper::ipc_wrapper(event_handler* handler) : handler_(handler) +{ + if (handler_) + handler_->add_ref(); +} +ipc_wrapper::~ipc_wrapper() +{} + +void* ipc_wrapper::get_kbuf(size_t* bytes) +{ + if (bytes) + *bytes = 0; + + return nullptr; +} +void ipc_wrapper::release_kbuf(void* buf) +{} +void ipc_wrapper::clear_kernel_objects(void) +{} +bool ipc_wrapper::cancel_write(void) +{ + return false; +} +int32_t ipc_wrapper::stop(void) +{ + if (handler_) + handler_->release(); + handler_ = nullptr; + + return 0; +} + +ipc_wrapper* ipc_wrapper::create_ipc(event_handler* handler, ipc_type type, const char* param) +{ + if (type == IPC_SHARED_MEM) + { + // path-file:id:size[:write-ratio-> percent of size of sent buffer, default is 50, only valid in owner] + std::string file(param); + size_t pos = file.rfind(':'); + std::vector params; + + while(pos != std::string::npos && params.size() < 3) + { + params.push_back(std::stold(file.substr(pos + 1))); + file.erase(pos); + pos = file.rfind(':'); + } + if(params.size() >= 2) + { + params.insert(params.begin(), 50); + pos = params.size(); + + return dynamic_cast(new ipc_wrapper_shm(file.c_str(), params[pos - 1], params[pos - 2], handler, params[pos - 3])); + } + } + return nullptr; +} diff --git a/common/ipc_wrapper.h b/common/ipc_wrapper.h new file mode 100644 index 0000000..adea885 --- /dev/null +++ b/common/ipc_wrapper.h @@ -0,0 +1,73 @@ +#pragma once + +// IPC utility +// +// created on 2022-12-02 +// + + +#include "event_monitor.h" + +typedef struct _pack_base +{ + uint64_t total_bytes; // total bytes of payload + uint64_t offset; // offset in total of this part + uint64_t bytes; // bytes in data, not include this head + int32_t cmd; // command + char data[0]; // payload +}PACK_BASE, *LPPACK_BASE; + + +class ipc_wrapper : public parent_holder +{ +protected: + event_handler* handler_; + +public: + ipc_wrapper(event_handler* handler); + + enum ipc_type + { + IPC_FILE = 0, // param: path file + IPC_PIPE, // param: pipe name + IPC_NET, // param: dot-ip:port + IPC_SHARED_MEM, // param: path-file:id:size[:write-ratio-> percent of size of sent buffer, default is 50, only valid in owner] + IPC_USB, // param: vid:pid + IPC_COM, // param: COM1 + }; + static ipc_wrapper* create_ipc(event_handler* handler, ipc_type type, const char* param); + +protected: + virtual ~ipc_wrapper(); + +public: + // Function: write content to peer + // + // Parameters: pack - content pack + // + // bytes - [in] bytes of data in 'pack', [out] - bytes of data has sent + // + // kbuf - whether memory 'pack' is from IPC, i.e. return from method get_kbuf() + // + // timeout - time out, in milliseconds + // + // Return: 0 - success + // ENOTCONN - the commuction is not connected, equal to !is_ok() + // EDEADLOCK - calling from current thread is disallowed + // ETIME - time out + // ECANCELED - user cancelled the operation + virtual int32_t write(const char* pack, size_t *bytes, bool kbuf, unsigned timeout = WAIT_INFINITE) = 0; // DON'T call in event_handler::on_event routine, it will be DEAD-LOCK !!! + virtual bool is_ok(void) = 0; // whether the commuction is ready + virtual bool is_first(void) = 0; // whether the communication established by me + + // Function: obtain IPC internal buffer to reduce ONE memory copy for sent data + // + // Parameter: bytes - [in] desired size; [out] - real size + // + // Return: memory pointer if success, or nullptr. call release_kbuf(ptr) if no longer used + virtual void* get_kbuf(size_t* bytes); + virtual void release_kbuf(void* buf); // release the internal buffer returned by get_kbuf + virtual void clear_kernel_objects(void); // clear all kernel objects the IPC used, used to clear exception + virtual bool cancel_write(void); // cancel current write operation + virtual int32_t stop(void) override; // close the connection +}; diff --git a/common/log_util.cpp b/common/log_util.cpp new file mode 100644 index 0000000..c2706e0 --- /dev/null +++ b/common/log_util.cpp @@ -0,0 +1,112 @@ +#include "log_util.h" + +#include +#include +#include +#include +#include + +#include "referer.h" + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// class�� +log_cls* log_cls::inst_ = nullptr; + +log_cls::log_cls(const char* path_file, log_level level, int32_t max_size) : file_(path_file), max_size_(max_size), level_(level), dst_(nullptr) +{ + create_log_file(); +} +log_cls::~log_cls() +{ + if (dst_) + fclose(dst_); +} + +void log_cls::create_log_file(void) +{ + dst_ = fopen(file_.c_str(), "a+b"); + if (dst_) + { + fseek(dst_, 0, SEEK_END); + if (ftell(dst_) == 0) + { + unsigned char bom[] = { 0x0ef, 0x0bb, 0x0bf }; + fwrite(bom, sizeof(bom), 1, dst_); + } + else + fwrite("\n\n\n", 1, 3, dst_); + } +} + +void log_cls::log_internal(const char* txt) +{ + std::string now("[" + chronograph::now() + "] "); + + now += txt; + { + LOCKER locker(lock_); + if (dst_) + { + fwrite(now.c_str(), 1, now.length(), dst_); + fflush(dst_); + if (ftell(dst_) >= max_size_) + { + fclose(dst_); + remove(file_.c_str()); + create_log_file(); + } + } + } +} + +void log_cls::initialize(const char* path_file, log_level level, int32_t max_size) +{ + if (log_cls::inst_) + delete log_cls::inst_; + + std::string path(""); + + if (!path_file || *path_file == 0) + { + size_t pos = 0; + char strpid[20] = {0}; + + path = sys_util::get_module_path(); + pos = path.rfind('/'); + if (pos++ != std::string::npos) + path.erase(0, pos); + path.insert(0, "/tmp/scanner/"); + mkdir("/tmp/scanner", S_IREAD | S_IWRITE | S_IEXEC); + sprintf(strpid, "_%p.log", getpid()); + path += strpid; + path_file = path.c_str(); + } + log_cls::inst_ = new log_cls(path_file, level, max_size); +} +int32_t log_cls::log_when_err(int32_t err, const char* oper_desc, log_level level) +{ + if(err == -1) + { + err = errno; + log_cls::log(level, "%s = %s\n", oper_desc, strerror(err)); + } + + return err; +} +log_level log_cls::get_log_level(void) +{ + if (log_cls::inst_) + return log_cls::inst_->level_; + else + return LOG_LEVEL_ALL; +} +std::string log_cls::get_log_file(void) +{ + if (log_cls::inst_) + return log_cls::inst_->file_; + else + return ""; +} + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// \ No newline at end of file diff --git a/common/log_util.h b/common/log_util.h new file mode 100644 index 0000000..069c246 --- /dev/null +++ b/common/log_util.h @@ -0,0 +1,67 @@ +#pragma once + +// log utility +// +// created on 2022-11-30 +// + + +#include "referer.h" +#include +#include + +#define SIZE_KB(n) (n) * 1024 +#define SIZE_MB(n) SIZE_KB((n) * 1024) +#define SIZE_GB(n) SIZE_MB((n) * 1024) + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// object event +// +enum log_level +{ + LOG_LEVEL_ALL = 0, + LOG_LEVEL_DEBUG, + LOG_LEVEL_WARNING, + LOG_LEVEL_FATAL, +}; + +class log_cls +{ + std::string file_; + int32_t max_size_; + log_level level_; + MUTEX lock_; + FILE *dst_; + + static log_cls* inst_; + + void create_log_file(void); + +protected: + log_cls(const char* path_file, log_level level, int32_t max_size); + ~log_cls(); + + void log_internal(const char* txt); + +public: + static void initialize(const char* path_file, log_level level = LOG_LEVEL_ALL, int32_t max_size = SIZE_MB(10)); + + template + static void log(log_level level, const char* fmt, Args ... args) + { + if (level >= log_cls::get_log_level() && log_cls::inst_) + { + size_t size = snprintf(nullptr, 0, fmt, args ...) + 1; + std::unique_ptr buf(new char[size]); + + snprintf(buf.get(), size, fmt, args ...); + + log_cls::inst_->log_internal(buf.get()); + } + } + static int32_t log_when_err(int32_t err, const char* oper_desc, log_level level = LOG_LEVEL_WARNING); // log as: oper_desc = strerror(errno)\n. return real error number errno + + static log_level get_log_level(void); + static std::string get_log_file(void); +}; diff --git a/common/referer.cpp b/common/referer.cpp new file mode 100644 index 0000000..52e4738 --- /dev/null +++ b/common/referer.cpp @@ -0,0 +1,355 @@ +#include "referer.h" + + +#include "log_util.h" + + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// refer +refer::refer() : ref_(1) +{ + on_born(); +} +refer::~refer() +{ + on_dead(); +} + +void refer::on_born(void) +{} +void refer::on_dead(void) +{} + +int32_t refer::add_ref(void) +{ + LOCKER lock(mutex_); + + return ++ref_; +} +int32_t refer::release(void) +{ + int32_t ref = 0; + + { + LOCKER lock(mutex_); + ref = --ref_; + } + + if (ref == 0) + delete this; + + return ref; +} + + + + + + + + + + + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// chronograph +#include + + +chronograph::chronograph() +{ + reset(); +} +chronograph::~chronograph() +{} + +bool chronograph::now(struct timeval* tv) +{ + struct timezone tz = { 0 }; + + return gettimeofday(tv, &tz) == 0; +} +bool chronograph::now(uint64_t* seconds, uint64_t* u_seconds) +{ + struct timeval tv = { 0 }; + struct timezone tz = { 0 }; + + if (gettimeofday(&tv, &tz) == 0) + { + if (seconds) + *seconds = tv.tv_sec; + if (u_seconds) + *u_seconds = tv.tv_usec; + + return true; + } + else + { + return false; + } +} +std::string chronograph::now(bool with_ms/*whether with milliseconds*/) // return '2022-11-30 10:38:42.123', no '.123' if with_ms was false +{ + struct timeval tv = { 0 }; + + if (!chronograph::now(&tv)) + return ""; + + char buf[40] = { 0 }; + time_t t = tv.tv_sec; + struct tm* l = localtime(&t); + + if (with_ms) + sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d.%06d", l->tm_year + 1900, l->tm_mon + 1, l->tm_mday + , l->tm_hour, l->tm_min, l->tm_sec, tv.tv_usec); + else + sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d", l->tm_year + 1900, l->tm_mon + 1, l->tm_mday + , l->tm_hour, l->tm_min, l->tm_sec); + + return buf; +} + +uint64_t chronograph::elapse_s(void) +{ + struct timeval tv = { 0 }; + + chronograph::now(&tv); + + return tv.tv_sec - bgn_.tv_sec; +} +uint64_t chronograph::elapse_ms(void) +{ + struct timeval tv = { 0 }; + uint64_t dif = 0; + + chronograph::now(&tv); + dif = SEC_2_MS(tv.tv_sec - bgn_.tv_sec); + dif += tv.tv_usec / MSEC_2_US(1); + dif -= bgn_.tv_usec / MSEC_2_US(1); + + return dif; +} +uint64_t chronograph::elapse_us(void) +{ + struct timeval tv = { 0 }; + uint64_t dif = 0; + + chronograph::now(&tv); + dif = SEC_2_US(tv.tv_sec - bgn_.tv_sec); + dif += tv.tv_usec; + dif -= bgn_.tv_usec; + + return dif; +} +void chronograph::reset() +{ + chronograph::now(&bgn_); +} + + + + + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// sys utility +#include +#include +#include +#include + +namespace sys_util +{ + static bool find_module(const char* path, void* param) + { + std::string* para = (std::string*)param; + + if (para[0].empty()) + { + para[1] = path; + + return false; + } + else + { + const char* name = strrchr(path, '/'); + if (name++ == nullptr) + name = path; + if (strstr(name, para[0].c_str())) + { + para[1] = path; + + return false; + } + + return true; + } + } + + int32_t enum_modules(bool(*on_found)(const char* path_module_name, void* param),// return false to stop enumeratin + void* param, // user defined data, passed into callback on_found + unsigned pid // process id, -1 is self + ) // return errno + { + char path[128] = { 0 }; + + if (pid == -1) + pid = getpid(); + sprintf(path, "/proc/%u/map_files/", pid); + + return enum_files(path, on_found, param, false); + } + + int32_t enum_files(const char* dir, // dir path + bool(*on_found)(const char* path_name, void* param), // return false to stop enumeratin + void* param // user defined data, passed into callback on_found + , bool recursive + ) // return errno + { + int32_t ret = 0; + DIR* pdir = nullptr; + struct dirent* ent = nullptr; + + pdir = opendir(dir); + if (!pdir) + return errno; + + while ((ent = readdir(pdir))) + { + if (ent->d_type & DT_DIR) + { + if (recursive) + { + if (strcmp(ent->d_name, ".") && strcmp(ent->d_name, "..")) + { + std::string sub(dir); + sub += "/"; + sub += ent->d_name; + ret = enum_files(sub.c_str(), on_found, param, recursive); + if (ret == 0x5e17) + break; + } + } + } + else + { + std::string file(dir); + + file += "/"; + file += ent->d_name; + if (!on_found(read_link(file.c_str()).c_str(), param)) + { + ret = 0x5e17; + break; + } + } + } + closedir(pdir); + + return ret == 0x5e17 ? 0 : ret; + } + + std::string get_module_path(const char* module_name, unsigned pid) // get module full path, nullptr is for main-exe + { + std::string param[] = { module_name ? module_name : "", "" }; + + enum_modules(find_module, param, pid); + + return param[1]; + } + std::string read_link(const char* lnk) + { + char path[512] = { 0 }; + + readlink(lnk, path, sizeof(path) - 1); + + return path; + } + size_t get_page_size(void) + { + size_t size = sysconf(_SC_PAGESIZE); + + if (size < 1024 || (size & 0x0fe0000ff)) // nKB && < 16MB + size = getpagesize(); + + return size; + } + bool create_folder(const char* dir) + { + return mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == 0 || errno == EEXIST; + } + + int32_t get_memory_info(uint64_t* total, uint64_t* available) + { + if(!total && !available) + return 0; + + char line[128] = {0}; + FILE *src = fopen("/proc/meminfo", "rb"); + int32_t count = total && available ? 2 : 1; + unsigned long val = 0; + + if(!src) + return log_cls::log_when_err(-1, "fopen('/proc/meminfo', 'rb')", LOG_LEVEL_FATAL); + + while(fgets(line, sizeof(line) - 1, src)) + { + if(sscanf(line, "MemTotal: %ld", &val)) + { + if(total) + { + *total = val * 1024; + if(--count == 0) + break; + } + } + else if(sscanf(line, "MemFree: %ld", &val)) + { + if(available) + { + *available = val * 1024; + if(--count == 0) + break; + } + } + } + + fclose(src); + + return 0; + } + std::string format_readable_bytes(uint64_t bytes) + { + std::string str("\0", 80); + + if(bytes >= SIZE_GB(1)) + { + double v = bytes * 1.0f / (SIZE_GB(1)); + size_t pos = 0; + + sprintf(&str[0], "%.2fGB", v); + pos = str.find("."); + while(pos > 3) + { + pos -= 3; + str.insert(pos, ","); + } + } + else if(bytes >= SIZE_MB(1)) + { + double v = bytes * 1.0f / (SIZE_MB(1)); + sprintf(&str[0], "%.2fMB", v); + } + else if(bytes >= SIZE_KB(1)) + { + double v = bytes * 1.0f / (SIZE_KB(1)); + sprintf(&str[0], "%.2fKB", v); + } + else + { + sprintf(&str[0], "%uB", (unsigned)bytes); + } + + return str; + } +} + diff --git a/common/referer.h b/common/referer.h new file mode 100644 index 0000000..2b60b93 --- /dev/null +++ b/common/referer.h @@ -0,0 +1,120 @@ +#pragma once + +// Objects life management +// +// created on 2022-11-29 +// + +#include +#include + +#define ALIGN_INT(val, n) ((((val) + (n) - 1) / (n)) * (n)) +#define WAIT_INFINITE 0 +#define SEC_2_MS(s) ((s) * 1000) +#define MSEC_2_US(ms) ((ms) * 1000) +#define USEC_2_NS(us) ((us) * 1000) +#define SEC_2_US(s) MSEC_2_US(SEC_2_MS(s)) +#define SEC_2_NS(s) USEC_2_NS(MSEC_2_US(SEC_2_MS(s))) +#define MSEC_2_NS(ms) USEC_2_NS(MSEC_2_US(ms)) + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +typedef std::mutex MUTEX; +typedef std::lock_guard LOCKER; + +// object life referer +// +// derived from 'refer' if your class used in multi-threads +// +class refer +{ + volatile int32_t ref_; + MUTEX mutex_; + +protected: + refer(); + virtual ~refer(); + + virtual void on_born(void); + virtual void on_dead(void); + +public: + virtual int32_t add_ref(void); + virtual int32_t release(void); +}; + +#include +class chronograph +{ + struct timeval bgn_; + +public: + chronograph(); + ~chronograph(); + + static bool now(struct timeval* tv); + static bool now(uint64_t* seconds, uint64_t* u_seconds); + static std::string now(bool with_ms = true/*whether with milliseconds*/); // return '2022-11-30 10:38:42.123', no '.123' if with_ms was false + +public: + uint64_t elapse_s(void); + uint64_t elapse_ms(void); + uint64_t elapse_us(void); + void reset(void); +}; + +#include +namespace sys_util +{ + int32_t enum_modules(bool(*on_found)(const char* path_module_name, void* param),// return false to stop enumeratin + void* param, // user defined data, passed into callback on_found + unsigned pid = -1 // process id, -1 is self + ); // return errno + + int32_t enum_files(const char* dir, // dir path + bool(*on_found)(const char* path_name, void* param), // return false to stop enumeratin + void* param // user defined data, passed into callback on_found + , bool recursive = true // walk recursive + ); // return errno + + std::string get_module_path(const char* module_name = nullptr + , unsigned pid = -1); // get module full path, nullptr is for main-exe + std::string read_link(const char* lnk); + size_t get_page_size(void); + bool create_folder(const char* dir); + + // Function: pick single-line info file data, return count of set-value variable + // + // Parameters: file - full path of local file + // + // line_max - max bytes of a line in file 'file' + // + // fmt - line fromat string, e.g. "model name : %60[\x20-\x7e]", "MemoryTotal: %ld", "address sizes : %d bits physical, %d bits virtual", ... + // + // args - variable list + // + // Return: count of the variable which got the value + template + int32_t get_inf_file_data(const char* file, size_t line_max, const char* fmt, Args ... args) + { + std::string buf("\0", line_max + 8); + FILE *src = fopen(file, "rb"); + int32_t count = 0; + + if(!src) + return 0; + + while(fgets(&buf[0], line_max, src)) + { + count = sscanf(&buf[0], fmt, args ...); + if(count > 0) + break; + } + fclose(src); + + return count; + } + + int32_t get_memory_info(uint64_t* total, uint64_t* available); + std::string format_readable_bytes(uint64_t bytes); // convert to readable text: 512B, 1.21KB, 1.10MB, 3.45GB, 1,234.56GB ... +} diff --git a/scanner/CMakeLists.txt b/scanner/CMakeLists.txt new file mode 100644 index 0000000..1e0bef0 --- /dev/null +++ b/scanner/CMakeLists.txt @@ -0,0 +1,22 @@ +project(scanner) +add_compile_options(-std=c++11) +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") +aux_source_directory(${PROJECT_SOURCE_DIR} DIR_SRCS) +file(GLOB DIR_HEADS "${PROJECT_SOURCE_DIR}/*.h") +set(DIR_SRCS ${DIR_SRCS} ${DIR_HEADS}) +# add_library(${PROJECT_NAME} SHARED ${DIR_SRCS}) +add_executable(scanner ${PROJECT_SOURCE_DIR}/main/scanner.cpp ${PROJECT_SOURCE_DIR}/../common/event_monitor.cpp ${PROJECT_SOURCE_DIR}/../common/ipc_util.cpp ${PROJECT_SOURCE_DIR}/../common/log_util.cpp ${PROJECT_SOURCE_DIR}/../common/referer.cpp ${PROJECT_SOURCE_DIR}/../common/ipc_wrapper.cpp) +link_libraries(libdl libpthread librt) + +target_link_libraries(${PROJECT_NAME} PRIVATE + dl + pthread + rt + ) +target_include_directories(${PROJECT_NAME} PRIVATE ${PROJECT_SOURCE_DIR} + ${PROJECT_SOURCE_DIR}/main + ${PROJECT_SOURCE_DIR}/../common + ) + +set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/out) \ No newline at end of file diff --git a/scanner/main/scanner.cpp b/scanner/main/scanner.cpp new file mode 100644 index 0000000..c6fe8c0 --- /dev/null +++ b/scanner/main/scanner.cpp @@ -0,0 +1,201 @@ +// scanner.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 +// + +#include +#include +#include +#include + +#include "../../common/ipc_util.h" +#include "../../common/log_util.h" +#include "../../common/event_monitor.h" +#include "../../common/ipc_wrapper.h" + +static void test(const char* oper); + +int32_t main(int32_t argc, char *argv[]) +{ + char *oper = argc > 1 ? argv[1] : nullptr; + + log_cls::initialize(nullptr); + log_cls::log(LOG_LEVEL_DEBUG, "----starting...----\n"); + + test(oper); + + return 0; +} + +class rogger : public event_handler +{ + linux_event* quit_; + ipc_wrapper* ipc_; + safe_fifo reply_; + volatile bool run_; + + int32_t wait(void) + { + while(quit_->wait()) + { + if(!run_) + break; + + std::string answer(""); + + if (reply_.take(answer)) + { + size_t len = answer.length(); + + if (len) + ipc_->write(answer.c_str(), &len, false, 100); + } + } + } + int32_t run_client(ipc_wrapper* s) + { + size_t len = SIZE_MB(1); + char* buf = (char*)s->get_kbuf(&len); + + printf("--->request 1MB buffer of IPC internal, returned %u\n", len); + do + { + size_t l = 0, count = 0; + chronograph watch; + + printf("Input message: "); + while ((buf[l++] = getchar()) != '\n'); + buf[--l] = 0; + + watch.reset(); + s->write(buf, &l, true); + if(strcmp(buf, "exit") == 0) + break; + + while(1) + { + if(quit_->wait(50)) + { + std::string rcvd(""); + if (reply_.take(rcvd)) + { + if (rcvd.length()) + { + count++; + printf("reply(%d): %s\n", rcvd.length(), rcvd.c_str()); + l = rcvd.length(); + strcpy(buf, rcvd.c_str()); + if(s->write(buf, &l, true, 30) == ETIME) + { + printf("send content timouted(%ld in %ld us)!\n", count, watch.elapse_us()); + break; + } + } + } + if(rcvd.empty()) + printf("reply(0)\n"); + } + else + { + printf("wait reply timeout(%ld in %ld us).\n", count, watch.elapse_us()); + break; + } + } + + } while (1); + s->release_kbuf(buf); + } + +public: + rogger() : quit_(new linux_event("quit")), run_(true) + { + std::string param(sys_util::get_module_path() + ":0:1024:10"); + ipc_ = ipc_wrapper::create_ipc(this, ipc_wrapper::IPC_SHARED_MEM, param.c_str()); + } + +protected: + ~rogger() + { + quit_->release(); + } + +public: + int32_t on_event(int32_t ev, void* data, size_t data_len) override + { + if (ev == EVENT_READ) + { + char* d = (char*)data; + std::string str(d, data_len); + + d[data_len] = 0; + if(ipc_->is_first()) + { + printf("R(%d): %s\n", data_len, d); + for (size_t i = 0; i < str.length() / 2; ++i) + { + char c = str[i]; + + str[i] = str[str.length() - i - 1]; + str[str.length() - i - 1] = c; + } + } + + if (strcmp(d, "exit") == 0) + { + printf("Bye-bye :)\n"); + run_ = false; + // quit_->trigger(); + } + else + reply_.save(str); + quit_->trigger(); + } + + return 0; + } + int32_t get_fd(void) + { + return -1; + } + + void run(void) + { + if (ipc_->is_first()) + { + printf("%p run as server ...\n", getpid()); + wait(); + } + else + { + printf("%p run as client ...\n", getpid()); + run_client(ipc_); + } + + ipc_->stop(); + ipc_->release(); + } + void clear_kobjects(void) + { + ipc_->clear_kernel_objects(); + } + + void test_timed_wait(unsigned ms) + { + for(int i = 0; i < 1; ++i) + { + printf("%s: wait(%u) ...\n", chronograph::now().c_str(), ms); + printf("%s: wait result %d\n\n", chronograph::now().c_str(), quit_->wait(ms)); + } + } +}; + +static void test(const char* oper) +{ + rogger* r = new rogger(); + + r->test_timed_wait(123); + + if(oper && strcmp(oper, "clear") == 0) + r->clear_kobjects(); + else + r->run(); + r->release(); +} diff --git a/sln/scanner.vcxproj b/sln/scanner.vcxproj new file mode 100644 index 0000000..bc694dc --- /dev/null +++ b/sln/scanner.vcxproj @@ -0,0 +1,166 @@ + + + + + Debug + Win32 + + + Release + Win32 + + + Debug + x64 + + + Release + x64 + + + + + + + + + + + + + + + + + + + + + + 16.0 + Win32Proj + {0f166a6e-0f74-4c38-99cd-c74b09e283db} + scanner + 10.0 + + + + Application + true + v142 + Unicode + + + Application + false + v142 + true + Unicode + + + Application + true + v142 + Unicode + + + Application + false + v142 + true + Unicode + + + + + + + + + + + + + + + + + + + + + true + $(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath) + + + false + $(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath) + + + true + $(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath) + + + false + $(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath) + + + + Level3 + true + WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + + + + + Level3 + true + true + true + WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + true + true + + + + + Level3 + true + _DEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + + + + + Level3 + true + true + true + NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + true + true + + + + + + \ No newline at end of file diff --git a/sln/scanner.vcxproj.filters b/sln/scanner.vcxproj.filters new file mode 100644 index 0000000..45f3c91 --- /dev/null +++ b/sln/scanner.vcxproj.filters @@ -0,0 +1,63 @@ + + + + + {4FC737F1-C7A5-4376-A066-2A32D752A2FF} + cpp;c;cc;cxx;c++;cppm;ixx;def;odl;idl;hpj;bat;asm;asmx + + + {93995380-89BD-4b04-88EB-625FBE52EBFB} + h;hh;hpp;hxx;h++;hm;inl;inc;ipp;xsd + + + {67DA6AB6-F800-4c08-8B7A-83BB121AAD01} + rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms + + + {31dd8340-84a7-434a-9f85-b835fdedf634} + + + {e0ad062f-4e6a-4d9a-b010-83ff181576a4} + + + + + main + + + common + + + common + + + common + + + common + + + common + + + + + common + + + common + + + common + + + common + + + common + + + + + + \ No newline at end of file