#include "ui.h" #include "utils.h" #include #include #include #include #define PIPE_PROTO_VER MAKEWORD(0, 1) ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ipc class namespace devui { class pipe_reader : public refer { std::string pipe_path_; int fd_ = -1; safe_thread worker_; volatile bool run_ = true; std::function handler_; void worker(void) { uint8_t buf[512] = {0}; while(run_) { int len = read(fd_, buf, sizeof(buf)); if(len == -1) break; handler_(buf, len); } } public: pipe_reader(const char* fifo, std::function h) : pipe_path_(fifo), handler_(h) { auto r = [this](void) -> void { int cycle = 0; while(run_) { fd_ = ::open(pipe_path_.c_str(), O_RDONLY); utils::to_log(LOG_LEVEL_ALL, "open pipe_reader(%s) %d times = %d\n", ++cycle, fd_); if(fd_ != -1) { worker(); if(fd_ != -1) { ::close(fd_); fd_ = -1; } } } }; worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_reader::worker); } protected: virtual ~pipe_reader() { stop(); } public: bool is_ready(void) { return fd_ != -1; } void stop(void) { run_ = false; if(fd_ != -1) { int fd = fd_; fd_ = -1; ::close(fd); } } }; class pipe_sender : public refer { std::string pipe_path_; int fd_ = -1; safe_thread worker_; volatile bool run_ = true; safe_fifo sent_que_; void worker(void) { while(run_) { std::string cont(""); if(sent_que_.take(cont, true)) { int s = 0, off = 0; do { s = write(fd_, cont.c_str() + off, cont.length() - off); if(s == -1) { utils::to_log(LOG_LEVEL_FATAL, "Send UI message failed: %d(%s)\n", errno, strerror(errno)); break; } off += s; }while(off < cont.length()); if(s == -1) break; } } } public: pipe_sender(const char* fifo) : pipe_path_(fifo), sent_que_("sent-que") { sent_que_.enable_wait_log(false); auto r = [this](void) -> void { int cycle = 0; while(run_) { fd_ = ::open(pipe_path_.c_str(), O_WRONLY); utils::to_log(LOG_LEVEL_ALL, "open pipe_sender(%s) %d times = %d\n", ++cycle, fd_); if(fd_ != -1) { worker(); if(fd_ != -1) { ::close(fd_); fd_ = -1; } } } }; worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_sender::worker); } protected: virtual ~pipe_sender() { stop(); } public: bool is_ready(void) { return fd_ != -1; } void stop(void) { run_ = false; if(fd_ != -1) { int fd = fd_; fd_ = -1; ::close(fd); } } void send(std::string& msg) { sent_que_.save(msg, true); } }; class ui_messenger { std::function cb_; safe_fifo sent_que_; volatile bool run_ = true; bool ui_; bool ready_ = true; safe_thread workers_; int fdo_ = -1; int fdi_ = -1; std::string fmode(int m) { if(m == O_RDONLY) return "O_RDONLY"; if(m == O_WRONLY) return "O_WRONLY"; return "Unk"; } void init(void) { const char* fifo[] = {"/tmp/worker", "/tmp/ui"}; int mode[] = {O_RDONLY, O_WRONLY}; int *fd[] = {&fdi_, &fdo_}; mkfifo(fifo[!ui_], 0777); mkfifo(fifo[ui_], 0777); *fd[ui_] = open(fifo[1], mode[ui_]); utils::to_log(LOG_LEVEL_ALL, "open pipe(%s) = %d\n", fifo[1], *fd[ui_]); *fd[!ui_] = open(fifo[0], mode[!ui_]); utils::to_log(LOG_LEVEL_ALL, "open pipe(%s) = %d\n", fifo[0], *fd[!ui_]); if(fdo_ == -1 || fdi_ == -1) { printf("Out fd = %d, In fd = %d\n", fdo_, fdi_); this->close(); } else { auto r = [this](void) -> void { receiver(); }; auto s = [this](void) -> void { sender(); }; workers_.start(r, SIZE_MB(1), "receiver"); workers_.start(s, SIZE_MB(1), "sender"); } } void close(void) { ready_ = false; if(fdo_ != -1) ::close(fdo_); if(fdi_ != -1) ::close(fdi_); fdi_ = fdo_ = -1; } void receiver(void) { std::string rcv(""); char buf[300] = {0}; LPMSGSTREAM pack = nullptr; chronograph watch; printf("ui-receiver running ...\n"); while(run_) { watch.reset(); int r = read(fdi_, buf, _countof(buf)); if(r == -1) { printf("Read UI message failed: %d(%s)\n", errno, strerror(errno)); utils::to_log(LOG_LEVEL_FATAL, "Read UI message failed: %d(%s)\n", errno, strerror(errno)); this->close(); break; } else if(r == 0 /*&& errno == ENOENT*/) // errno maybe ZERO, here ommit the error code { // peer closed, wait 10ms ... if(watch.elapse_ms() > 10) { MSGSTREAM ms; memset(&ms, 0, sizeof(ms)); ms.ver = PIPE_PROTO_VER; ms.msg = UI_STATUS_PEER_CLOSED; cb_(&ms); printf("PIPE: peer closed(read ZERO byte and error = %d).\n", errno); utils::to_log(LOG_LEVEL_DEBUG, "PIPE: peer closed(read ZERO byte and error = %d).\n", errno); } std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } rcv += std::string(buf, r); if(rcv.length()) { int off = 0; pack = (LPMSGSTREAM)&rcv[off]; while(pack->whole_size() <= rcv.length() - off) { cb_(pack); off += pack->whole_size(); pack = (LPMSGSTREAM)&rcv[off]; } if(off) rcv.erase(0, off); } } printf("ui-receiver exited.\n"); } void sender(void) { printf("ui-sender running ...\n"); while(run_) { std::string cont(""); if(sent_que_.take(cont, true)) { int s = 0, off = 0; do { s = write(fdo_, cont.c_str() + off, cont.length() - off); if(s == -1) { printf("Send UI message failed: %d(%s)\n", errno, strerror(errno)); utils::to_log(LOG_LEVEL_FATAL, "Send UI message failed: %d(%s)\n", errno, strerror(errno)); this->close(); break; } off += s; }while(off < cont.length()); if(s == -1) break; } } printf("ui-sender exited.\n"); } public: ui_messenger(std::function uicb , bool ui) : sent_que_("ui-sent-que") , cb_(uicb), ui_(ui) { sent_que_.enable_wait_log(false); init(); } ~ui_messenger() { close(); } public: bool send(std::string& msg) { if(ready_) sent_que_.save(msg, true); return ready_; } void stop(void) { sent_que_.trigger(); run_ = false; this->close(); } }; }; ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // interface static MUTEX msg_lk_; static devui::ui_messenger *msgr = nullptr; namespace devui { void init_ui(std::function uicb, bool ui) { SIMPLE_LOCK(msg_lk_); if(!msgr) msgr = new ui_messenger(uicb, ui); } void uninit_ui(void) { SIMPLE_LOCK(msg_lk_); if(msgr) { msgr->stop(); delete msgr; } msgr = nullptr; } bool send_message(uint32_t msgid, uint8_t* data, uint8_t size) { std::string stream(""); MSGSTREAM pack; bool ret = false; size_t fix = sizeof(pack.data); memset(&pack, 0, sizeof(pack)); pack.ver = PIPE_PROTO_VER; pack.msg = msgid; pack.size = size; if(size > fix) { memcpy(pack.data, data, fix); stream = std::string((char*)&pack, sizeof(pack)); stream += std::string((char*)data + fix, size - fix); } else { memcpy(pack.data, data, size); stream = std::string((char*)&pack, sizeof(pack)); } { SIMPLE_LOCK(msg_lk_); if(msgr) ret = msgr->send(stream); } return ret; } bool send_status_message(uint32_t msgid, int align_h, int align_v, int font, int clears) { devui::STATMSG msg; msg.msg_words_id = msgid; msg.align_h = align_h; msg.align_v = align_v; msg.clear = clears; msg.font = font; msg.reserved = 0; return send_message(devui::UI_STATUS_MESSAGE, (uint8_t*)&msg, sizeof(msg)); } };