newtx/sdk/base/ui.cpp

409 lines
11 KiB
C++

#include "ui.h"
#include "utils.h"
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ipc class
namespace devui
{
// Reads raw bytes from a named pipe (FIFO) on a background thread and hands
// every chunk to a user supplied handler.
//
// The thread repeatedly opens the FIFO read-only, pumps data until the stream
// ends or fails, closes the descriptor and opens it again, until stop() is
// called.
//
// NOTE(review): the destructor is protected, so instances are presumably
// released through the `refer` base class - confirm against its declaration.
class pipe_reader : public refer
{
	std::string pipe_path_;								// path of the FIFO to read from
	int fd_ = -1;										// descriptor of the currently opened FIFO, -1 when closed
	safe_thread worker_;								// owns the background thread
	volatile bool run_ = true;							// cleared by stop() to end both loops
	std::function<void(uint8_t*, size_t)> handler_;	// receives (buffer, byte count) for each chunk read

	// Pumps data from the opened FIFO into handler_ until the stream ends,
	// a read error occurs, or stop() is requested.
	void worker(void)
	{
		uint8_t buf[512] = {0};

		while(run_)
		{
			int len = read(fd_, buf, sizeof(buf));

			// -1 is an error; 0 means every writer closed its end. In both
			// cases leave so that the caller closes and reopens the FIFO,
			// instead of spinning on an endless sequence of empty reads.
			if(len <= 0)
				break;

			if(handler_)
				handler_(buf, len);
		}
	}

public:
	// fifo - path of the named pipe to read from.
	// h    - callback invoked on the reader thread with each chunk of bytes.
	pipe_reader(const char* fifo, std::function<void(uint8_t*, size_t)> h) : pipe_path_(fifo), handler_(h)
	{
		auto r = [this](void) -> void
		{
			int cycle = 0;

			while(run_)
			{
				fd_ = ::open(pipe_path_.c_str(), O_RDONLY);
				// the format expects the path first, then the attempt counter and the descriptor
				utils::to_log(LOG_LEVEL_ALL, "open pipe_reader(%s) %d times = %d\n", pipe_path_.c_str(), ++cycle, fd_);
				if(fd_ == -1)
					continue;

				worker();

				// stop() may already have closed the descriptor and reset fd_
				if(fd_ != -1)
				{
					::close(fd_);
					fd_ = -1;
				}
			}
		};

		worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_reader::worker);
	}

protected:
	virtual ~pipe_reader()
	{
		stop();
	}

public:
	// Returns true while a descriptor for the FIFO is open.
	bool is_ready(void)
	{
		return fd_ != -1;
	}

	// Requests the reader thread to finish and closes the FIFO descriptor.
	void stop(void)
	{
		run_ = false;
		if(fd_ != -1)
		{
			// publish "closed" before the descriptor is actually released
			int fd = fd_;

			fd_ = -1;
			::close(fd);
		}
	}
};
// Writes queued messages to a named pipe (FIFO) on a background thread.
//
// The thread repeatedly opens the FIFO write-only, drains the message queue
// into it until a write fails or stop() is requested, closes the descriptor
// and opens it again.
//
// NOTE(review): the destructor is protected, so instances are presumably
// released through the `refer` base class - confirm against its declaration.
class pipe_sender : public refer
{
	std::string pipe_path_;				// path of the FIFO to write to
	int fd_ = -1;						// descriptor of the currently opened FIFO, -1 when closed
	safe_thread worker_;				// owns the background thread
	volatile bool run_ = true;			// cleared by stop() to end both loops
	safe_fifo<std::string> sent_que_;	// messages waiting to be written

	// Takes messages from the queue and writes each one completely to the
	// FIFO. Returns on the first write error so the caller can reopen it.
	void worker(void)
	{
		while(run_)
		{
			std::string cont("");

			if(!sent_que_.take(cont, true))
				continue;

			size_t off = 0;
			bool failed = false;

			// write() may accept fewer bytes than requested, so keep going
			// until the whole message has been handed over
			do
			{
				ssize_t s = write(fd_, cont.c_str() + off, cont.length() - off);

				if(s == -1)
				{
					// capture errno once: it must not change between the two uses
					const int err = errno;

					utils::to_log(LOG_LEVEL_FATAL, "Send UI message failed: %d(%s)\n", err, strerror(err));
					failed = true;
					break;
				}
				off += (size_t)s;
			}while(off < cont.length());

			if(failed)
				break;
		}
	}

public:
	// fifo - path of the named pipe to write to.
	explicit pipe_sender(const char* fifo) : pipe_path_(fifo), sent_que_("sent-que")
	{
		sent_que_.enable_wait_log(false);

		auto r = [this](void) -> void
		{
			int cycle = 0;

			while(run_)
			{
				fd_ = ::open(pipe_path_.c_str(), O_WRONLY);
				// the format expects the path first, then the attempt counter and the descriptor
				utils::to_log(LOG_LEVEL_ALL, "open pipe_sender(%s) %d times = %d\n", pipe_path_.c_str(), ++cycle, fd_);
				if(fd_ == -1)
					continue;

				worker();

				// stop() may already have closed the descriptor and reset fd_
				if(fd_ != -1)
				{
					::close(fd_);
					fd_ = -1;
				}
			}
		};

		worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_sender::worker);
	}

protected:
	virtual ~pipe_sender()
	{
		stop();
	}

public:
	// Returns true while a descriptor for the FIFO is open.
	bool is_ready(void)
	{
		return fd_ != -1;
	}

	// Requests the sender thread to finish and closes the FIFO descriptor.
	void stop(void)
	{
		run_ = false;
		if(fd_ != -1)
		{
			// publish "closed" before the descriptor is actually released
			int fd = fd_;

			fd_ = -1;
			::close(fd);
		}
	}

	// Queues msg for transmission by the background thread.
	void send(std::string& msg)
	{
		sent_que_.save(msg, true);
	}
};
// Two-way message channel between the UI process and its peer, built on the
// two named pipes "/tmp/worker" and "/tmp/ui".
//
// The UI side (ui == true) creates both FIFOs, writes to "/tmp/ui" and reads
// from "/tmp/worker"; the other side reads from "/tmp/ui" and writes to
// "/tmp/worker". One thread reads incoming bytes and passes every complete
// message to the callback, another thread writes queued outgoing messages.
class ui_messenger
{
	std::function<void(LPMSGSTREAM)> cb_;	// called on the receiver thread for every complete message
	safe_fifo<std::string> sent_que_;		// outgoing messages waiting to be written
	volatile bool run_ = true;				// cleared by stop() to end both threads
	bool ui_;								// true when this end is the UI side
	bool ready_ = true;						// false once the pipes have been closed
	safe_thread workers_;					// owns the receiver and sender threads
	int fdo_ = -1;							// descriptor used for writing, -1 when closed
	int fdi_ = -1;							// descriptor used for reading, -1 when closed

	// Returns a printable name for an open() access mode.
	std::string fmode(int m)
	{
		if(m == O_RDONLY)
			return "O_RDONLY";
		if(m == O_WRONLY)
			return "O_WRONLY";

		return "Unk";
	}

	// Opens both pipes and, when that succeeds, starts the two threads.
	void init(void)
	{
		const char* fifo[] = {"/tmp/worker", "/tmp/ui"};

		if(ui_)
		{
			// the UI side is responsible for creating the FIFOs
			mkfifo(fifo[0], 0777);
			mkfifo(fifo[1], 0777);
			fdo_ = open(fifo[1], O_WRONLY);
			fdi_ = open(fifo[0], O_RDONLY);
		}
		else
		{
			// mirror image of the UI side: same open order per FIFO
			fdi_ = open(fifo[1], O_RDONLY);
			fdo_ = open(fifo[0], O_WRONLY);
		}

		if(fdo_ == -1 || fdi_ == -1)
		{
			printf("Out fd = %d, In fd = %d\n", fdo_, fdi_);
			this->close();
			return;
		}

		auto r = [this](void) -> void
		{
			receiver();
		};
		auto s = [this](void) -> void
		{
			sender();
		};

		workers_.start(r, SIZE_MB(1), "receiver");
		workers_.start(s, SIZE_MB(1), "sender");
	}

	// Marks the messenger as not ready and closes both descriptors.
	void close(void)
	{
		// Detach the descriptors from the members first so that a second
		// caller sees -1 and does not close the same numbers again.
		const int out = fdo_;
		const int in = fdi_;

		fdi_ = fdo_ = -1;
		ready_ = false;
		if(out != -1)
			::close(out);
		if(in != -1)
			::close(in);
	}

	// Receiver thread: accumulates incoming bytes and dispatches every
	// complete message to the callback.
	void receiver(void)
	{
		std::string rcv("");
		char buf[300] = {0};
		LPMSGSTREAM pack = nullptr;
		chronograph watch;

		printf("ui-receiver running ...\n");
		while(run_)
		{
			watch.reset();
			int r = read(fdi_, buf, _countof(buf));
			if(r == -1)
			{
				// capture errno once: printf may modify it before the log call
				const int err = errno;

				printf("Read UI message failed: %d(%s)\n", err, strerror(err));
				utils::to_log(LOG_LEVEL_FATAL, "Read UI message failed: %d(%s)\n", err, strerror(err));
				this->close();
				break;
			}
			else if(r == 0 /*&& errno == ENOENT*/)	// errno may be ZERO, so the error code is not checked here
			{
				// peer closed, wait 10ms ...
				if(watch.elapse_ms() > 10)
				{
					printf("PIPE: peer closed(read ZERO byte and error = %d).\n", errno);
					utils::to_log(LOG_LEVEL_DEBUG, "PIPE: peer closed(read ZERO byte and error = %d).\n", errno);
				}
				std::this_thread::sleep_for(std::chrono::milliseconds(10));
				continue;
			}

			rcv.append(buf, r);

			// Look at a header only once all of its bytes have arrived;
			// otherwise whole_size() would read past the received data.
			// NOTE(review): assumes every message is at least sizeof(MSGSTREAM)
			// bytes, which is what send_message() emits - confirm for other peers.
			size_t off = 0;
			while(rcv.length() - off >= sizeof(*pack))
			{
				pack = (LPMSGSTREAM)&rcv[off];

				const size_t whole = pack->whole_size();

				// stop on an incomplete message, and on a zero length that
				// would never advance the offset
				if(whole == 0 || whole > rcv.length() - off)
					break;

				if(cb_)
					cb_(pack);
				off += whole;
			}
			if(off)
				rcv.erase(0, off);
		}
		printf("ui-receiver exited.\n");
	}

	// Sender thread: writes every queued message completely to the pipe.
	void sender(void)
	{
		printf("ui-sender running ...\n");
		while(run_)
		{
			std::string cont("");

			if(!sent_que_.take(cont, true))
				continue;

			size_t off = 0;
			bool failed = false;

			// write() may accept fewer bytes than requested
			do
			{
				ssize_t s = write(fdo_, cont.c_str() + off, cont.length() - off);

				if(s == -1)
				{
					// capture errno once: printf may modify it before the log call
					const int err = errno;

					printf("Send UI message failed: %d(%s)\n", err, strerror(err));
					utils::to_log(LOG_LEVEL_FATAL, "Send UI message failed: %d(%s)\n", err, strerror(err));
					this->close();
					failed = true;
					break;
				}
				off += (size_t)s;
			}while(off < cont.length());

			if(failed)
				break;
		}
		printf("ui-sender exited.\n");
	}

public:
	// uicb - callback invoked for every complete message received.
	// ui   - true for the UI side of the channel, false for the peer side.
	ui_messenger(std::function<void(LPMSGSTREAM)> uicb
		, bool ui) : cb_(uicb)
		, sent_que_("ui-sent-que"), ui_(ui)
	{
		sent_que_.enable_wait_log(false);
		init();
	}
	~ui_messenger()
	{
		// make sure the threads have been told to finish even when the
		// owner never called stop()
		if(run_)
			stop();
		else
			close();
	}

public:
	// Queues msg for transmission. Returns false, without queueing, once the
	// pipes have been closed.
	bool send(std::string& msg)
	{
		if(ready_)
			sent_que_.save(msg, true);

		return ready_;
	}

	// Asks both threads to finish and closes the pipes.
	void stop(void)
	{
		// Clear the flag before waking the queue: a sender woken while run_
		// is still true would simply go back to waiting.
		// NOTE(review): presumably trigger() releases a blocked take() - confirm.
		run_ = false;
		sent_que_.trigger();
		this->close();
	}
};
};
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// interface
// Guards every access to `msgr` made by the interface functions in this file.
static MUTEX msg_lk_;
// The single messenger instance: created by init_ui(), destroyed by
// uninit_ui(), nullptr while the UI channel is not initialised.
static devui::ui_messenger *msgr = nullptr;
namespace devui
{
// Creates the process-wide messenger if it does not exist yet.
// uicb - callback handed to the messenger for received messages.
// ui   - forwarded to the messenger to select which side of the channel this is.
// Calling it again while a messenger exists does nothing.
void init_ui(std::function<void(LPMSGSTREAM)> uicb, bool ui)
{
	SIMPLE_LOCK(msg_lk_);

	if(msgr != nullptr)
		return;

	msgr = new ui_messenger(uicb, ui);
}
// Stops and destroys the process-wide messenger, if there is one.
void uninit_ui(void)
{
	SIMPLE_LOCK(msg_lk_);

	if(msgr == nullptr)
		return;

	msgr->stop();
	delete msgr;
	msgr = nullptr;
}
// Packs a message into a MSGSTREAM frame and queues it on the messenger.
//
// msgid - message identifier stored in the frame header.
// data  - payload bytes; may be nullptr only when size is 0.
// size  - number of payload bytes.
//
// The first sizeof(MSGSTREAM::data) payload bytes are stored inside the
// fixed-size frame; any remaining bytes are appended right after it.
//
// Returns true when the frame was queued; false when the messenger is not
// initialised, is no longer ready, or data is nullptr while size is not 0.
bool send_message(uint32_t msgid, uint8_t* data, uint8_t size)
{
	// a payload length without a payload buffer cannot be serialised
	if(data == nullptr && size != 0)
		return false;

	MSGSTREAM pack;
	const size_t fix = sizeof(pack.data);
	const size_t inlined = size < fix ? size : fix;

	memset(&pack, 0, sizeof(pack));
	pack.ver = 1;
	pack.msg = msgid;
	pack.size = size;
	// memcpy must not be given a null pointer, even for a zero length
	if(inlined)
		memcpy(pack.data, data, inlined);

	std::string stream((char*)&pack, sizeof(pack));

	if(size > fix)
		stream.append((char*)data + fix, size - fix);

	bool ret = false;
	{
		SIMPLE_LOCK(msg_lk_);
		if(msgr)
			ret = msgr->send(stream);
	}

	return ret;
}
// Builds a STATMSG from the arguments and sends it as a UI_STATUS_MESSAGE.
//
// msgid   - stored in STATMSG::msg_words_id.
// align_h - stored in STATMSG::align_h.
// align_v - stored in STATMSG::align_v.
// font    - stored in STATMSG::font.
// clears  - stored in STATMSG::clear.
//
// Returns the result of send_message().
bool send_status_message(uint32_t msgid, int align_h, int align_v, int font, int clears)
{
	// send_message() takes the payload length as uint8_t; a larger structure
	// would be truncated silently
	static_assert(sizeof(devui::STATMSG) <= 0xFF, "STATMSG does not fit the 8-bit payload size of send_message()");

	devui::STATMSG msg;

	// the structure is transmitted byte for byte, so clear padding and any
	// field not assigned below instead of sending indeterminate memory
	memset(&msg, 0, sizeof(msg));
	msg.msg_words_id = msgid;
	msg.align_h = align_h;
	msg.align_v = align_v;
	msg.clear = clears;
	msg.font = font;
	msg.reserved = 0;

	return send_message(devui::UI_STATUS_MESSAGE, (uint8_t*)&msg, sizeof(msg));
}
};