From 32f830ee944661e74e6c32bc7a853fc58668f412 Mon Sep 17 00:00:00 2001 From: gb <741021719@qq.com> Date: Wed, 13 Dec 2023 14:57:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=8B=E6=90=93=E5=86=85=E5=AD=98=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scanner/async_scanner.cpp | 21 +++-- sdk/base/data.cpp | 107 +++++++++++++++++++++++- sdk/base/data.h | 30 ++++++- sdk/base/packet.h | 1 + usb/usb_io.cpp | 167 ++++++++++++++++++++++++-------------- usb/usb_io.h | 9 +- 6 files changed, 260 insertions(+), 75 deletions(-) diff --git a/scanner/async_scanner.cpp b/scanner/async_scanner.cpp index 515ce69..094e6b6 100644 --- a/scanner/async_scanner.cpp +++ b/scanner/async_scanner.cpp @@ -27,6 +27,8 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0) { utils::init_log(LOG_TYPE_FILE); + utils::to_log(LOG_LEVEL_DEBUG, "System info: page-size = %u, mapping-page-size = %u, disk-cluster-size = %u.\n" + , sys_info::page_size, sys_info::page_map_size, sys_info::cluster_size); init(); auto bulk_handle = [&](dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* required) -> dyn_mem_ptr @@ -277,7 +279,7 @@ dyn_mem_ptr async_scanner::handle_file_receive(LPPACK_BASE pack, uint32_t* used, { saver->set_packet_param(pack->cmd, pack->pack_id); } - utils::to_log(LOG_LEVEL_DEBUG, "receive file (%u bytes): %s = %d\n", pfi->size, path.c_str(), err); + utils::to_log(LOG_LEVEL_DEBUG, "Receiving file(%p bytes): %s = %d\n", pfi->size, path.c_str(), err); *required = dynamic_cast(saver); } @@ -353,11 +355,20 @@ dyn_mem_ptr async_scanner::handle_file_send_roger(LPPACK_BASE pack, uint32_t* us } } } - *used = base_head_size; - *required = dynamic_cast(reader); - // if reader was nullptr, notify failed by INT or control ? - utils::to_log(LOG_LEVEL_DEBUG, "File Send beginning (%p) ...\n", reader); + *used = base_head_size; + if(pack->data) + { + utils::to_log(LOG_LEVEL_DEBUG, "Sending file '%s' (%p) cancelled with error %d.\n", reader->path_file(), reader, pack->data); + reader->release(); + reader = nullptr; + } + else + { + // if reader was nullptr, notify failed by INT or control ? + utils::to_log(LOG_LEVEL_DEBUG, "Sending file '%s' (%p) ...\n", reader->path_file(), reader); + } + *required = dynamic_cast(reader); } return reply; diff --git a/sdk/base/data.cpp b/sdk/base/data.cpp index c33f69c..3201ece 100644 --- a/sdk/base/data.cpp +++ b/sdk/base/data.cpp @@ -172,7 +172,7 @@ file_saver::file_saver(void) : size_(0), wrote_(0), path_(""), check_(""), dst_( {} file_saver::~file_saver() { - utils::to_log(LOG_LEVEL_DEBUG, "Write file(%s) over(%ld/%ld).\n", path_.c_str(), wrote_, size_); + utils::to_log(LOG_LEVEL_DEBUG, "Wrote over of file(%s) at(%llu/%llu).\n", path_.c_str(), wrote_, size_); close(); } @@ -199,6 +199,10 @@ int file_saver::set_verify_data(const char* data, size_t len) return 0; } +const char* file_saver::path_file(void) +{ + return path_.c_str(); +} int file_saver::open(const char* path, uint64_t size, bool in_mem, size_t off) { int err = 0; @@ -480,7 +484,7 @@ file_reader::~file_reader() if (map_) map_->release(); notify_progress(len_, len_, 0); // ensure 100% - utils::to_log(LOG_LEVEL_DEBUG, "Read file(%s) over(%ld/%ld).\n", path_.c_str(), consume_, len_); + utils::to_log(LOG_LEVEL_DEBUG, "Read over of file(%s) at(%p/%p).\n", path_.c_str(), consume_, len_); } int file_reader::open(const char* file, bool in_mem, size_t off) @@ -569,6 +573,10 @@ FILE* file_reader::detach(void) return ret; } +const char* file_reader::path_file(void) +{ + return path_.c_str(); +} bool file_reader::is_memory_block(void) { @@ -718,7 +726,6 @@ int file_map::open(const char* file, uint64_t size, bool readonly) return err; } map_ = (HANDLE)::open(file, O_RDWR, 0666); - utils::to_log(LOG_LEVEL_DEBUG, "FileMapping: open('%s', O_APPEND, 0666) = %p\n", file, map_); } if (map_ == INVALID_HANDLE_VALUE) { @@ -800,7 +807,7 @@ uint8_t* file_map::map(uint64_t off, uint32_t* size) #endif if (!buf_) { - utils::to_log(LOG_LEVEL_WARNING, "FileMapping: request map(%p + %u), real map(%p + %u) failed: %d\n" + utils::to_log(LOG_LEVEL_WARNING, "FileMapping: request map(%llu + %u), real map(%llu + %u) failed: %d\n" , off, *size, map_off_, map_size_, GetLastError()); *size = 0; } @@ -814,3 +821,95 @@ uint8_t* file_map::buffer(void) { return buf_ ? buf_ + off_ : nullptr; } + + + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// +dyn_mem_pool::dyn_mem_pool(uint32_t cnt, uint32_t unit) : count_(cnt), unit_(unit) +{ + pool_ = (dyn_mem_ptr*)malloc(cnt * sizeof(dyn_mem_ptr)); + for(uint32_t i = 0; i < cnt; ++i) + { + pool_[i] = dyn_mem::memory(unit); + } +} +dyn_mem_pool::~dyn_mem_pool() +{ + if(pool_) + { + for(uint32_t i = 0; i < count_; ++i) + { + if(pool_[i]) + { + pool_[i]->release(); + } + } + free(pool_); + } + + pool_ = nullptr; +} + +dyn_mem_ptr dyn_mem_pool::take(void) +{ + dyn_mem_ptr buf = nullptr; + + if(!pool_[rpos_]) + { + chronograph watch; + do + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while (run_ && !pool_[rpos_]); + utils::to_log(LOG_LEVEL_DEBUG, "Waiting for taking memory pool took %ums at %u.\n", watch.elapse_ms(), rpos_); + } + if(pool_[rpos_]) + { + buf = pool_[rpos_]; + pool_[rpos_++] = nullptr; + if(rpos_ >= count_) + rpos_ = 0; + } + + return buf; +} +void dyn_mem_pool::put(dyn_mem_ptr buf) +{ + if(pool_[wpos_]) + { + chronograph watch; + do + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while (run_ && pool_[wpos_]); + utils::to_log(LOG_LEVEL_DEBUG, "Waiting for putting memory pool took %ums at %u.\n", watch.elapse_ms(), wpos_); + } + + if(pool_[wpos_]) + { + buf->release(); + } + else + { + pool_[wpos_++] = buf; + if(wpos_ >= count_) + wpos_ = 0; + } +} +void dyn_mem_pool::stop(void) +{ + run_ = false; +} +uint32_t dyn_mem_pool::count(void) +{ + return count_; +} +uint32_t dyn_mem_pool::unit(void) +{ + return unit_; +} +uint32_t dyn_mem_pool::take_pos(void) +{ + return rpos_; +} \ No newline at end of file diff --git a/sdk/base/data.h b/sdk/base/data.h index ffd8dc9..c717d42 100644 --- a/sdk/base/data.h +++ b/sdk/base/data.h @@ -25,8 +25,8 @@ class packet_data_base : public refer void* user_data_; protected: - uint32_t pack_cmd_; - uint32_t pack_id_; + uint32_t pack_cmd_ = 0; + uint32_t pack_id_ = 0; uint32_t session_id_ = -1; public: @@ -167,6 +167,7 @@ protected: public: int set_verify_data(const char* data, size_t len); int open(const char* path, uint64_t size, bool in_mem = false, size_t off = 0); + const char* path_file(void); public: virtual int put_data(const void* data, uint32_t* size/*[in] - total bytes of data; [out] - used bytes*/) override; @@ -264,6 +265,7 @@ public: int open(const char* file, bool in_mem, size_t off = 0); int attach(FILE* f); FILE* detach(void); + const char* path_file(void); public: virtual bool is_memory_block(void) override; @@ -285,6 +287,30 @@ CLS_PTR(dyn_mem); CLS_PTR(file_reader); +class dyn_mem_pool : public refer +{ + volatile bool run_ = true; + dyn_mem_ptr *pool_ = nullptr; + uint32_t count_ = 0; + uint32_t unit_ = 0; + uint32_t wpos_ = 0; + uint32_t rpos_ = 0; + +public: + dyn_mem_pool(uint32_t cnt, uint32_t unit); + +protected: + virtual ~dyn_mem_pool(); + +public: + dyn_mem_ptr take(void); + void put(dyn_mem_ptr buf); + void stop(void); + uint32_t count(void); + uint32_t unit(void); + uint32_t take_pos(void); +}; + // callback proto // // parameters: usb_functionfs_event* - the function event ptr diff --git a/sdk/base/packet.h b/sdk/base/packet.h index b62667d..36b38b4 100644 --- a/sdk/base/packet.h +++ b/sdk/base/packet.h @@ -58,6 +58,7 @@ enum woker_status WORKER_STATUS_BUSY, // in working WORKER_STATUS_ERROR, // error occurs WORKER_STATUS_RESET, // in reset(close and reopen) process + WORKER_STATUS_WAIT_RESOURCE, // wait resource }; enum packet_cmd diff --git a/usb/usb_io.cpp b/usb/usb_io.cpp index 6cf4a69..126ea3f 100644 --- a/usb/usb_io.cpp +++ b/usb/usb_io.cpp @@ -18,7 +18,12 @@ async_usb_gadget::async_usb_gadget(std::function , std::function dev_conn) : handle_cmd_(cmd_handler), dev_connect_(dev_conn) , enc_head_(ENCRYPT_CMD_NONE), enc_payload_(ENCRYPT_NONE), enc_data_(0) - , io_buf_("IO-buf"), cmd_que_("in-queue"), sent_que_("out-queue") +#ifdef MEM_POOL + , io_buf_(nullptr) +#else + , io_buf_("IO-buf") +#endif + , cmd_que_("in-queue"), sent_que_("out-queue") , wait_in_("wait_usb_enable_in"), wait_out_("wait_usb_enable_out") { cmd_que_.enable_wait_log(false); @@ -90,6 +95,10 @@ async_usb_gadget::async_usb_gadget(std::function unit_in_ = unit_out_ = sys_info::page_size; // dev_->get_max_packet(); // allocate 10MB for IO +#ifdef MEM_POOL + io_buf_ = new dyn_mem_pool(256, unit_out_); + utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_->count() * unit_out_, io_buf_->count(), unit_in_); +#else for(int i = 0; i < SIZE_MB(10) / unit_out_; ++i) { dyn_mem_ptr buf = dyn_mem::memory(unit_out_); @@ -97,6 +106,7 @@ async_usb_gadget::async_usb_gadget(std::function io_buf_.save(buf); } utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_.size() * unit_out_, io_buf_.size(), unit_in_); +#endif threads_.start(task, "thread_pump_task", (void*)&async_usb_gadget::thread_pump_task); threads_.start(bulkw, "thread_write_bulk", (void*)&async_usb_gadget::thread_write_bulk); @@ -108,26 +118,38 @@ async_usb_gadget::~async_usb_gadget() { stop(); +#ifdef MEM_POOL + io_buf_->release(); +#else dyn_mem_ptr buf = nullptr; while(io_buf_.take(buf, false)) { if(buf) buf->release(); } +#endif } dyn_mem_ptr async_usb_gadget::get_io_buffer(void) { dyn_mem_ptr buf = nullptr; +#ifdef MEM_POOL + buf = io_buf_->take(); +#else io_buf_.take(buf, true); +#endif return buf; } void async_usb_gadget::free_io_buffer(dyn_mem_ptr buf) { buf->clear_data(); +#ifdef MEM_POOL + io_buf_->put(buf); +#else io_buf_.save(buf, true); +#endif } const char* async_usb_gadget::ep0_status_desc(int ep0_status, char* unk_buf/*>= 20 bytes*/) @@ -305,80 +327,74 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data) return reply; } -int async_usb_gadget::inner_write_bulk(int fd, data_source_ptr data, int* err) +int async_usb_gadget::inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* len, uint32_t bulk_size) { - unsigned char* ptr = data->ptr(); - size_t bulk_size = unit_in_ * get_buf_coefficient(), - total = data->get_rest(), off = 0, size = total > bulk_size ? bulk_size : total; - int w = 0; - bool fail = false; + int w = 0, to = 0, err = 0, total = *len, off = 0, + size = *len; + + while(1) + { + w = write(fd, buf + off, size); + if(w == -1) + { + if(errno == ETIMEDOUT) + { + if (to++ > 3) + { + utils::to_log(LOG_LEVEL_DEBUG, "Write bulk failed at (%u/%u) for err: %d\n", total, *len, errno); + break; + } + utils::to_log(LOG_LEVEL_DEBUG, "Write bulk timeout at (%u/%u), try again after %ums ...\n", off, total, to * 100); + std::this_thread::sleep_for(std::chrono::milliseconds(to * 100)); + continue; + } + err = errno; + break; + } + + to = 0; + off += w; + want_bytes_in_ -= w; + if(off >= total) + break; + if(off + bulk_size < total) + size = bulk_size; + else + size = total - off; + } + *len = off; + + return err; +} +int async_usb_gadget::inner_write_bulk(int fd, data_source_ptr data, dyn_mem_ptr mem, uint32_t bulk_size) +{ + int ret = 0; + uint32_t total = data->get_rest(); statu_in_ = WORKER_STATUS_BUSY; want_bytes_in_ = total; if(data->is_memory_block()) { - while((w = write(fd, ptr + off, size)) > 0) - { - off += w; - want_bytes_in_ -= w; - if(off >= total) - break; - if(off + bulk_size < total) - size = bulk_size; - else - size = total - off; - } - if(w <= 0 && err) - { - *err = errno; - } - fail = w <= 0; + ret = inner_write_bulk_memory(fd, data->ptr(), &total, bulk_size); } else { - dyn_mem_ptr buf = dyn_mem::memory(bulk_size); uint32_t len = bulk_size; - if(err) - *err = 0; - total = 0; - while((w = data->fetch_data(buf->ptr(), &len)) == 0) + while(data->fetch_data(mem->ptr(), &len) == 0) { - ptr = buf->ptr(); - off = 0; - w = write(fd, ptr + off, len); - while(w > 0 && w + off < len) - { - want_bytes_in_ -= w; - total += w; - off += w; - w = write(fd, ptr + off, len - off); - } - - if(w <= 0) - { - fail = true; - if(err) - *err = errno; - break; - } - else - { - want_bytes_in_ -= w; - total += w; - } - - if(data->get_rest() == 0) + ret = inner_write_bulk_memory(fd, mem->ptr(), &len, bulk_size); + if(ret || data->get_rest() == 0) break; + total += len; len = bulk_size; } - buf->release(); - off = total; } - statu_in_ = fail ? WORKER_STATUS_ERROR : WORKER_STATUS_IDLE; - want_bytes_in_ = 0; - return off; + statu_in_ = ret ? WORKER_STATUS_ERROR : WORKER_STATUS_IDLE; + want_bytes_in_ -= total; + + return ret; } dyn_mem_ptr async_usb_gadget::handle_bulk_command(dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* pkd) { @@ -478,6 +494,16 @@ void async_usb_gadget::thread_read_bulk(int fd) dyn_mem_ptr buf = get_io_buffer(); int l = 0; + if(!buf) + { +#ifdef MEM_POOL + utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got at position %u!\n", io_buf_->take_pos()); +#else + utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got while %u elements in queue!\n", io_buf_.size()); +#endif + continue; + } + statu_out_ = WORKER_STATUS_BUSY; buf->set_session_id(session_id_); l = read(fd, buf->ptr(), bulk_size); @@ -487,7 +513,7 @@ void async_usb_gadget::thread_read_bulk(int fd) free_io_buffer(buf); if(errno) { - utils::to_log(LOG_LEVEL_ALL, "read bulk(%d) failed: %d(%s)\n", l, errno, strerror(errno)); + utils::to_log(LOG_LEVEL_FATAL, "read bulk(%d) failed: %d(%s)\n", l, errno, strerror(errno)); statu_out_ = WORKER_STATUS_ERROR; break; } @@ -512,6 +538,9 @@ void async_usb_gadget::thread_read_bulk(int fd) } void async_usb_gadget::thread_write_bulk(int fd) { + uint32_t bulk_size = unit_in_ * get_buf_coefficient(); + dyn_mem_ptr mem = dyn_mem::memory(bulk_size); + statu_in_ = WORKER_STATUS_IDLE; while(run_) { @@ -521,15 +550,24 @@ void async_usb_gadget::thread_write_bulk(int fd) if(sent_que_.take(data, true)) { want_bytes_in_ = data->get_rest(); - inner_write_bulk(fd, data, &err); + err = inner_write_bulk(fd, data, mem, bulk_size); + { + file_reader_ptr fr = dynamic_cast(data); + if(fr) + { + utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err); + } + } data->release(); if(err) { - utils::to_log(LOG_LEVEL_ALL, "write bulk failed: %d(%s)\n", errno, strerror(errno)); + utils::to_log(LOG_LEVEL_FATAL, "write bulk failed: %d(%s)\n", errno, strerror(errno)); break; } } } + mem->release(); + statu_in_ = WORKER_STATUS_NOT_START; } void async_usb_gadget::thread_pump_task(void) @@ -644,7 +682,7 @@ void async_usb_gadget::thread_pump_task(void) BASE_PACKET_REPLY(*pack, dh->get_packet_command() + 1, dh->get_packet_id(), err); reply->set_len(sizeof(PACK_BASE)); - utils::to_log(LOG_LEVEL_ALL, "Finished receiving large data with error(Max queue size is %u): %d, total size = 0x%x\n", max_que, err, total_size); + utils::to_log(LOG_LEVEL_ALL, "Received large data(%p) with error(Max queue size is %u): %d.\n", total_size, max_que, err); dh->release(); dh = nullptr; @@ -732,6 +770,11 @@ int async_usb_gadget::stop(void) { run_ = false; +#ifdef MEM_POOL + io_buf_->stop(); +#else + io_buf_.trigger(); +#endif wait_in_.trigger(); wait_out_.trigger(); cmd_que_.trigger(); diff --git a/usb/usb_io.h b/usb/usb_io.h index 9251ba3..7ff2f30 100644 --- a/usb/usb_io.h +++ b/usb/usb_io.h @@ -35,7 +35,7 @@ // return value of all routines is the reply packet, nullptr if the packet need not reply // #define FUNCTION_PROTO_UNHANDLED_EP0 dyn_mem_ptr(struct usb_functionfs_event*) - +#define MEM_POOL class usb_device; @@ -69,7 +69,11 @@ class async_usb_gadget : public refer uint32_t task_cmd_ = 0; uint32_t task_packet_id_ = 0; +#ifdef MEM_POOL + dyn_mem_pool *io_buf_ = nullptr; +#else safe_fifo io_buf_; +#endif safe_fifo cmd_que_; safe_fifo sent_que_; @@ -95,7 +99,8 @@ class async_usb_gadget : public refer dyn_mem_ptr handle_ctrl_message(dyn_mem_ptr data); dyn_mem_ptr handle_ctrl_setup(dyn_mem_ptr data); // user command ... - int inner_write_bulk(int fd, data_source_ptr data, int* err = nullptr); + int inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* len/*in - to sent; out - real sent*/, uint32_t bulk_size); // return error code + int inner_write_bulk(int fd, data_source_ptr data, dyn_mem_ptr mem/*to load data in if data was not memory*/, uint32_t bulk_size); // return error dyn_mem_ptr handle_bulk_command(dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* pkd); void thread_read_ep0(void);