添加CancelIO协议

This commit is contained in:
gb 2023-12-14 14:25:06 +08:00
parent 955584c894
commit a364fd68a0
10 changed files with 357 additions and 170 deletions

View File

@ -407,9 +407,9 @@ int hg_scanner::file_transfer(const char* local, const char* remote, bool to_rem
tx_prg_ = now; tx_prg_ = now;
status_ = err; status_ = err;
if (err) if (err)
utils::to_log(LOG_LEVEL_WARNING, "File transfer error: %d (at %ld/%ld)\n", err, txed, total); utils::to_log(LOG_LEVEL_WARNING, "File transfer error: %d (at %llu/%llu)\n", err, txed, total);
else if (txed >= total) else if (txed >= total)
utils::to_log(LOG_LEVEL_DEBUG, "File transfer finished(%ld/%ld) with error %d\n", txed, total, err); utils::to_log(LOG_LEVEL_DEBUG, "File transfer finished(%llu/%llu) with error %d\n", txed, total, err);
return 0; return 0;

View File

@ -10,7 +10,7 @@
async_usb_host::async_usb_host(std::function<FUNCTION_PROTO_COMMAND_HANDLE> cmd_handler) async_usb_host::async_usb_host(std::function<FUNCTION_PROTO_COMMAND_HANDLE> cmd_handler)
: handler_(cmd_handler), usb_dev_(nullptr), usb_handle_(nullptr), run_(true), cancel_write_(false), writing_(false) : handler_(cmd_handler), usb_dev_(nullptr), usb_handle_(nullptr), run_(true), cancel_write_(false), writing_(false)
, head_enc_type_(ENCRYPT_CMD_NONE), payload_enc_type_(ENCRYPT_NONE), enc_data_(0), buf_coef_(1) , head_enc_type_(ENCRYPT_CMD_NONE), payload_enc_type_(ENCRYPT_NONE), enc_data_(0), buf_coef_(1)
, in_que_("usb-r"), out_que_("usb-w") , in_que_("usb-r"), out_que_("usb-w"), io_buf_("IO-buf")
{ {
in_que_.enable_wait_log(false); in_que_.enable_wait_log(false);
out_que_.enable_wait_log(false); out_que_.enable_wait_log(false);
@ -164,6 +164,17 @@ int async_usb_host::start(libusb_device* dev)
libusb_ref_device(dev); libusb_ref_device(dev);
usb_dev_ = dev; usb_dev_ = dev;
memset(&peer_cfg_, 0, sizeof(peer_cfg_));
if (get_peer_config(&peer_cfg_))
peer_cfg_.io_size = bulk_out_.max_packet;
utils::to_log(LOG_LEVEL_DEBUG, "IO size: %u\n", peer_cfg_.io_size);
for (int i = 0; i < SIZE_MB(10) / peer_cfg_.io_size; ++i)
{
dyn_mem_ptr buf = dyn_mem::memory(peer_cfg_.io_size);
if (buf)
io_buf_.save(buf);
}
create_worker_threads(); create_worker_threads();
return ret; return ret;
@ -204,6 +215,13 @@ int async_usb_host::stop(void)
memset(&bulk_out_, -1, sizeof(bulk_out_)); memset(&bulk_out_, -1, sizeof(bulk_out_));
bulk_in_.claimed = bulk_out_.claimed = 0; bulk_in_.claimed = bulk_out_.claimed = 0;
while (io_buf_.take(data))
{
if (data)
data->release();
}
io_buf_.clear();
return 0; return 0;
} }
@ -220,12 +238,25 @@ uint8_t& async_usb_host::encrypt_data(void)
return enc_data_; return enc_data_;
} }
dyn_mem_ptr async_usb_host::get_io_buffer(void)
{
dyn_mem_ptr buf = nullptr;
io_buf_.take(buf, true);
return buf;
}
void async_usb_host::free_io_buffer(dyn_mem_ptr buf)
{
buf->clear_data();
io_buf_.save(buf, true);
}
void async_usb_host::thread_read_bulk(void) void async_usb_host::thread_read_bulk(void)
{ {
size_t buf_size = buf_coef_ * bulk_in_.max_packet; size_t buf_size = buf_coef_ * /*bulk_in_.max_packet*/peer_cfg_.io_size;
dyn_mem_ptr mem = dyn_mem::memory(buf_size); dyn_mem_ptr mem = get_io_buffer();
utils::to_log(LOG_LEVEL_ALL, "thread_read_bulk working ...\r\n");
while (run_) while (run_)
{ {
int r = 0, int r = 0,
@ -247,17 +278,17 @@ void async_usb_host::thread_read_bulk(void)
mem->set_len(r); mem->set_len(r);
in_que_.save(mem, true); in_que_.save(mem, true);
buf_size = buf_coef_ * bulk_in_.max_packet; mem = get_io_buffer();
mem = dyn_mem::memory(buf_size);
} }
if (mem) if (mem)
mem->release(); free_io_buffer(mem);
utils::to_log(LOG_LEVEL_ALL, "thread_read_bulk exited.\r\n");
} }
void async_usb_host::thread_write_bulk(void) void async_usb_host::thread_write_bulk(void)
{ {
utils::to_log(LOG_LEVEL_ALL, "thread_write_bulk working ...\r\n"); int bulk_size = peer_cfg_.io_size * buf_coef_;
dyn_mem_ptr mem = dyn_mem::memory(bulk_size);
while (run_) while (run_)
{ {
data_source_ptr data = nullptr; data_source_ptr data = nullptr;
@ -266,7 +297,7 @@ void async_usb_host::thread_write_bulk(void)
int err = 0; int err = 0;
if(!cancel_write_) if(!cancel_write_)
inner_write_bulk(data, &err); err = inner_write_bulk(data, mem, bulk_size);
data->release(); data->release();
if (err && err != ECANCELED) if (err && err != ECANCELED)
{ {
@ -276,7 +307,7 @@ void async_usb_host::thread_write_bulk(void)
} }
} }
} }
utils::to_log(LOG_LEVEL_ALL, "thread_write_bulk exited.\r\n"); mem->release();
} }
void async_usb_host::thread_pump_task(void) void async_usb_host::thread_pump_task(void)
{ {
@ -286,16 +317,18 @@ void async_usb_host::thread_pump_task(void)
data_holder* dh = nullptr; data_holder* dh = nullptr;
LPPACK_BASE pack = nullptr; LPPACK_BASE pack = nullptr;
utils::to_log(LOG_LEVEL_ALL, "thread_pump_task working ...\r\n");
while (run_) while (run_)
{ {
bool pool = true;
data = nullptr; data = nullptr;
if (in_que_.take(data, true) && data) if (in_que_.take(data, true) && data)
{ {
if (prev) if (prev)
{ {
*prev += *data; *prev += *data;
data->release(); free_io_buffer(data);
pool = false;
data = prev; data = prev;
prev = nullptr; prev = nullptr;
} }
@ -369,9 +402,25 @@ void async_usb_host::thread_pump_task(void)
} while (used && data->get_rest()); } while (used && data->get_rest());
if (data->get_rest()) if (data->get_rest())
prev = data; {
if (pool)
{
prev = dyn_mem::memory(peer_cfg_.io_size);
prev->put(data->ptr(), data->get_rest());
free_io_buffer(data);
}
else
{
prev = data;
}
}
else else
data->release(); {
if (pool)
free_io_buffer(data);
else
data->release();
}
} }
} }
if (prev) if (prev)
@ -382,7 +431,6 @@ void async_usb_host::thread_pump_task(void)
reply->release(); reply->release();
if (dh) if (dh)
dh->release(); dh->release();
utils::to_log(LOG_LEVEL_ALL, "thread_pump_task exited.\r\n");
} }
void async_usb_host::create_worker_threads(void) void async_usb_host::create_worker_threads(void)
{ {
@ -399,9 +447,9 @@ void async_usb_host::create_worker_threads(void)
{ {
thread_pump_task(); thread_pump_task();
}; };
thread_w_.start(thread_w, "async_usb_host::thread_write_bulk"); worker_.start(thread_p, "async_usb_host::thread_pump_task");
thread_r_.start(thread_r, "async_usb_host::thread_read_bulk"); worker_.start(thread_w, "async_usb_host::thread_write_bulk");
thread_p_.start(thread_p, "async_usb_host::thread_pump_task"); worker_.start(thread_r, "async_usb_host::thread_read_bulk");
#else #else
thread_w_.reset(new std::thread(&async_usb_host::thread_write_bulk, this)); thread_w_.reset(new std::thread(&async_usb_host::thread_write_bulk, this));
thread_r_.reset(new std::thread(&async_usb_host::thread_read_bulk, this)); thread_r_.reset(new std::thread(&async_usb_host::thread_read_bulk, this));
@ -417,6 +465,10 @@ void async_usb_host::stop_worker_threads(void)
out_que_.trigger(); out_que_.trigger();
in_que_.trigger(); in_que_.trigger();
worker_.stop("async_usb_host::thread_write_bulk");
worker_.stop("async_usb_host::thread_read_bulk");
worker_.stop("async_usb_host::thread_pump_task");
#ifndef USE_SAFE_THREAD #ifndef USE_SAFE_THREAD
WAIT_THREAD(thread_w_); WAIT_THREAD(thread_w_);
WAIT_THREAD(thread_r_); WAIT_THREAD(thread_r_);
@ -424,103 +476,80 @@ void async_usb_host::stop_worker_threads(void)
#endif #endif
} }
int async_usb_host::bulk_write_buf(uint8_t* buf, int* len) int async_usb_host::bulk_write_buf(uint8_t* buf, int* len, int io_size)
{ {
int bulk_size = bulk_out_.max_packet * buf_coef_, int total = 0,
total = 0, l = io_size <= *len ? io_size : *len,
l = bulk_size <= *len ? bulk_size : *len,
s = 0, s = 0,
err = 0; err = 0,
to = 0;
do while (1)
{ {
while ((err = libusb_bulk_transfer(usb_handle_, bulk_out_.port, buf, l, &s, 1000)) == 0) err = libusb_bulk_transfer(usb_handle_, bulk_out_.port, buf, l, &s, 1000);
if (err != LIBUSB_SUCCESS)
{ {
if (cancel_write_) if (err == LIBUSB_ERROR_INTERRUPTED || err == LIBUSB_ERROR_TIMEOUT)
{ {
err = ECANCELED; if (to++ > 3)
break; {
utils::to_log(LOG_LEVEL_DEBUG, "Write bulk failed at (%u/%u) for err: %d\n", total, *len, err);
break;
}
utils::to_log(LOG_LEVEL_DEBUG, "Write bulk timeout(%d) at (%u/%u), try again after %ums ...\n", err, total, *len, to * 100);
std::this_thread::sleep_for(std::chrono::milliseconds(to * 100));
continue;
} }
break;
total += s;
if (total >= *len)
break;
buf += s;
if (*len - total < bulk_size)
l = *len - total;
else
l = bulk_size;
} }
} while (err == LIBUSB_ERROR_INTERRUPTED); // should pay more attention to this error !!!
if (cancel_write_)
{
err = ECANCELED;
break;
}
total += s;
if (total >= *len)
break;
buf += s;
if (*len - total < io_size)
l = *len - total;
else
l = io_size;
}
*len = total; *len = total;
return err; return err;
} }
int async_usb_host::inner_write_bulk(data_source_ptr data, int* err) int async_usb_host::inner_write_bulk(data_source_ptr data, dyn_mem_ptr mem, int bulk_size)
{ {
unsigned char* ptr = data->ptr(); unsigned char* ptr = data->ptr();
size_t bulk_size = bulk_out_.max_packet * buf_coef_, int total = data->get_rest();
total = data->get_rest(); int err = 0, s = 0;
int e = 0, s = 0;
writing_ = true; writing_ = true;
if (data->is_memory_block()) if (data->is_memory_block())
{ {
s = total; err = bulk_write_buf(data->ptr(), &total, bulk_size);
e = bulk_write_buf(ptr, &s);
if (err)
*err = e;
total = s;
} }
else else
{ {
dyn_mem_ptr twin[] = { dyn_mem::memory(bulk_size), dyn_mem::memory(bulk_size) },
buf = twin[0];
int ind = 0;
uint32_t len = bulk_size; uint32_t len = bulk_size;
if (err) while ((err = data->fetch_data(mem->ptr(), &len)) == 0)
*err = 0;
total = 0;
while ((e = data->fetch_data(buf->ptr(), &len)) == 0)
{ {
buf->set_len(len); err = bulk_write_buf(mem->ptr(), (int*)&len, bulk_size);
if (len == 0) if (err || data->get_rest() == 0)
utils::to_log(LOG_LEVEL_WARNING, "ZERO byte content fetched!\r\n");
do
{
if (e)
utils::to_log(LOG_LEVEL_WARNING, "Write failed at + 0x%08X with error 0x%x, we try again ...\r\n", total, e);
ptr = buf->ptr();
s = len;
e = bulk_write_buf(ptr, &s);
} while (e == LIBUSB_ERROR_INTERRUPTED || e == LIBUSB_ERROR_TIMEOUT);
if (e)
{
utils::to_log(LOG_LEVEL_ALL, "Write failed at +0x%08X with error: 0x%x. (Rest: %u)\r\n", total, e, data->get_rest());
if (err)
*err = e;
break;
}
else
total += s;
if (data->get_rest() == 0)
break; break;
total += len;
len = bulk_size; len = bulk_size;
ind ^= 1;
buf = twin[ind];
} }
twin[0]->release();
twin[1]->release();
} }
writing_ = false; writing_ = false;
return total; return err;
} }
void async_usb_host::post_2_write_bulk_thread(data_source_ptr data) void async_usb_host::post_2_write_bulk_thread(data_source_ptr data)
{ {
@ -547,20 +576,24 @@ dyn_mem_ptr async_usb_host::handle_data_in(dyn_mem_ptr& data, uint32_t* used, pa
} }
} }
int async_usb_host::get_peer_protocol_version(uint16_t* ver) int async_usb_host::get_peer_config(LPPEERCFG cfg)
{ {
SIMPLE_LOCK(io_lock_); SIMPLE_LOCK(io_lock_);
uint16_t v = 0; PEERCFG v = { 0 };
int err = libusb_control_transfer(usb_handle_, LIBUSB_REQUEST_TYPE_VENDOR | LIBUSB_ENDPOINT_IN int err = libusb_control_transfer(usb_handle_, LIBUSB_REQUEST_TYPE_VENDOR | LIBUSB_ENDPOINT_IN
, USB_REQ_EP0_GET_PROTO_VER, 0, 0 , USB_REQ_EP0_GET_PEER_CONFIG, 0, 0
, (unsigned char*)&v, sizeof(v) , (unsigned char*)&v, sizeof(v)
, 1000); , 1000);
if (ver) if (cfg)
*ver = v; *cfg = v;
return err == sizeof(v) ? 0 : EFAULT; return err == sizeof(v) ? 0 : EFAULT;
} }
uint16_t async_usb_host::get_protocol_version(void)
{
return peer_cfg_.ver;
}
int async_usb_host::get_peer_status(LPEP0REPLYSTATUS status) int async_usb_host::get_peer_status(LPEP0REPLYSTATUS status)
{ {
SIMPLE_LOCK(io_lock_); SIMPLE_LOCK(io_lock_);
@ -570,27 +603,26 @@ int async_usb_host::get_peer_status(LPEP0REPLYSTATUS status)
, (unsigned char*)status, sizeof(*status) , (unsigned char*)status, sizeof(*status)
, 1000) == sizeof(*status) ? 0 : EFAULT; , 1000) == sizeof(*status) ? 0 : EFAULT;
} }
int async_usb_host::restart_peer_bulk(uint32_t timeout) int async_usb_host::reset_peer(uint32_t timeout)
{ {
EP0REPLYSTATUS status = { 0 }; EP0REPLYSTATUS status = { 0 };
chronograph tc; chronograph tc;
int err = 0;
int ok = 0, uint32_t cancel = CANCEL_IO_CANCEL;
w = 0,
err = 0;
{ {
SIMPLE_LOCK(io_lock_); SIMPLE_LOCK(io_lock_);
err = libusb_control_transfer(usb_handle_, LIBUSB_REQUEST_TYPE_VENDOR | LIBUSB_ENDPOINT_IN err = libusb_control_transfer(usb_handle_, LIBUSB_REQUEST_TYPE_VENDOR | LIBUSB_ENDPOINT_IN
, USB_REQ_EP0_RESET_BULK, 0, 0 , USB_REQ_EP0_CANCEL_IO, 0, 0
, (unsigned char*)&ok, sizeof(ok) , (unsigned char*)&cancel, sizeof(cancel)
, 1000); , 1000);
} }
tc.reset(); tc.reset();
while ((err = get_peer_status(&status)) == 0 && ok == 0) while ((err = get_peer_status(&status)) == 0)
{ {
if (status.in_status == WORKER_STATUS_IDLE) if (status.in_status == WORKER_STATUS_IDLE && status.out_status == WORKER_STATUS_BUSY && status.task_cnt == 0
&& status.task_required_bytes == 0 && status.packets_to_sent == 0)
break; break;
std::this_thread::sleep_for(std::chrono::milliseconds(5)); std::this_thread::sleep_for(std::chrono::milliseconds(5));
@ -601,27 +633,16 @@ int async_usb_host::restart_peer_bulk(uint32_t timeout)
} }
} }
return err ? err : ok; cancel = 0;
}
int async_usb_host::reset_io_buffer_size(unsigned short size)
{
SIMPLE_LOCK(io_lock_);
libusb_control_transfer(usb_handle_, LIBUSB_REQUEST_TYPE_VENDOR | LIBUSB_ENDPOINT_OUT
, USB_REQ_EP0_SET_BULK_BUFFER, 0, size
, nullptr, 0
, 1000);
{ {
buf_coef_ = size; SIMPLE_LOCK(io_lock_);
libusb_control_transfer(usb_handle_, LIBUSB_REQUEST_TYPE_VENDOR | LIBUSB_ENDPOINT_IN
return 0; , USB_REQ_EP0_CANCEL_IO, 0, 0
, (unsigned char*)&cancel, sizeof(cancel)
, 1000);
} }
return EFAULT; return err;
}
int async_usb_host::get_io_buffer_size(void)
{
return buf_coef_;
} }
int async_usb_host::set_gadget_encrypting_method(uint32_t cmd_enc, uint32_t payload_enc, uint8_t enc_data) int async_usb_host::set_gadget_encrypting_method(uint32_t cmd_enc, uint32_t payload_enc, uint8_t enc_data)
{ {

View File

@ -36,17 +36,17 @@ class async_usb_host : public refer
volatile bool cancel_write_; volatile bool cancel_write_;
volatile int buf_coef_; volatile int buf_coef_;
libusb_device_handle* usb_handle_; libusb_device_handle* usb_handle_;
libusb_device* usb_dev_; libusb_device* usb_dev_;
USBEP bulk_in_; USBEP bulk_in_;
USBEP bulk_out_; USBEP bulk_out_;
MUTEX io_lock_; MUTEX io_lock_;
PEERCFG peer_cfg_ = { 0 };
safe_fifo<dyn_mem_ptr> io_buf_;
safe_fifo<dyn_mem_ptr> in_que_; safe_fifo<dyn_mem_ptr> in_que_;
safe_fifo<data_source_ptr> out_que_; safe_fifo<data_source_ptr> out_que_;
#ifdef USE_SAFE_THREAD #ifdef USE_SAFE_THREAD
safe_thread thread_w_; safe_thread worker_;
safe_thread thread_r_;
safe_thread thread_p_;
#else #else
std::unique_ptr<std::thread> thread_w_; std::unique_ptr<std::thread> thread_w_;
std::unique_ptr<std::thread> thread_r_; std::unique_ptr<std::thread> thread_r_;
@ -57,14 +57,17 @@ class async_usb_host : public refer
uint32_t payload_enc_type_; uint32_t payload_enc_type_;
uint8_t enc_data_; uint8_t enc_data_;
dyn_mem_ptr get_io_buffer(void);
void free_io_buffer(dyn_mem_ptr buf);
void thread_read_bulk(void); void thread_read_bulk(void);
void thread_write_bulk(void); void thread_write_bulk(void);
void thread_pump_task(void); void thread_pump_task(void);
void create_worker_threads(void); void create_worker_threads(void);
void stop_worker_threads(void); void stop_worker_threads(void);
int bulk_write_buf(uint8_t* buf, int* len); // return error code int bulk_write_buf(uint8_t* buf, int* len, int io_size); // return error code
int inner_write_bulk(data_source_ptr data, int* err); int inner_write_bulk(data_source_ptr data, dyn_mem_ptr mem/*to load data in if data was not memory*/, int bulk_size); // return error code
void post_2_write_bulk_thread(data_source_ptr data); void post_2_write_bulk_thread(data_source_ptr data);
dyn_mem_ptr handle_data_in(dyn_mem_ptr& data, uint32_t* used, packet_data_base_ptr* more); dyn_mem_ptr handle_data_in(dyn_mem_ptr& data, uint32_t* used, packet_data_base_ptr* more);
@ -87,11 +90,10 @@ public:
uint8_t& encrypt_data(void); uint8_t& encrypt_data(void);
public: public:
int get_peer_protocol_version(uint16_t* ver); int get_peer_config(LPPEERCFG cfg);
uint16_t get_protocol_version(void);
int get_peer_status(LPEP0REPLYSTATUS status); int get_peer_status(LPEP0REPLYSTATUS status);
int restart_peer_bulk(uint32_t timeout = 1000/*ms*/); int reset_peer(uint32_t timeout = 2000/*ms*/);
int reset_io_buffer_size(unsigned short size);
int get_io_buffer_size(void);
int set_gadget_encrypting_method(uint32_t cmd_enc = ENCRYPT_CMD_NONE, uint32_t payload_enc = ENCRYPT_NONE, uint8_t enc_data = 0); int set_gadget_encrypting_method(uint32_t cmd_enc = ENCRYPT_CMD_NONE, uint32_t payload_enc = ENCRYPT_NONE, uint8_t enc_data = 0);
int send_heart_beat(uint32_t pack_id); int send_heart_beat(uint32_t pack_id);

View File

@ -316,10 +316,10 @@ int scanner_handler::wait_result(cmd_result* reply)
int scanner_handler::get_protocol_version(uint16_t* ver) int scanner_handler::get_protocol_version(uint16_t* ver)
{ {
if (!is_scanner_available()) if (ver)
return ENODEV; *ver = usb_->get_protocol_version();
return usb_->get_peer_protocol_version(ver); return 0;
} }
int scanner_handler::get_scanner_status(LPEP0REPLYSTATUS status) int scanner_handler::get_scanner_status(LPEP0REPLYSTATUS status)
{ {
@ -333,21 +333,7 @@ int scanner_handler::restart_peer_bulk(uint32_t timeout)
if (!is_scanner_available()) if (!is_scanner_available())
return ENODEV; return ENODEV;
return usb_->restart_peer_bulk(timeout); return usb_->reset_peer(timeout);
}
int scanner_handler::set_io_buffer_size(unsigned short size)
{
if (!is_scanner_available())
return ENODEV;
return usb_->reset_io_buffer_size(size);
}
int scanner_handler::get_io_buffer_size(void)
{
if (!is_scanner_available())
return 1;
return usb_->get_io_buffer_size();
} }
int scanner_handler::option_get_all(std::string& json_opts) int scanner_handler::option_get_all(std::string& json_opts)
@ -558,6 +544,8 @@ int scanner_handler::file_transfer(const char* local_path, const char* remote_pa
auto call = [&](cmd_result* cmd) -> int auto call = [&](cmd_result* cmd) -> int
{ {
cmd->set_timeout(3000);
return usb_->file_send(cmd->get_id(), remote_path, size, remote_off); return usb_->file_send(cmd->get_id(), remote_path, size, remote_off);
}; };
auto clean = [&](cmd_result* cmd) -> int auto clean = [&](cmd_result* cmd) -> int
@ -576,7 +564,6 @@ int scanner_handler::file_transfer(const char* local_path, const char* remote_pa
*used = sizeof(PACK_BASE); *used = sizeof(PACK_BASE);
*more = nullptr; *more = nullptr;
utils::to_log(LOG_LEVEL_DEBUG, "Send file - Roger of send file result: %d\r\n", pack->data);
if (pack->data == 0) if (pack->data == 0)
{ {
@ -600,10 +587,12 @@ int scanner_handler::file_transfer(const char* local_path, const char* remote_pa
{ {
*more = dynamic_cast<packet_data_base_ptr>(freader); *more = dynamic_cast<packet_data_base_ptr>(freader);
status_ = SCANNER_STATUS_BUSY; status_ = SCANNER_STATUS_BUSY;
utils::to_log(LOG_LEVEL_DEBUG, "Send file - beginning ...\r\n");
} }
} }
} }
else
utils::to_log(LOG_LEVEL_DEBUG, "Send file - Roger of send file result: %d\r\n", pack->data);
cmd->trigger(); cmd->trigger();
return ret; return ret;
@ -615,6 +604,8 @@ int scanner_handler::file_transfer(const char* local_path, const char* remote_pa
{ {
auto call = [&](cmd_result* cmd) -> int auto call = [&](cmd_result* cmd) -> int
{ {
cmd->set_timeout(2000);
return usb_->file_get(cmd->get_id(), remote_path, remote_off); return usb_->file_get(cmd->get_id(), remote_path, remote_off);
}; };
auto clean = [&](cmd_result* cmd) -> int auto clean = [&](cmd_result* cmd) -> int
@ -629,7 +620,6 @@ int scanner_handler::file_transfer(const char* local_path, const char* remote_pa
*used = sizeof(PACK_BASE) + pack->payload_len; *used = sizeof(PACK_BASE) + pack->payload_len;
*more = nullptr; *more = nullptr;
utils::to_log(LOG_LEVEL_DEBUG, "Receive file - Roger result: %d\r\n", pack->data);
BASE_PACKET_REPLY((*(LPPACK_BASE)reply->ptr()), PACK_CMD_FILE_READ_REQ_ROGER, pack->pack_id, -1); BASE_PACKET_REPLY((*(LPPACK_BASE)reply->ptr()), PACK_CMD_FILE_READ_REQ_ROGER, pack->pack_id, -1);
reply->set_len(sizeof(PACK_BASE)); reply->set_len(sizeof(PACK_BASE));
if (pack->data == 0) if (pack->data == 0)
@ -658,10 +648,11 @@ int scanner_handler::file_transfer(const char* local_path, const char* remote_pa
*more = dynamic_cast<packet_data_base_ptr>(fsaver); *more = dynamic_cast<packet_data_base_ptr>(fsaver);
(*(LPPACK_BASE)reply->ptr()).data = 0; (*(LPPACK_BASE)reply->ptr()).data = 0;
status_ = SCANNER_STATUS_BUSY; status_ = SCANNER_STATUS_BUSY;
utils::to_log(LOG_LEVEL_DEBUG, "Receive file - beginning ...\r\n");
} }
} }
} }
else
utils::to_log(LOG_LEVEL_DEBUG, "Receive file - Roger result: %d\r\n", pack->data);
cmd->trigger(); cmd->trigger();
return reply; return reply;
@ -811,7 +802,7 @@ int scanner_handler::reset_message_que(void)
status_ = SCANNER_STATUS_RESET_BULK; status_ = SCANNER_STATUS_RESET_BULK;
utils::to_log(LOG_LEVEL_DEBUG, "reset_message_que - send reset command ...\r\n"); utils::to_log(LOG_LEVEL_DEBUG, "reset_message_que - send reset command ...\r\n");
err = usb_->restart_peer_bulk(); err = usb_->reset_peer();
utils::to_log(LOG_LEVEL_DEBUG, "reset_message_que - send reset command = %d\r\n", err); utils::to_log(LOG_LEVEL_DEBUG, "reset_message_que - send reset command = %d\r\n", err);
if (err == 0) if (err == 0)
{ {

View File

@ -101,8 +101,6 @@ public:
int get_protocol_version(uint16_t* ver); int get_protocol_version(uint16_t* ver);
int get_scanner_status(LPEP0REPLYSTATUS status); int get_scanner_status(LPEP0REPLYSTATUS status);
int restart_peer_bulk(uint32_t timeout = 1000/*ms*/); int restart_peer_bulk(uint32_t timeout = 1000/*ms*/);
int set_io_buffer_size(unsigned short size);
int get_io_buffer_size(void);
// following methods transferred by Bulk, blocked ... // following methods transferred by Bulk, blocked ...
int option_get_all(std::string& json_opts); int option_get_all(std::string& json_opts);

View File

@ -10,6 +10,29 @@
#include <sys/mman.h> #include <sys/mman.h>
#endif #endif
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// sys_info
uint32_t sys_info::page_size = 0;
uint32_t sys_info::page_map_size = 0;
uint32_t sys_info::cluster_size = 0;
sys_info::sys_info()
{
sys_info::page_size = utils::get_page_size(&sys_info::page_map_size);
std::string path(utils::get_local_data_path());
unsigned long long cluster = 0;
utils::get_disk_space(path.c_str(), nullptr, nullptr, &cluster);
sys_info::cluster_size = cluster;
printf("Page size: %u\nMap size: %u\nCluster : %u\n", sys_info::page_size, sys_info::page_map_size, sys_info::cluster_size);
}
sys_info::~sys_info()
{}
static sys_info g_si;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// //
packet_data_base::packet_data_base() : pack_cmd_(0), pack_id_(0) packet_data_base::packet_data_base() : pack_cmd_(0), pack_id_(0)
@ -149,7 +172,7 @@ file_saver::file_saver(void) : size_(0), wrote_(0), path_(""), check_(""), dst_(
{} {}
file_saver::~file_saver() file_saver::~file_saver()
{ {
utils::to_log(LOG_LEVEL_DEBUG, "Write file(%s) over(%ld/%ld).\n", path_.c_str(), wrote_, size_); utils::to_log(LOG_LEVEL_DEBUG, "Wrote over of file(%s) at(%llu/%llu).\n", path_.c_str(), wrote_, size_);
close(); close();
} }
@ -176,6 +199,10 @@ int file_saver::set_verify_data(const char* data, size_t len)
return 0; return 0;
} }
const char* file_saver::path_file(void)
{
return path_.c_str();
}
int file_saver::open(const char* path, uint64_t size, bool in_mem, size_t off) int file_saver::open(const char* path, uint64_t size, bool in_mem, size_t off)
{ {
int err = 0; int err = 0;
@ -406,6 +433,12 @@ dyn_mem& dyn_mem::operator+=(dyn_mem& r)
return *this; return *this;
} }
void dyn_mem::clear_data(void)
{
len_ = 0;
set_packet_param(0, 0);
set_session_id(0);
}
bool dyn_mem::is_memory_block(void) bool dyn_mem::is_memory_block(void)
{ {
@ -450,7 +483,8 @@ file_reader::~file_reader()
fclose(src_); fclose(src_);
if (map_) if (map_)
map_->release(); map_->release();
utils::to_log(LOG_LEVEL_DEBUG, "Read file(%s) over(%ld/%ld).\n", path_.c_str(), consume_, len_); notify_progress(len_, len_, 0); // ensure 100%
utils::to_log(LOG_LEVEL_DEBUG, "Read over of file(%s) at(%p/%p).\n", path_.c_str(), consume_, len_);
} }
int file_reader::open(const char* file, bool in_mem, size_t off) int file_reader::open(const char* file, bool in_mem, size_t off)
@ -539,6 +573,10 @@ FILE* file_reader::detach(void)
return ret; return ret;
} }
const char* file_reader::path_file(void)
{
return path_.c_str();
}
bool file_reader::is_memory_block(void) bool file_reader::is_memory_block(void)
{ {
@ -688,7 +726,6 @@ int file_map::open(const char* file, uint64_t size, bool readonly)
return err; return err;
} }
map_ = (HANDLE)::open(file, O_RDWR, 0666); map_ = (HANDLE)::open(file, O_RDWR, 0666);
utils::to_log(LOG_LEVEL_DEBUG, "FileMapping: open('%s', O_APPEND, 0666) = %p\n", file, map_);
} }
if (map_ == INVALID_HANDLE_VALUE) if (map_ == INVALID_HANDLE_VALUE)
{ {
@ -770,7 +807,7 @@ uint8_t* file_map::map(uint64_t off, uint32_t* size)
#endif #endif
if (!buf_) if (!buf_)
{ {
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: request map(%p + %u), real map(%p + %u) failed: %d\n" utils::to_log(LOG_LEVEL_WARNING, "FileMapping: request map(%llu + %u), real map(%llu + %u) failed: %d\n"
, off, *size, map_off_, map_size_, GetLastError()); , off, *size, map_off_, map_size_, GetLastError());
*size = 0; *size = 0;
} }
@ -784,3 +821,95 @@ uint8_t* file_map::buffer(void)
{ {
return buf_ ? buf_ + off_ : nullptr; return buf_ ? buf_ + off_ : nullptr;
} }
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
dyn_mem_pool::dyn_mem_pool(uint32_t cnt, uint32_t unit) : count_(cnt), unit_(unit)
{
pool_ = (dyn_mem_ptr*)malloc(cnt * sizeof(dyn_mem_ptr));
for(uint32_t i = 0; i < cnt; ++i)
{
pool_[i] = dyn_mem::memory(unit);
}
}
dyn_mem_pool::~dyn_mem_pool()
{
if(pool_)
{
for(uint32_t i = 0; i < count_; ++i)
{
if(pool_[i])
{
pool_[i]->release();
}
}
free(pool_);
}
pool_ = nullptr;
}
dyn_mem_ptr dyn_mem_pool::take(void)
{
dyn_mem_ptr buf = nullptr;
if(!pool_[rpos_])
{
chronograph watch;
do
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} while (run_ && !pool_[rpos_]);
utils::to_log(LOG_LEVEL_DEBUG, "Waiting for taking memory pool took %ums at %u.\n", watch.elapse_ms(), rpos_);
}
if(pool_[rpos_])
{
buf = pool_[rpos_];
pool_[rpos_++] = nullptr;
if(rpos_ >= count_)
rpos_ = 0;
}
return buf;
}
void dyn_mem_pool::put(dyn_mem_ptr buf)
{
if(pool_[wpos_])
{
chronograph watch;
do
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} while (run_ && pool_[wpos_]);
utils::to_log(LOG_LEVEL_DEBUG, "Waiting for putting memory pool took %ums at %u.\n", watch.elapse_ms(), wpos_);
}
if(pool_[wpos_])
{
buf->release();
}
else
{
pool_[wpos_++] = buf;
if(wpos_ >= count_)
wpos_ = 0;
}
}
void dyn_mem_pool::stop(void)
{
run_ = false;
}
uint32_t dyn_mem_pool::count(void)
{
return count_;
}
uint32_t dyn_mem_pool::unit(void)
{
return unit_;
}
uint32_t dyn_mem_pool::take_pos(void)
{
return rpos_;
}

View File

@ -25,8 +25,8 @@ class packet_data_base : public refer
void* user_data_; void* user_data_;
protected: protected:
uint32_t pack_cmd_; uint32_t pack_cmd_ = 0;
uint32_t pack_id_; uint32_t pack_id_ = 0;
uint32_t session_id_ = -1; uint32_t session_id_ = -1;
public: public:
@ -76,6 +76,18 @@ public:
uint8_t* buffer(void); uint8_t* buffer(void);
}; };
class sys_info
{
public:
sys_info();
~sys_info();
public:
static uint32_t page_size;
static uint32_t page_map_size;
static uint32_t cluster_size;
};
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// //
/* data_holder, used when data is also required for a certain packet /* data_holder, used when data is also required for a certain packet
@ -155,6 +167,7 @@ protected:
public: public:
int set_verify_data(const char* data, size_t len); int set_verify_data(const char* data, size_t len);
int open(const char* path, uint64_t size, bool in_mem = false, size_t off = 0); int open(const char* path, uint64_t size, bool in_mem = false, size_t off = 0);
const char* path_file(void);
public: public:
virtual int put_data(const void* data, uint32_t* size/*[in] - total bytes of data; [out] - used bytes*/) override; virtual int put_data(const void* data, uint32_t* size/*[in] - total bytes of data; [out] - used bytes*/) override;
@ -220,6 +233,7 @@ public:
size_t used(size_t len); // used len bytes content, move following data to head and set data length, return rest data length size_t used(size_t len); // used len bytes content, move following data to head and set data length, return rest data length
dyn_mem& operator+=(dyn_mem& r); dyn_mem& operator+=(dyn_mem& r);
void clear_data(void);
// data_source // data_source
public: public:
@ -251,6 +265,7 @@ public:
int open(const char* file, bool in_mem, size_t off = 0); int open(const char* file, bool in_mem, size_t off = 0);
int attach(FILE* f); int attach(FILE* f);
FILE* detach(void); FILE* detach(void);
const char* path_file(void);
public: public:
virtual bool is_memory_block(void) override; virtual bool is_memory_block(void) override;
@ -272,6 +287,30 @@ CLS_PTR(dyn_mem);
CLS_PTR(file_reader); CLS_PTR(file_reader);
class dyn_mem_pool : public refer
{
volatile bool run_ = true;
dyn_mem_ptr *pool_ = nullptr;
uint32_t count_ = 0;
uint32_t unit_ = 0;
uint32_t wpos_ = 0;
uint32_t rpos_ = 0;
public:
dyn_mem_pool(uint32_t cnt, uint32_t unit);
protected:
virtual ~dyn_mem_pool();
public:
dyn_mem_ptr take(void);
void put(dyn_mem_ptr buf);
void stop(void);
uint32_t count(void);
uint32_t unit(void);
uint32_t take_pos(void);
};
// callback proto // callback proto
// //
// parameters: usb_functionfs_event* - the function event ptr // parameters: usb_functionfs_event* - the function event ptr
@ -284,7 +323,7 @@ CLS_PTR(file_reader);
// //
// when invalid packet, suggest use the entire data // when invalid packet, suggest use the entire data
// //
// packet_data_base_ptr* - return data_holder or data_source or nullptr £¨The number of bytes required for this packet, 0 is over for this packet£© // packet_data_base_ptr* - return data_holder or data_source or nullptr <EFBFBD><EFBFBD>The number of bytes required for this packet, 0 is over for this packet<65><74>
// //
// data_holder: the packet/command need more data than dyn_mem_ptr provides to complete the business. such as 'write a large file' // data_holder: the packet/command need more data than dyn_mem_ptr provides to complete the business. such as 'write a large file'
// //

View File

@ -40,15 +40,17 @@
// NOTE: All text transmitted by pack cmd is in UTF-8 format !!! // NOTE: All text transmitted by pack cmd is in UTF-8 format !!!
enum cancel_io
{
CANCEL_IO_CANCEL = 0x0ca0cel,
};
enum ep0_req enum ep0_req
{ {
USB_REQ_EP0_GET_PROTO_VER = 100, // get protocol version (PROTOCOL_VER), req = me, ind = 0, val = 0, len = 2 USB_REQ_EP0_GET_PEER_CONFIG = 100, // get protocol version (PROTOCOL_VER), req = me, ind = 0, val = 0, len = sizeof(PEERCFG)
USB_REQ_EP0_GET_STATUS, // 获取各工作线程状态, return EP0REPLYSTATUS. req = me, ind = 0, val = 0, len = sizeof(EP0REPLYSTATUS) USB_REQ_EP0_GET_STATUS, // 获取各工作线程状态, return EP0REPLYSTATUS. req = me, ind = 0, val = 0, len = sizeof(EP0REPLYSTATUS)
USB_REQ_EP0_RESET_BULK, // 关闭并重新打开BULK端点, return error number (uint32_t). req = me, ind = 0, val = 0, len = sizeof(uint32_t) USB_REQ_EP0_CANCEL_IO, // 设置当前IO数据的有效性. req = me, ind = 0, val = 0, len = sizeof(uint32_t), discard IO data when data is CANCEL_IO_CANCEL
USB_REQ_EP0_CANCEL_CMD, // 取消当前指令的继续执行(一般用于中止大数据的传输). req = me, ind = 0, val = 0, len = sizeof(uint32_t) * 2 [(uint32_t)cmd + (uint32_t)pack-id] // work-flow: write control with 'CANCEL_IO_CANCEL', write bulk with 1 byte, write control with not 'CANCEL_IO_CANCEL' to restore
USB_REQ_EP0_SET_ENCRYPT, // 设置加密方式, req = me, ind = 0, val = 0, len = sizeof(PACK_BASE) USB_REQ_EP0_SET_ENCRYPT, // 设置加密方式, req = me, ind = 0, val = 0, len = sizeof(PACK_BASE)
USB_REQ_EP0_SET_BULK_BUFFER, // 设置bulk缓冲区大小系数 req = me, ind = coef, val = 0, len = 0
}; };
enum woker_status enum woker_status
{ {
@ -57,6 +59,7 @@ enum woker_status
WORKER_STATUS_BUSY, // in working WORKER_STATUS_BUSY, // in working
WORKER_STATUS_ERROR, // error occurs WORKER_STATUS_ERROR, // error occurs
WORKER_STATUS_RESET, // in reset(close and reopen) process WORKER_STATUS_RESET, // in reset(close and reopen) process
WORKER_STATUS_WAIT_RESOURCE, // wait resource
}; };
enum packet_cmd enum packet_cmd
@ -253,6 +256,12 @@ typedef struct _ep0_reply
uint32_t bytes_to_sent; // how many bytes data is waiting for be sent in one replying packet uint32_t bytes_to_sent; // how many bytes data is waiting for be sent in one replying packet
}EP0REPLYSTATUS, *LPEP0REPLYSTATUS; }EP0REPLYSTATUS, *LPEP0REPLYSTATUS;
typedef struct _peer_config
{
uint16_t ver; // protocol version
uint32_t io_size; // IO buffer size
}PEERCFG, *LPPEERCFG;
typedef struct _pack_base // A piece of data has only one header typedef struct _pack_base // A piece of data has only one header
{ {
uint32_t enc_cmd : 2; // encrypting type, for 'cmd' uint32_t enc_cmd : 2; // encrypting type, for 'cmd'

View File

@ -1052,8 +1052,6 @@ namespace utils
ret = statfs(path, &fs); ret = statfs(path, &fs);
if (ret == 0) if (ret == 0)
{ {
utils::to_log(LOG_LEVEL_DEBUG, " Total: %lld, Free: %lld, Avail: %lld, block size: %lld\n",
fs.f_blocks, fs.f_bfree, fs.f_bavail, fs.f_bsize);
if (total) if (total)
*total = fs.f_blocks * fs.f_bsize; *total = fs.f_blocks * fs.f_bsize;
if (avail) if (avail)

View File

@ -6,7 +6,6 @@
#pragma once #pragma once
#include <sane/sane_ex.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include <map> #include <map>
@ -17,6 +16,7 @@
#include "simple_logic.h" #include "simple_logic.h"
#include <json/gb_json.h> #include <json/gb_json.h>
#include <base/utils.h> #include <base/utils.h>
#include <sane/sane_ex.h>
class sane_opt_provider; class sane_opt_provider;