newtx/usb/usb_io.cpp

898 lines
21 KiB
C++

#include "usb_io.h"
#include <sys/fcntl.h>
#include <unistd.h>
#include <linux/usb/functionfs.h>
#include <thread>
#include <base/encrypt.h>
#include "usb_dev.h"
#include "default_cfg.h"
// #define TEST
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// async_usb_gadget
async_usb_gadget::async_usb_gadget(std::function<FUNCTION_PROTO_COMMAND_HANDLE> cmd_handler
, std::function<void(bool)> dev_conn)
: handle_cmd_(cmd_handler), dev_connect_(dev_conn)
, enc_head_(ENCRYPT_CMD_NONE), enc_payload_(ENCRYPT_NONE), enc_data_(0)
#ifdef MEM_POOL
, io_buf_(nullptr)
#else
, io_buf_("IO-buf")
#endif
, cmd_que_("in-queue"), sent_que_("out-queue")
, wait_in_("wait_usb_enable_in"), wait_out_("wait_usb_enable_out")
{
cmd_que_.enable_wait_log(false);
sent_que_.enable_wait_log(false);
wait_in_.enable_log(false);
wait_out_.enable_log(false);
dev_ = new usb_device("fe900000.dwc3", "/opt/cfg/usb_gadget/g1/UDC", "linaro");
dev_->add_endpoint(USB_EP_BULK_IN, true, true);
dev_->add_endpoint(USB_EP_BULK_OUT, true, false);
if((last_err_ = dev_->open_device(USB_DEV, true)))
{
dev_->release();
dev_ = nullptr;
run_ = false;
return;
}
auto ep0 = [this](void) -> void
{
thread_read_ep0();
};
auto task = [this](void) -> void
{
thread_pump_task();
};
auto bulkw = [this](void) -> void
{
while(run_)
{
wait_in_.wait();
if(!run_)
break;
int fd = -1,
err = dev_->open_endpoint(EP_IND_BULK_IN, &fd);
if(err == 0)
{
thread_write_bulk(fd);
if(run_)
dev_->close_endpoint(EP_IND_BULK_IN);
}
}
};
auto bulkr = [this](void) -> void
{
while(run_)
{
wait_out_.wait();
if(!run_)
break;
int fd = -1,
err = dev_->open_endpoint(EP_IND_BULK_OUT, &fd);
if(err == 0)
{
thread_read_bulk(fd);
if(run_)
dev_->close_endpoint(EP_IND_BULK_OUT);
}
}
};
auto excep = [&](const char* thread_name) -> void
{
threads_.stop(thread_name);
// threads_.start(task, "thread_pump_task");
};
threads_.set_exception_handler(excep);
unit_in_ = unit_out_ = global_info::page_size; // dev_->get_max_packet();
// allocate 1MB for IO
#ifdef MEM_POOL
io_buf_ = new dyn_mem_pool(SIZE_MB(1) / unit_out_, unit_out_);
utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_->count() * unit_out_, io_buf_->count(), unit_in_);
#else
for(int i = 0; i < SIZE_MB(1) / unit_out_; ++i)
{
dyn_mem_ptr buf = dyn_mem::memory(unit_out_);
if(buf)
io_buf_.save(buf);
}
utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_.size() * unit_out_, io_buf_.size(), unit_in_);
#endif
threads_.start(task, SIZE_MB(4), "thread_pump_task", (void*)&async_usb_gadget::thread_pump_task);
threads_.start(bulkw, SIZE_MB(2), "thread_write_bulk", (void*)&async_usb_gadget::thread_write_bulk);
threads_.start(bulkr, SIZE_MB(2), "thread_read_bulk", (void*)&async_usb_gadget::thread_read_bulk);
threads_.start(ep0, SIZE_MB(1), "thread_read_ep0", (void*)&async_usb_gadget::thread_read_ep0);
}
async_usb_gadget::~async_usb_gadget()
{
stop();
#ifdef MEM_POOL
io_buf_->release();
#else
dyn_mem_ptr buf = nullptr;
while(io_buf_.take(buf, false))
{
if(buf)
buf->release();
}
#endif
}
dyn_mem_ptr async_usb_gadget::get_io_buffer(void)
{
dyn_mem_ptr buf = nullptr;
#ifdef MEM_POOL
buf = io_buf_->take();
#else
io_buf_.take(buf, true);
#endif
return buf;
}
void async_usb_gadget::free_io_buffer(dyn_mem_ptr buf)
{
buf->clear_data();
#ifdef MEM_POOL
io_buf_->put(buf);
#else
io_buf_.save(buf, true);
#endif
}
const char* async_usb_gadget::ep0_status_desc(int ep0_status, char* unk_buf/*>= 20 bytes*/)
{
RETURN_ENUM_STR(ep0_status, EP0_STATUS_IDLE);
RETURN_ENUM_STR(ep0_status, EP0_STATUS_READ_DATA);
RETURN_ENUM_STR(ep0_status, EP0_STATUS_READ_INVAL_DATA);
RETURN_ENUM_STR(ep0_status, EP0_STATUS_HANDLE_CMD);
sprintf(unk_buf, "Unknown status (%d)", ep0_status);
return unk_buf;
}
int async_usb_gadget::wait_fd_event(int fd, int to_ms)
{
struct timeval timeout, *pto = NULL;
fd_set read_set;
int ret = 0;
FD_ZERO(&read_set);
FD_SET(fd, &read_set);
if (to_ms != -1)
{
timeout.tv_sec = to_ms / 1000;
timeout.tv_usec = (to_ms % 1000) * 1000;
pto = &timeout;
}
ret = select(fd + 1, &read_set, NULL, NULL, pto);
return ret;
}
uint16_t async_usb_gadget::get_buf_coefficient(void)
{
SIMPLE_LOCK(buf_coef_lock_);
return buf_coef_;
}
void async_usb_gadget::set_buf_coefficient(int coef)
{
SIMPLE_LOCK(buf_coef_lock_);
buf_coef_ = (coef <= 0 || coef > 10000) ? 1 : coef;
}
dyn_mem_ptr async_usb_gadget::handle_ctrl_message(dyn_mem_ptr data)
{
struct usb_functionfs_event* pev = (struct usb_functionfs_event*)data->ptr();
dyn_mem_ptr reply = nullptr;
switch (pev->type)
{
case FUNCTIONFS_ENABLE:
session_id_++;
utils::to_log(LOG_LEVEL_DEBUG, "////////////// EP0 FFS ENABLE, start session %u ...\n", session_id_);
online_ = true;
cancel_io_ = false;
wait_in_.trigger();
wait_out_.trigger();
if(dev_connect_)
dev_connect_(online_);
break;
case FUNCTIONFS_DISABLE:
online_ = false;
cancel_io_ = true;
if(dev_connect_)
dev_connect_(online_);
utils::to_log(LOG_LEVEL_DEBUG, "////////////// EP0 FFS DISABLE, end session %u ...\n\n", session_id_);
break;
case FUNCTIONFS_SETUP:
reply = handle_ctrl_setup(data);
break;
case FUNCTIONFS_BIND:
utils::to_log(LOG_LEVEL_ALL, "EP0 FFS BIND\n");
// utils::log_mem_info("EP0 FFS BIND:", pev, sizeof(*pev), LOG_LEVEL_DEBUG);
break;
case FUNCTIONFS_UNBIND:
utils::to_log(LOG_LEVEL_ALL, "EP0 FFS UNBIND\n");
break;
case FUNCTIONFS_SUSPEND:
utils::to_log(LOG_LEVEL_ALL, "EP0 FFS SUSPEND\n");
break;
case FUNCTIONFS_RESUME:
utils::to_log(LOG_LEVEL_ALL, "EP0 FFS RESUME\n");
break;
}
if (pev->u.setup.bRequestType & USB_DIR_IN)
{
// if host need data back, and reply is null, we send nonsense data back ...
if (!reply)
{
reply = dyn_mem::memory(pev->u.setup.wLength + sizeof(uint16_t));
reply->set_len(pev->u.setup.wLength);
for (int i = 0; i <= pev->u.setup.wLength / sizeof(uint16_t); ++i)
((uint16_t*)reply->ptr())[i] = 0x0baad;
utils::to_log(LOG_LEVEL_DEBUG, "fake control message reply(%u bytes) while request with USB_DIR_IN flag but no reply returned.\n", reply->get_rest());
}
}
else if(reply)
{
utils::to_log(LOG_LEVEL_DEBUG, "discard control message reply(%u bytes) while request without USB_DIR_IN flag.\n", reply->get_rest());
reply->release();
reply = nullptr;
}
return reply;
}
dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data)
{
struct usb_functionfs_event* pev = (struct usb_functionfs_event*)data->ptr();
bool handled = (pev->u.setup.bRequestType & USB_TYPE_MASK) == USB_TYPE_VENDOR;
dyn_mem_ptr reply = nullptr;
uint32_t err = 0;
if(handled)
{
switch (pev->u.setup.bRequest)
{
case USB_REQ_EP0_HAND_SHAKE:
if(pev->u.setup.bRequestType & USB_DIR_IN)
{
reply = dyn_mem::memory(sizeof(PEERCFG));
((LPPEERCFG)reply->ptr())->ver = PROTOCOL_VER;
((LPPEERCFG)reply->ptr())->io_size = unit_out_;
((LPPEERCFG)reply->ptr())->pid = GetCurrentProcessId();
reply->set_len(sizeof(PEERCFG));
}
else
{
host_pid_ = ((LPPEERCFG)(pev + 1))->pid;
utils::to_log(LOG_LEVEL_DEBUG, "Host process %llu connected.\n", host_pid_);
}
break;
case USB_REQ_EP0_GOOD_BYE:
{
host_pid_ = ((LPPEERCFG)(pev + 1))->pid;
utils::to_log(LOG_LEVEL_DEBUG, "Host process %llu disconnected.\n", host_pid_);
host_pid_ = 0;
}
break;
case USB_REQ_EP0_GET_STATUS:
reply = dyn_mem::memory(sizeof(struct _ep0_reply));
reply->set_len(sizeof(struct _ep0_reply));
{
LPEP0REPLYSTATUS lps = (LPEP0REPLYSTATUS)reply->ptr();
lps->in_status = statu_in_;
lps->out_status = statu_out_;
lps->task_cnt = cmd_que_.size();
lps->task_cmd = task_cmd_;
lps->task_pack_id = task_packet_id_;
lps->task_required_bytes = want_bytes_task_;
lps->packets_to_sent = sent_que_.size();
lps->bytes_to_sent = want_bytes_in_;
if(pev->u.setup.wValue)
utils::log_mem_info("threads status:", lps, sizeof(*lps), LOG_LEVEL_DEBUG);
}
break;
case USB_REQ_EP0_CANCEL_IO:
if (pev->u.setup.wLength == sizeof(uint32_t))
{
uint32_t val = *(uint32_t*)&pev[1];
cancel_io_ = (val == CANCEL_IO_CANCEL);
}
break;
case USB_REQ_EP0_SET_ENCRYPT:
if (pev->u.setup.wLength == sizeof(PACK_BASE))
{
LPPACK_BASE pack = (LPPACK_BASE)&pev[1];
enc_head_ = pack->enc_cmd;
enc_payload_ = pack->encrypt;
enc_data_ = pack->enc_data;
utils::to_log(LOG_LEVEL_DEBUG, "Set encrypting method: command - %d; payload - %d\n", enc_head_, enc_payload_);
}
break;
default:
handled = false;
}
}
if(!handled)
{
utils::to_log(LOG_LEVEL_ALL, "EP0 event(0x%x, 0x%x, 0x%x, 0x%x, 0x%x) has not handled by user business.\n"
, pev->u.setup.bRequestType, pev->u.setup.bRequest, pev->u.setup.wValue, pev->u.setup.wIndex, pev->u.setup.wLength);
}
if(reply)
reply->set_session_id(data->get_session_id());
return reply;
}
int async_usb_gadget::inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* len, uint32_t bulk_size, data_source_ptr prog)
{
int w = 0, to = 0, err = 0, total = *len, off = 0,
size = *len;
uint64_t all = *len,
cur = 0;
auto real_prog = [&](uint64_t total, uint64_t cur, uint32_t err) -> int
{
return prog->notify_progress(total, cur, err);
};
auto empty_prog = [&](uint64_t total, uint64_t cur, uint32_t err) -> int
{
return 0;
};
std::function<int(uint64_t, uint64_t, uint32_t)> progr = empty_prog;
if (prog)
progr = real_prog;
while(!cancel_io_)
{
w = write(fd, buf + off, size);
if(w == -1)
{
if(errno == ETIMEDOUT)
{
if (to++ > 3)
{
utils::to_log(LOG_LEVEL_DEBUG, "Write bulk failed at (%u/%u) for err: %d\n", total, *len, errno);
break;
}
utils::to_log(LOG_LEVEL_DEBUG, "Write bulk timeout at (%u/%u), try again after %ums ...\n", off, total, to * 100);
std::this_thread::sleep_for(std::chrono::milliseconds(to * 100));
continue;
}
err = errno;
{
cur = off;
progr(all, cur, err);
}
break;
}
to = 0;
off += w;
want_bytes_in_ -= w;
{
cur = off;
progr(all, cur, err);
}
if(off >= total)
break;
if(off + bulk_size < total)
size = bulk_size;
else
size = total - off;
}
*len = off;
return err;
}
int async_usb_gadget::inner_write_bulk(int fd, data_source_ptr data, dyn_mem_ptr mem, uint32_t bulk_size)
{
int ret = 0;
uint32_t total = data->get_rest();
statu_in_ = WORKER_STATUS_BUSY;
want_bytes_in_ = total;
if(data->is_memory_block())
{
ret = inner_write_bulk_memory(fd, data->ptr(), &total, bulk_size);
}
else
{
uint32_t len = bulk_size;
while(data->fetch_data(mem->ptr(), &len) == 0)
{
ret = inner_write_bulk_memory(fd, mem->ptr(), &len, bulk_size);
if(ret || data->get_rest() == 0)
break;
total += len;
len = bulk_size;
}
}
statu_in_ = ret ? WORKER_STATUS_ERROR : WORKER_STATUS_IDLE;
want_bytes_in_ -= total;
return ret;
}
dyn_mem_ptr async_usb_gadget::handle_bulk_command(dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* pkd)
{
dyn_mem_ptr decrypt = packet_decrypt(data);
if (decrypt)
{
data->release();
data = decrypt;
return handle_cmd_(data, used, pkd);
}
else
{
*used = data->get_rest();
*pkd = nullptr;
return nullptr;
}
}
void async_usb_gadget::thread_read_ep0(void)
{
struct timeval timeout;
int headl = sizeof(struct usb_functionfs_event),
datal = 128, // gadget_->usb_config->dev_desc.bMaxPacketSize0,
ret = 0,
fd = dev_->get_device_fd();
dyn_mem_ptr mem = dyn_mem::memory(headl + datal);
uint8_t* ptr = mem->ptr();
uint32_t recycles = 0;
std::string msg("");
struct usb_functionfs_event* pe = (struct usb_functionfs_event*)ptr;
ret = dev_->pull_up(&msg);
while(run_)
{
ret = wait_fd_event(fd);
if( !run_ || ret <= 0 )
{
if(run_)
utils::to_log(LOG_LEVEL_ALL, "select EP0(%d) failed: %d(%s)\n", fd, errno, strerror(errno));
break;
}
// timeout.tv_sec = 0;
ret = read(fd, ptr, headl + datal);
if (ret < 0)
{
utils::to_log(LOG_LEVEL_ALL, "read EP0 failed: %d(%s)\n", errno, strerror(errno));
break;
}
bool good = true;
if(pe->u.setup.bRequestType & USB_DIR_IN)
{
mem->set_len(headl);
}
else
{
if (pe->u.setup.wLength > datal)
{
good = false;
utils::to_log(LOG_LEVEL_ALL, "EP0 data too long(%d > %d), we will discard this packet(0x%x, 0x%x, 0x%x, 0x%x)!\n", pe->u.setup.wLength, datal,
pe->u.setup.bRequestType, pe->u.setup.bRequest, pe->u.setup.wValue, pe->u.setup.wIndex);
// we read-out whole data here ...
int rest = pe->u.setup.wLength;
while (rest > 0)
{
read(fd, ptr + headl, rest > datal ? datal : rest);
rest -= datal;
}
}
else
{
read(fd, ptr + headl, pe->u.setup.wLength);
mem->set_len(headl + pe->u.setup.wLength);
}
}
if (good)
{
dyn_mem_ptr reply = handle_ctrl_message(mem);
if(reply)
{
write(fd, reply->ptr(), reply->get_rest());
reply->release();
}
}
}
mem->release();
}
void async_usb_gadget::thread_read_bulk(int fd)
{
size_t bulk_size = unit_out_; // * get_buf_coefficient();
uint32_t cnt_0 = 0;
while(run_)
{
statu_out_ = WORKER_STATUS_WAIT_RESOURCE;
dyn_mem_ptr buf = get_io_buffer();
int l = 0;
if(!buf)
{
#ifdef MEM_POOL
utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got at position %u!\n", io_buf_->take_pos());
#else
utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got while %u elements in queue!\n", io_buf_.size());
#endif
continue;
}
buf->set_session_id(session_id_);
statu_out_ = WORKER_STATUS_IDLE;
l = read(fd, buf->ptr(), bulk_size);
statu_out_ = WORKER_STATUS_BUSY;
if (!run_)
{
buf->release();
break;
}
else if(l <= 0)
{
free_io_buffer(buf);
if(errno)
{
utils::to_log(LOG_LEVEL_FATAL, "read bulk(%d) failed: %d(%s)\n", l, errno, strerror(errno));
statu_out_ = WORKER_STATUS_ERROR;
break;
}
else
{
cnt_0++;
}
}
else
{
// if(!cancel_io_) // ensure to trigger task thread clean it's work
{
buf->set_len(l);
cmd_que_.save(buf, true);
}
// else
// {
// free_io_buffer(buf);
// utils::to_log(LOG_LEVEL_DEBUG, "Cancel read with %u bytes.\n", l);
// }
}
}
statu_out_ = WORKER_STATUS_NOT_START;
}
void async_usb_gadget::thread_write_bulk(int fd)
{
uint32_t bulk_size = unit_in_ * get_buf_coefficient();
dyn_mem_ptr mem = dyn_mem::memory(bulk_size);
statu_in_ = WORKER_STATUS_IDLE;
while(run_)
{
data_source_ptr data;
int err = 0;
if(sent_que_.take(data, true))
{
want_bytes_in_ = data->get_rest();
if(!cancel_io_)
{
err = inner_write_bulk(fd, data, mem, bulk_size);
{
file_reader_ptr fr = dynamic_cast<file_reader_ptr>(data);
if(fr)
{
utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err);
}
}
}
else
{
utils::to_log(LOG_LEVEL_DEBUG, "Cancel write with %u bytes.\n", data->get_rest());
}
data->release();
if(err)
{
utils::to_log(LOG_LEVEL_FATAL, "write bulk failed: %d(%s)\n", errno, strerror(errno));
break;
}
}
}
mem->release();
statu_in_ = WORKER_STATUS_NOT_START;
}
void async_usb_gadget::thread_pump_task(void)
{
dyn_mem_ptr prev = nullptr, data = nullptr, reply = nullptr;
bool in_err_statu = false;
uint32_t required = 0, used = 0, restore_err_cnt = 0, max_que = 0;
data_holder *dh = nullptr;
LPPACK_BASE pack = nullptr;
uint64_t total_size = 0;
while(run_)
{
data = nullptr;
statu_task_ = WORKER_STATUS_IDLE;
task_cmd_ = 0;
if(cmd_que_.take(data, true) && data)
{
bool pool = true;
statu_task_ = WORKER_STATUS_BUSY;
if(max_que < cmd_que_.size())
max_que = cmd_que_.size();
if(cancel_io_)
{
if(prev)
{
utils::to_log(LOG_LEVEL_DEBUG, "Cancel task previous packet with %u bytes.\n", prev->get_rest());
prev->release();
prev = nullptr;
}
if(dh)
{
utils::to_log(LOG_LEVEL_DEBUG, "Cancel task holder need %u bytes.\n", dh->get_required());
dh->cancel();
dh->release();
dh = nullptr;
}
if(reply)
{
reply->release();
reply = nullptr;
}
want_bytes_task_ = 0;
utils::to_log(LOG_LEVEL_DEBUG, "Cancel task with %u bytes.\n", data->get_rest());
data->release();
continue;
}
if(prev)
{
if(prev->get_session_id() != data->get_session_id())
{
utils::to_log(LOG_LEVEL_DEBUG, "Discard previous packet for the session ID(%u) is not equal to now(%u).\n", prev->get_session_id(), data->get_session_id());
prev->release();
}
else
{
utils::to_log(LOG_LEVEL_ALL, "Combine partial packet with length %u and %u ...\n", prev->get_rest(), data->get_rest());
*prev += *data;
free_io_buffer(data);
data = prev;
pool = false;
}
prev = nullptr;
}
if(dh && dh->get_session_id() != data->get_session_id())
{
dh->cancel();
dh->release();
dh = nullptr;
}
if (!online_ || data->get_session_id() != session_id_)
{
utils::to_log(LOG_LEVEL_DEBUG, "Discard task for session ID(%u) is not equal now(%u) or is offline.\n", data->get_session_id(), session_id_);
if(pool)
free_io_buffer(data);
else
data->release();
continue;
}
do
{
packet_data_base_ptr pack_data = nullptr;
data_source_ptr ds = nullptr;
bool notify_reply_ok = false;
if(dh == nullptr)
{
if (!online_ || data->get_rest() < sizeof(PACK_BASE))
break;
else
{
pack = (LPPACK_BASE)data->ptr();
task_cmd_ = pack->cmd;
task_packet_id_ = pack->pack_id;
want_bytes_task_ = 0;
used = data->get_rest();
reply = handle_bulk_command(data, &used, &pack_data);
notify_reply_ok = used & (INT32_MAX + 1);
used &= INT32_MAX;
if(pack_data)
{
pack_data->set_session_id(data->get_session_id());
dh = dynamic_cast<data_holder_ptr>(pack_data);
if(!dh)
{
ds = dynamic_cast<data_source_ptr>(pack_data);
if(!ds)
pack_data->release();
else
want_bytes_task_ = ds->get_rest();
}
else
{
total_size = 0;
want_bytes_task_ = dh->get_required();
}
}
}
}
else
{
uint32_t len = data->get_rest();
int err = dh->put_data(data->ptr(), &len);
if(len < data->get_rest())
{
utils::to_log(LOG_LEVEL_WARNING, "Put partial data %u/%u! at +%08X with error %d\n", len, data->get_rest(), total_size, err);
}
total_size += len;
if(dh->is_complete() || err)
{
reply = dyn_mem::memory(sizeof(PACK_BASE));
reply->set_session_id(dh->get_session_id());
pack = (LPPACK_BASE)reply->ptr();
BASE_PACKET_REPLY(*pack, dh->get_packet_command() + 1, dh->get_packet_id(), err);
reply->set_len(sizeof(PACK_BASE));
utils::to_log(LOG_LEVEL_ALL, "Received large data(%p) with error(Max queue size is %u): %d.\n", total_size, max_que, err);
dh->release();
dh = nullptr;
want_bytes_task_ = 0;
}
else
{
want_bytes_task_ = dh->get_required();
}
used = len;
if (err)
{
in_err_statu = true;
restore_err_cnt = 0;
}
}
// first reply the packet ...
if(reply)
{
// encrypt ...
dyn_mem_ptr enc = packet_encrypt(reply, enc_head_, enc_payload_, enc_data_);
if (enc)
{
reply->release();
reply = enc;
}
reply->set_session_id(data->get_session_id());
write_bulk(dynamic_cast<data_source_ptr>(reply));
if(notify_reply_ok)
handle_bulk_command(data, nullptr, nullptr);
reply->release();
reply = nullptr;
}
// second send appendix data ...
if (ds)
{
write_bulk(ds);
ds->release();
ds = nullptr;
}
data->used(used);
}while(used && data->get_rest());
if(data->get_rest())
{
if(pool)
{
prev = dyn_mem::memory(unit_out_);
prev->put(data->ptr(), data->get_rest());
free_io_buffer(data);
}
else
{
prev = data;
}
}
else
{
if(!dh)
{
task_cmd_ = task_packet_id_ = want_bytes_task_ = 0;
}
if(pool)
free_io_buffer(data);
else
data->release();
}
}
}
if(prev)
prev->release();
if (data)
data->release();
if (reply)
reply->release();
if (dh)
dh->release();
}
int async_usb_gadget::stop(void)
{
run_ = false;
if(dev_)
{
dev_->pull_down();
dev_->close_device();
dev_->release();
dev_ = nullptr;
}
#ifdef MEM_POOL
io_buf_->stop();
#else
io_buf_.trigger();
#endif
wait_in_.trigger();
wait_out_.trigger();
cmd_que_.trigger();
sent_que_.trigger();
return 0;
}
int async_usb_gadget::write_bulk(data_source_ptr data)
{
if(data->get_session_id() != session_id_ || !online_)
{
utils::to_log(LOG_LEVEL_DEBUG, "Discard reply packet for the session ID(%u) is not equal current session(%u) or is offline.\n", data->get_session_id(), session_id_);
// data->release();
return sent_que_.size();
}
else
{
int quel = 0;
data->add_ref();
quel = sent_que_.save(data, true);
return quel;
}
}
int async_usb_gadget::last_error(void)
{
return last_err_;
}