code_scanner/common/event_monitor.cpp

256 lines
6.0 KiB
C++
Raw Normal View History

2022-12-05 08:03:17 +00:00
#include "event_monitor.h"
#include <fcntl.h> /* nonblocking */
#include <sys/resource.h> /*setrlimit */
#include <sys/eventfd.h>
#include <sys/sysinfo.h>
#include <sys/ipc.h>
#include <sys/ioctl.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>
#include <vector>
#include <string.h>
#include "log_util.h"
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// event_handler
event_handler::event_handler()
{}
event_handler::~event_handler()
{}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// parent_holder
parent_holder::parent_holder()
{}
parent_holder::~parent_holder()
{}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// linux - epoll wrapper ...
class epoll_wrapper : public event_monitor
{
int32_t epoll_fd_;
int32_t quit_fd_[2];
volatile bool run_;
std::vector<THREAD_PTR> threads_;
static int32_t epoll_max;
void clear_threads(void)
{
if(threads_.size())
{
for (size_t i = 0; i < threads_.size(); ++i)
{
if (threads_[i]->joinable())
threads_[i]->join();
threads_[i].reset();
}
threads_.clear();
}
}
void close_monitor_fd(void)
{
if (epoll_fd_ != -1)
close(epoll_fd_);
epoll_fd_ = -1;
}
void clear(void)
{
clear_threads();
close_monitor_fd();
}
void monitor_thread(void)
{
log_cls::log(LOG_LEVEL_DEBUG, "monitor thread(%p) of object(%p) is working ...\n", gettid(), this);
while (run_)
{
struct epoll_event evs;
memset(&evs, 0, sizeof(evs));
if (epoll_wait(epoll_fd_, &evs, 1, -1) == -1)
continue;
if (evs.events == EPOLLOUT && evs.data.fd == quit_fd_[1])
break;
if (evs.events == EPOLLIN)
((event_handler*)evs.data.ptr)->on_event(event_handler::EVENT_READ, nullptr, 0);
else if (evs.events == EPOLLOUT)
((event_handler*)evs.data.ptr)->on_event(event_handler::EVENT_WRITE, nullptr, 0);
else
;
}
log_cls::log(LOG_LEVEL_DEBUG, "monitor thread(%p) of object(%p) finished working.\n", gettid(), this);
}
protected:
virtual ~epoll_wrapper()
{
stop();
}
public:
epoll_wrapper(const char* desc) : event_monitor(desc), epoll_fd_(-1), run_(true)
{
memset(quit_fd_, -1, sizeof(quit_fd_));
}
public:
virtual int32_t start(int32_t threads = 1) override
{
int32_t ret = stop();
struct rlimit rt;
rt.rlim_max = rt.rlim_cur = epoll_wrapper::epoll_max;
if (ret == 0 && setrlimit(RLIMIT_NOFILE, &rt) == 0)
{
epoll_fd_ = epoll_create(epoll_wrapper::epoll_max);
if (epoll_fd_ == -1)
{
ret = errno;
log_cls::log(LOG_LEVEL_FATAL, "epoll_create for '%s' failed: %s\n", desc_.c_str(), strerror(ret));
}
else
{
run_ = true;
for (size_t i = 0; i < (size_t)threads; ++i)
{
THREAD_PTR t;//(new std::thread(&epoll_wrapper::monitor_thread, this));
t.reset(new std::thread(&epoll_wrapper::monitor_thread, this));
threads_.push_back(t);
}
}
}
else
{
log_cls::log(LOG_LEVEL_FATAL, "setrlimit for '%s-epoll' failed: %s\n", desc_.c_str(), strerror(ret));
}
return ret;
}
virtual int32_t stop(void) override
{
if(threads_.size())
{
struct epoll_event ev;
#ifdef __USE_GNU
pipe2(quit_fd_, O_NONBLOCK);
#else
pipe(quit_fd_);
#endif
log_cls::log(LOG_LEVEL_DEBUG, "quit fd[0] = %p, fd[1] = %p\n", quit_fd_[0], quit_fd_[1]);
run_ = false;
ev.data.fd = quit_fd_[1];
ev.events = EPOLLOUT | EPOLLET | EPOLLONESHOT;
for(size_t i = 0; i < threads_.size(); ++i)
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, ev.data.fd, &ev);
clear_threads();
close(quit_fd_[0]);
close(quit_fd_[1]);
memset(quit_fd_, -1, sizeof(quit_fd_));
}
close_monitor_fd();
return 0;
}
virtual int32_t add_fd(event_handler* handler) override
{
struct epoll_event ev;
int32_t ret = -1;
if (!handler || handler->get_fd() == -1)
return EINVAL;
if (epoll_fd_ == -1)
return EFAULT;
handler->add_ref(); // add ref for epoll_event holder ...
ev.data.ptr = handler;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET; // EPOLLONESHOT | EPOLLHUP
ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, handler->get_fd(), &ev);
if (ret == -1)
{
ret = errno;
log_cls::log(LOG_LEVEL_FATAL, "add fd(%d) to %s-epoll failed: %s\n", handler->get_fd(), desc_.c_str(), strerror(ret));
handler->release();
}
return ret;
}
virtual int32_t remove_fd(event_handler* handler) override
{
struct epoll_event ev;
int32_t ret = -1;
if (!handler || handler->get_fd() == -1)
return EINVAL;
if (epoll_fd_ == -1)
return EFAULT;
ev.data.ptr = handler;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLHUP; // EPOLLONESHOT
ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, handler->get_fd(), &ev);
if (ret == 0)
{
// ENOENT returned if object 'handler' has not registered, so we can free it here when success in EPOLL_CTL_DEL ...
handler->release();
}
else
{
ret = errno;
log_cls::log(LOG_LEVEL_FATAL, "remove fd(%d) from %s-epoll failed: %s\n", handler->get_fd(), desc_.c_str(), strerror(ret));
}
return ret;
}
};
int32_t epoll_wrapper::epoll_max = 10;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// event_handler
event_monitor::event_monitor(const char* desc) : desc_(desc ? desc : "")
{
log_cls::log(LOG_LEVEL_DEBUG, "+event_monitor(%p) of '%s' contructing ...\n", this, desc_.c_str());
}
event_monitor::~event_monitor()
{
log_cls::log(LOG_LEVEL_DEBUG, "-event_monitor(%p) of '%s' destroyed\n", this, desc_.c_str());
}
event_monitor* event_monitor::create(const char* desc, int32_t type)
{
if (type == EV_TYPE_EPOLL)
return dynamic_cast<event_monitor*>(new epoll_wrapper(desc));
else
return nullptr;
}