手搓内存池

This commit is contained in:
gb 2023-12-13 14:57:08 +08:00
parent b6884b148e
commit 32f830ee94
6 changed files with 260 additions and 75 deletions

View File

@ -27,6 +27,8 @@
async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0)
{
utils::init_log(LOG_TYPE_FILE);
utils::to_log(LOG_LEVEL_DEBUG, "System info: page-size = %u, mapping-page-size = %u, disk-cluster-size = %u.\n"
, sys_info::page_size, sys_info::page_map_size, sys_info::cluster_size);
init();
auto bulk_handle = [&](dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* required) -> dyn_mem_ptr
@ -277,7 +279,7 @@ dyn_mem_ptr async_scanner::handle_file_receive(LPPACK_BASE pack, uint32_t* used,
{
saver->set_packet_param(pack->cmd, pack->pack_id);
}
utils::to_log(LOG_LEVEL_DEBUG, "receive file (%u bytes): %s = %d\n", pfi->size, path.c_str(), err);
utils::to_log(LOG_LEVEL_DEBUG, "Receiving file(%p bytes): %s = %d\n", pfi->size, path.c_str(), err);
*required = dynamic_cast<packet_data_base_ptr>(saver);
}
@ -353,11 +355,20 @@ dyn_mem_ptr async_scanner::handle_file_send_roger(LPPACK_BASE pack, uint32_t* us
}
}
}
*used = base_head_size;
*required = dynamic_cast<packet_data_base_ptr>(reader);
*used = base_head_size;
if(pack->data)
{
utils::to_log(LOG_LEVEL_DEBUG, "Sending file '%s' (%p) cancelled with error %d.\n", reader->path_file(), reader, pack->data);
reader->release();
reader = nullptr;
}
else
{
// if reader was nullptr, notify failed by INT or control ?
utils::to_log(LOG_LEVEL_DEBUG, "File Send beginning (%p) ...\n", reader);
utils::to_log(LOG_LEVEL_DEBUG, "Sending file '%s' (%p) ...\n", reader->path_file(), reader);
}
*required = dynamic_cast<packet_data_base_ptr>(reader);
}
return reply;

View File

@ -172,7 +172,7 @@ file_saver::file_saver(void) : size_(0), wrote_(0), path_(""), check_(""), dst_(
{}
file_saver::~file_saver()
{
utils::to_log(LOG_LEVEL_DEBUG, "Write file(%s) over(%ld/%ld).\n", path_.c_str(), wrote_, size_);
utils::to_log(LOG_LEVEL_DEBUG, "Wrote over of file(%s) at(%llu/%llu).\n", path_.c_str(), wrote_, size_);
close();
}
@ -199,6 +199,10 @@ int file_saver::set_verify_data(const char* data, size_t len)
return 0;
}
const char* file_saver::path_file(void)
{
return path_.c_str();
}
int file_saver::open(const char* path, uint64_t size, bool in_mem, size_t off)
{
int err = 0;
@ -480,7 +484,7 @@ file_reader::~file_reader()
if (map_)
map_->release();
notify_progress(len_, len_, 0); // ensure 100%
utils::to_log(LOG_LEVEL_DEBUG, "Read file(%s) over(%ld/%ld).\n", path_.c_str(), consume_, len_);
utils::to_log(LOG_LEVEL_DEBUG, "Read over of file(%s) at(%p/%p).\n", path_.c_str(), consume_, len_);
}
int file_reader::open(const char* file, bool in_mem, size_t off)
@ -569,6 +573,10 @@ FILE* file_reader::detach(void)
return ret;
}
const char* file_reader::path_file(void)
{
return path_.c_str();
}
bool file_reader::is_memory_block(void)
{
@ -718,7 +726,6 @@ int file_map::open(const char* file, uint64_t size, bool readonly)
return err;
}
map_ = (HANDLE)::open(file, O_RDWR, 0666);
utils::to_log(LOG_LEVEL_DEBUG, "FileMapping: open('%s', O_APPEND, 0666) = %p\n", file, map_);
}
if (map_ == INVALID_HANDLE_VALUE)
{
@ -800,7 +807,7 @@ uint8_t* file_map::map(uint64_t off, uint32_t* size)
#endif
if (!buf_)
{
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: request map(%p + %u), real map(%p + %u) failed: %d\n"
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: request map(%llu + %u), real map(%llu + %u) failed: %d\n"
, off, *size, map_off_, map_size_, GetLastError());
*size = 0;
}
@ -814,3 +821,95 @@ uint8_t* file_map::buffer(void)
{
return buf_ ? buf_ + off_ : nullptr;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
dyn_mem_pool::dyn_mem_pool(uint32_t cnt, uint32_t unit) : count_(cnt), unit_(unit)
{
pool_ = (dyn_mem_ptr*)malloc(cnt * sizeof(dyn_mem_ptr));
for(uint32_t i = 0; i < cnt; ++i)
{
pool_[i] = dyn_mem::memory(unit);
}
}
dyn_mem_pool::~dyn_mem_pool()
{
if(pool_)
{
for(uint32_t i = 0; i < count_; ++i)
{
if(pool_[i])
{
pool_[i]->release();
}
}
free(pool_);
}
pool_ = nullptr;
}
dyn_mem_ptr dyn_mem_pool::take(void)
{
dyn_mem_ptr buf = nullptr;
if(!pool_[rpos_])
{
chronograph watch;
do
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} while (run_ && !pool_[rpos_]);
utils::to_log(LOG_LEVEL_DEBUG, "Waiting for taking memory pool took %ums at %u.\n", watch.elapse_ms(), rpos_);
}
if(pool_[rpos_])
{
buf = pool_[rpos_];
pool_[rpos_++] = nullptr;
if(rpos_ >= count_)
rpos_ = 0;
}
return buf;
}
void dyn_mem_pool::put(dyn_mem_ptr buf)
{
if(pool_[wpos_])
{
chronograph watch;
do
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} while (run_ && pool_[wpos_]);
utils::to_log(LOG_LEVEL_DEBUG, "Waiting for putting memory pool took %ums at %u.\n", watch.elapse_ms(), wpos_);
}
if(pool_[wpos_])
{
buf->release();
}
else
{
pool_[wpos_++] = buf;
if(wpos_ >= count_)
wpos_ = 0;
}
}
void dyn_mem_pool::stop(void)
{
run_ = false;
}
uint32_t dyn_mem_pool::count(void)
{
return count_;
}
uint32_t dyn_mem_pool::unit(void)
{
return unit_;
}
uint32_t dyn_mem_pool::take_pos(void)
{
return rpos_;
}

View File

@ -25,8 +25,8 @@ class packet_data_base : public refer
void* user_data_;
protected:
uint32_t pack_cmd_;
uint32_t pack_id_;
uint32_t pack_cmd_ = 0;
uint32_t pack_id_ = 0;
uint32_t session_id_ = -1;
public:
@ -167,6 +167,7 @@ protected:
public:
int set_verify_data(const char* data, size_t len);
int open(const char* path, uint64_t size, bool in_mem = false, size_t off = 0);
const char* path_file(void);
public:
virtual int put_data(const void* data, uint32_t* size/*[in] - total bytes of data; [out] - used bytes*/) override;
@ -264,6 +265,7 @@ public:
int open(const char* file, bool in_mem, size_t off = 0);
int attach(FILE* f);
FILE* detach(void);
const char* path_file(void);
public:
virtual bool is_memory_block(void) override;
@ -285,6 +287,30 @@ CLS_PTR(dyn_mem);
CLS_PTR(file_reader);
class dyn_mem_pool : public refer
{
volatile bool run_ = true;
dyn_mem_ptr *pool_ = nullptr;
uint32_t count_ = 0;
uint32_t unit_ = 0;
uint32_t wpos_ = 0;
uint32_t rpos_ = 0;
public:
dyn_mem_pool(uint32_t cnt, uint32_t unit);
protected:
virtual ~dyn_mem_pool();
public:
dyn_mem_ptr take(void);
void put(dyn_mem_ptr buf);
void stop(void);
uint32_t count(void);
uint32_t unit(void);
uint32_t take_pos(void);
};
// callback proto
//
// parameters: usb_functionfs_event* - the function event ptr

View File

@ -58,6 +58,7 @@ enum woker_status
WORKER_STATUS_BUSY, // in working
WORKER_STATUS_ERROR, // error occurs
WORKER_STATUS_RESET, // in reset(close and reopen) process
WORKER_STATUS_WAIT_RESOURCE, // wait resource
};
enum packet_cmd

View File

@ -18,7 +18,12 @@ async_usb_gadget::async_usb_gadget(std::function<FUNCTION_PROTO_COMMAND_HANDLE>
, std::function<void(bool)> dev_conn)
: handle_cmd_(cmd_handler), dev_connect_(dev_conn)
, enc_head_(ENCRYPT_CMD_NONE), enc_payload_(ENCRYPT_NONE), enc_data_(0)
, io_buf_("IO-buf"), cmd_que_("in-queue"), sent_que_("out-queue")
#ifdef MEM_POOL
, io_buf_(nullptr)
#else
, io_buf_("IO-buf")
#endif
, cmd_que_("in-queue"), sent_que_("out-queue")
, wait_in_("wait_usb_enable_in"), wait_out_("wait_usb_enable_out")
{
cmd_que_.enable_wait_log(false);
@ -90,6 +95,10 @@ async_usb_gadget::async_usb_gadget(std::function<FUNCTION_PROTO_COMMAND_HANDLE>
unit_in_ = unit_out_ = sys_info::page_size; // dev_->get_max_packet();
// allocate 10MB for IO
#ifdef MEM_POOL
io_buf_ = new dyn_mem_pool(256, unit_out_);
utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_->count() * unit_out_, io_buf_->count(), unit_in_);
#else
for(int i = 0; i < SIZE_MB(10) / unit_out_; ++i)
{
dyn_mem_ptr buf = dyn_mem::memory(unit_out_);
@ -97,6 +106,7 @@ async_usb_gadget::async_usb_gadget(std::function<FUNCTION_PROTO_COMMAND_HANDLE>
io_buf_.save(buf);
}
utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_.size() * unit_out_, io_buf_.size(), unit_in_);
#endif
threads_.start(task, "thread_pump_task", (void*)&async_usb_gadget::thread_pump_task);
threads_.start(bulkw, "thread_write_bulk", (void*)&async_usb_gadget::thread_write_bulk);
@ -108,26 +118,38 @@ async_usb_gadget::~async_usb_gadget()
{
stop();
#ifdef MEM_POOL
io_buf_->release();
#else
dyn_mem_ptr buf = nullptr;
while(io_buf_.take(buf, false))
{
if(buf)
buf->release();
}
#endif
}
dyn_mem_ptr async_usb_gadget::get_io_buffer(void)
{
dyn_mem_ptr buf = nullptr;
#ifdef MEM_POOL
buf = io_buf_->take();
#else
io_buf_.take(buf, true);
#endif
return buf;
}
void async_usb_gadget::free_io_buffer(dyn_mem_ptr buf)
{
buf->clear_data();
#ifdef MEM_POOL
io_buf_->put(buf);
#else
io_buf_.save(buf, true);
#endif
}
const char* async_usb_gadget::ep0_status_desc(int ep0_status, char* unk_buf/*>= 20 bytes*/)
@ -305,20 +327,32 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data)
return reply;
}
int async_usb_gadget::inner_write_bulk(int fd, data_source_ptr data, int* err)
int async_usb_gadget::inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* len, uint32_t bulk_size)
{
unsigned char* ptr = data->ptr();
size_t bulk_size = unit_in_ * get_buf_coefficient(),
total = data->get_rest(), off = 0, size = total > bulk_size ? bulk_size : total;
int w = 0;
bool fail = false;
int w = 0, to = 0, err = 0, total = *len, off = 0,
size = *len;
statu_in_ = WORKER_STATUS_BUSY;
want_bytes_in_ = total;
if(data->is_memory_block())
while(1)
{
while((w = write(fd, ptr + off, size)) > 0)
w = write(fd, buf + off, size);
if(w == -1)
{
if(errno == ETIMEDOUT)
{
if (to++ > 3)
{
utils::to_log(LOG_LEVEL_DEBUG, "Write bulk failed at (%u/%u) for err: %d\n", total, *len, errno);
break;
}
utils::to_log(LOG_LEVEL_DEBUG, "Write bulk timeout at (%u/%u), try again after %ums ...\n", off, total, to * 100);
std::this_thread::sleep_for(std::chrono::milliseconds(to * 100));
continue;
}
err = errno;
break;
}
to = 0;
off += w;
want_bytes_in_ -= w;
if(off >= total)
@ -328,57 +362,39 @@ int async_usb_gadget::inner_write_bulk(int fd, data_source_ptr data, int* err)
else
size = total - off;
}
if(w <= 0 && err)
{
*err = errno;
*len = off;
return err;
}
fail = w <= 0;
int async_usb_gadget::inner_write_bulk(int fd, data_source_ptr data, dyn_mem_ptr mem, uint32_t bulk_size)
{
int ret = 0;
uint32_t total = data->get_rest();
statu_in_ = WORKER_STATUS_BUSY;
want_bytes_in_ = total;
if(data->is_memory_block())
{
ret = inner_write_bulk_memory(fd, data->ptr(), &total, bulk_size);
}
else
{
dyn_mem_ptr buf = dyn_mem::memory(bulk_size);
uint32_t len = bulk_size;
if(err)
*err = 0;
total = 0;
while((w = data->fetch_data(buf->ptr(), &len)) == 0)
while(data->fetch_data(mem->ptr(), &len) == 0)
{
ptr = buf->ptr();
off = 0;
w = write(fd, ptr + off, len);
while(w > 0 && w + off < len)
{
want_bytes_in_ -= w;
total += w;
off += w;
w = write(fd, ptr + off, len - off);
}
if(w <= 0)
{
fail = true;
if(err)
*err = errno;
break;
}
else
{
want_bytes_in_ -= w;
total += w;
}
if(data->get_rest() == 0)
ret = inner_write_bulk_memory(fd, mem->ptr(), &len, bulk_size);
if(ret || data->get_rest() == 0)
break;
total += len;
len = bulk_size;
}
buf->release();
off = total;
}
statu_in_ = fail ? WORKER_STATUS_ERROR : WORKER_STATUS_IDLE;
want_bytes_in_ = 0;
return off;
statu_in_ = ret ? WORKER_STATUS_ERROR : WORKER_STATUS_IDLE;
want_bytes_in_ -= total;
return ret;
}
dyn_mem_ptr async_usb_gadget::handle_bulk_command(dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* pkd)
{
@ -478,6 +494,16 @@ void async_usb_gadget::thread_read_bulk(int fd)
dyn_mem_ptr buf = get_io_buffer();
int l = 0;
if(!buf)
{
#ifdef MEM_POOL
utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got at position %u!\n", io_buf_->take_pos());
#else
utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got while %u elements in queue!\n", io_buf_.size());
#endif
continue;
}
statu_out_ = WORKER_STATUS_BUSY;
buf->set_session_id(session_id_);
l = read(fd, buf->ptr(), bulk_size);
@ -487,7 +513,7 @@ void async_usb_gadget::thread_read_bulk(int fd)
free_io_buffer(buf);
if(errno)
{
utils::to_log(LOG_LEVEL_ALL, "read bulk(%d) failed: %d(%s)\n", l, errno, strerror(errno));
utils::to_log(LOG_LEVEL_FATAL, "read bulk(%d) failed: %d(%s)\n", l, errno, strerror(errno));
statu_out_ = WORKER_STATUS_ERROR;
break;
}
@ -512,6 +538,9 @@ void async_usb_gadget::thread_read_bulk(int fd)
}
void async_usb_gadget::thread_write_bulk(int fd)
{
uint32_t bulk_size = unit_in_ * get_buf_coefficient();
dyn_mem_ptr mem = dyn_mem::memory(bulk_size);
statu_in_ = WORKER_STATUS_IDLE;
while(run_)
{
@ -521,15 +550,24 @@ void async_usb_gadget::thread_write_bulk(int fd)
if(sent_que_.take(data, true))
{
want_bytes_in_ = data->get_rest();
inner_write_bulk(fd, data, &err);
err = inner_write_bulk(fd, data, mem, bulk_size);
{
file_reader_ptr fr = dynamic_cast<file_reader_ptr>(data);
if(fr)
{
utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err);
}
}
data->release();
if(err)
{
utils::to_log(LOG_LEVEL_ALL, "write bulk failed: %d(%s)\n", errno, strerror(errno));
utils::to_log(LOG_LEVEL_FATAL, "write bulk failed: %d(%s)\n", errno, strerror(errno));
break;
}
}
}
mem->release();
statu_in_ = WORKER_STATUS_NOT_START;
}
void async_usb_gadget::thread_pump_task(void)
@ -644,7 +682,7 @@ void async_usb_gadget::thread_pump_task(void)
BASE_PACKET_REPLY(*pack, dh->get_packet_command() + 1, dh->get_packet_id(), err);
reply->set_len(sizeof(PACK_BASE));
utils::to_log(LOG_LEVEL_ALL, "Finished receiving large data with error(Max queue size is %u): %d, total size = 0x%x\n", max_que, err, total_size);
utils::to_log(LOG_LEVEL_ALL, "Received large data(%p) with error(Max queue size is %u): %d.\n", total_size, max_que, err);
dh->release();
dh = nullptr;
@ -732,6 +770,11 @@ int async_usb_gadget::stop(void)
{
run_ = false;
#ifdef MEM_POOL
io_buf_->stop();
#else
io_buf_.trigger();
#endif
wait_in_.trigger();
wait_out_.trigger();
cmd_que_.trigger();

View File

@ -35,7 +35,7 @@
// return value of all routines is the reply packet, nullptr if the packet need not reply
//
#define FUNCTION_PROTO_UNHANDLED_EP0 dyn_mem_ptr(struct usb_functionfs_event*)
#define MEM_POOL
class usb_device;
@ -69,7 +69,11 @@ class async_usb_gadget : public refer
uint32_t task_cmd_ = 0;
uint32_t task_packet_id_ = 0;
#ifdef MEM_POOL
dyn_mem_pool *io_buf_ = nullptr;
#else
safe_fifo<dyn_mem_ptr> io_buf_;
#endif
safe_fifo<dyn_mem_ptr> cmd_que_;
safe_fifo<data_source_ptr> sent_que_;
@ -95,7 +99,8 @@ class async_usb_gadget : public refer
dyn_mem_ptr handle_ctrl_message(dyn_mem_ptr data);
dyn_mem_ptr handle_ctrl_setup(dyn_mem_ptr data); // user command ...
int inner_write_bulk(int fd, data_source_ptr data, int* err = nullptr);
int inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* len/*in - to sent; out - real sent*/, uint32_t bulk_size); // return error code
int inner_write_bulk(int fd, data_source_ptr data, dyn_mem_ptr mem/*to load data in if data was not memory*/, uint32_t bulk_size); // return error
dyn_mem_ptr handle_bulk_command(dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* pkd);
void thread_read_ep0(void);