添加CancelIO协议

This commit is contained in:
gb 2023-12-14 14:24:15 +08:00
parent 32f830ee94
commit 137ba57a05
3 changed files with 83 additions and 41 deletions

View File

@ -40,16 +40,17 @@
// NOTE: All text transmitted by pack cmd is in UTF-8 format !!!
enum cancel_io
{
CANCEL_IO_CANCEL = 0x0ca0cel,
};
enum ep0_req
{
USB_REQ_EP0_GET_PROTO_VER = 100, // get protocol version (PROTOCOL_VER), req = me, ind = 0, val = 0, len = 2
USB_REQ_EP0_GET_IO_SIZE, // get bulk size of IO buffer, req = me, ind = 0, val = 0, len = sizeof(uint32_t)
USB_REQ_EP0_GET_PEER_CONFIG = 100, // get protocol version (PROTOCOL_VER), req = me, ind = 0, val = 0, len = sizeof(PEERCFG)
USB_REQ_EP0_GET_STATUS, // 获取各工作线程状态, return EP0REPLYSTATUS. req = me, ind = 0, val = 0, len = sizeof(EP0REPLYSTATUS)
USB_REQ_EP0_RESET_BULK, // 关闭并重新打开BULK端点, return error number (uint32_t). req = me, ind = 0, val = 0, len = sizeof(uint32_t)
USB_REQ_EP0_CANCEL_CMD, // 取消当前指令的继续执行(一般用于中止大数据的传输). req = me, ind = 0, val = 0, len = sizeof(uint32_t) * 2 [(uint32_t)cmd + (uint32_t)pack-id]
USB_REQ_EP0_CANCEL_IO, // 设置当前IO数据的有效性. req = me, ind = 0, val = 0, len = sizeof(uint32_t), discard IO data when data is CANCEL_IO_CANCEL
// work-flow: write control with 'CANCEL_IO_CANCEL', write bulk with 1 byte, write control with not 'CANCEL_IO_CANCEL' to restore
USB_REQ_EP0_SET_ENCRYPT, // 设置加密方式, req = me, ind = 0, val = 0, len = sizeof(PACK_BASE)
USB_REQ_EP0_SET_BULK_BUFFER, // 设置bulk缓冲区大小系数 req = me, ind = coef, val = 0, len = 0
};
enum woker_status
{
@ -255,6 +256,12 @@ typedef struct _ep0_reply
uint32_t bytes_to_sent; // how many bytes data is waiting for be sent in one replying packet
}EP0REPLYSTATUS, *LPEP0REPLYSTATUS;
typedef struct _peer_config
{
uint16_t ver; // protocol version
uint32_t io_size; // IO buffer size
}PEERCFG, *LPPEERCFG;
typedef struct _pack_base // A piece of data has only one header
{
uint32_t enc_cmd : 2; // encrypting type, for 'cmd'

View File

@ -266,10 +266,11 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data)
{
switch (pev->u.setup.bRequest)
{
case USB_REQ_EP0_GET_PROTO_VER:
reply = dyn_mem::memory(sizeof(short));
*(short*)reply->ptr() = PROTOCOL_VER;
reply->set_len(sizeof(short));
case USB_REQ_EP0_GET_PEER_CONFIG:
reply = dyn_mem::memory(sizeof(PEERCFG));
((LPPEERCFG)reply->ptr())->ver = PROTOCOL_VER;
((LPPEERCFG)reply->ptr())->io_size = unit_out_;
reply->set_len(sizeof(PEERCFG));
break;
case USB_REQ_EP0_GET_STATUS:
reply = dyn_mem::memory(sizeof(struct _ep0_reply));
@ -278,6 +279,7 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data)
LPEP0REPLYSTATUS lps = (LPEP0REPLYSTATUS)reply->ptr();
lps->in_status = statu_in_;
lps->out_status = statu_out_;
lps->task_cnt = cmd_que_.size();
lps->task_cmd = task_cmd_;
lps->task_pack_id = task_packet_id_;
lps->task_required_bytes = want_bytes_task_;
@ -286,13 +288,12 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data)
utils::log_mem_info("threads status:", lps, sizeof(*lps), LOG_LEVEL_DEBUG);
}
break;
case USB_REQ_EP0_GET_IO_SIZE:
reply = dyn_mem::memory(sizeof(unit_out_));
reply->put(&unit_out_, sizeof(unit_out_));
break;
case USB_REQ_EP0_RESET_BULK:
reply = dyn_mem::memory(sizeof(uint32_t));
reply->put(&err, sizeof(err));
case USB_REQ_EP0_CANCEL_IO:
if (pev->u.setup.wLength == sizeof(uint32_t))
{
uint32_t val = *(uint32_t*)&pev[1];
cancel_io_ = (val == CANCEL_IO_CANCEL);
}
break;
case USB_REQ_EP0_SET_ENCRYPT:
if (pev->u.setup.wLength == sizeof(PACK_BASE))
@ -304,15 +305,6 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data)
utils::to_log(LOG_LEVEL_DEBUG, "Set encrypting method: command - %d; payload - %d\n", enc_head_, enc_payload_);
}
break;
case USB_REQ_EP0_SET_BULK_BUFFER:
// if(pev->u.setup.wLength == sizeof(short))
{
uint16_t pre = buf_coef_;
set_buf_coefficient(pev->u.setup.wIndex);
utils::to_log(LOG_LEVEL_DEBUG, "Set bulk buffer size coefficient from %d to %d\n", pre, buf_coef_);
}
break;
default:
handled = false;
}
@ -332,7 +324,7 @@ int async_usb_gadget::inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* le
int w = 0, to = 0, err = 0, total = *len, off = 0,
size = *len;
while(1)
while(!cancel_io_)
{
w = write(fd, buf + off, size);
if(w == -1)
@ -491,6 +483,7 @@ void async_usb_gadget::thread_read_bulk(int fd)
while(run_)
{
statu_out_ = WORKER_STATUS_WAIT_RESOURCE;
dyn_mem_ptr buf = get_io_buffer();
int l = 0;
@ -508,7 +501,12 @@ void async_usb_gadget::thread_read_bulk(int fd)
buf->set_session_id(session_id_);
l = read(fd, buf->ptr(), bulk_size);
statu_out_ = WORKER_STATUS_IDLE;
if(l <= 0)
if (!run_)
{
buf->release();
break;
}
else if(l <= 0)
{
free_io_buffer(buf);
if(errno)
@ -522,16 +520,18 @@ void async_usb_gadget::thread_read_bulk(int fd)
cnt_0++;
}
}
else if (!run_)
else
{
utils::to_log(LOG_LEVEL_ALL, "thread_read_bulk do reset-bulk ...\n");
buf->release();
break;
}
else
{
buf->set_len(l);
cmd_que_.save(buf, true);
// if(!cancel_io_) // ensure to trigger task thread clean it's work
{
buf->set_len(l);
cmd_que_.save(buf, true);
}
// else
// {
// free_io_buffer(buf);
// utils::to_log(LOG_LEVEL_DEBUG, "Cancel read with %u bytes.\n", l);
// }
}
}
statu_out_ = WORKER_STATUS_NOT_START;
@ -550,14 +550,21 @@ void async_usb_gadget::thread_write_bulk(int fd)
if(sent_que_.take(data, true))
{
want_bytes_in_ = data->get_rest();
err = inner_write_bulk(fd, data, mem, bulk_size);
if(!cancel_io_)
{
file_reader_ptr fr = dynamic_cast<file_reader_ptr>(data);
if(fr)
err = inner_write_bulk(fd, data, mem, bulk_size);
{
utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err);
file_reader_ptr fr = dynamic_cast<file_reader_ptr>(data);
if(fr)
{
utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err);
}
}
}
else
{
utils::to_log(LOG_LEVEL_DEBUG, "Cancel write with %u bytes.\n", data->get_rest());
}
data->release();
if(err)
{
@ -583,6 +590,7 @@ void async_usb_gadget::thread_pump_task(void)
{
data = nullptr;
statu_task_ = WORKER_STATUS_IDLE;
task_cmd_ = 0;
if(cmd_que_.take(data, true) && data)
{
bool pool = true;
@ -590,6 +598,32 @@ void async_usb_gadget::thread_pump_task(void)
if(max_que < cmd_que_.size())
max_que = cmd_que_.size();
if(cancel_io_)
{
if(prev)
{
utils::to_log(LOG_LEVEL_DEBUG, "Cancel task previous packet with %u bytes.\n", prev->get_rest());
prev->release();
prev = nullptr;
}
if(dh)
{
utils::to_log(LOG_LEVEL_DEBUG, "Cancel task holder need %u bytes.\n", dh->get_required());
dh->cancel();
dh->release();
dh = nullptr;
}
if(reply)
{
reply->release();
reply = nullptr;
}
want_bytes_task_ = 0;
utils::to_log(LOG_LEVEL_DEBUG, "Cancel task with %u bytes.\n", data->get_rest());
data->release();
continue;
}
if(prev)
{
if(prev->get_session_id() != data->get_session_id())

View File

@ -42,6 +42,7 @@ class usb_device;
class async_usb_gadget : public refer
{
volatile bool run_ = true;
volatile bool cancel_io_ = false;
usb_device *dev_ = nullptr;
safe_thread threads_;
size_t unit_in_ = 0x200;