#include "usb_io.h" #include #include #include #include #include #include "usb_dev.h" #include "default_cfg.h" // #define TEST //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // async_usb_gadget async_usb_gadget::async_usb_gadget(std::function cmd_handler , std::function dev_conn) : handle_cmd_(cmd_handler), dev_connect_(dev_conn) , enc_head_(ENCRYPT_CMD_NONE), enc_payload_(ENCRYPT_NONE), enc_data_(0) #ifdef MEM_POOL , io_buf_(nullptr) #else , io_buf_("IO-buf") #endif , cmd_que_("in-queue"), sent_que_("out-queue") , wait_in_("wait_usb_enable_in"), wait_out_("wait_usb_enable_out") { cmd_que_.enable_wait_log(false); sent_que_.enable_wait_log(false); wait_in_.enable_log(false); wait_out_.enable_log(false); dev_ = new usb_device("fe900000.dwc3", "/opt/cfg/usb_gadget/g1/UDC", "linaro"); dev_->add_endpoint(USB_EP_BULK_IN, true, true); dev_->add_endpoint(USB_EP_BULK_OUT, true, false); if((last_err_ = dev_->open_device(USB_DEV, true))) { dev_->release(); dev_ = nullptr; run_ = false; return; } auto ep0 = [this](void*) -> void { thread_read_ep0(); }; auto task = [this](void*) -> void { thread_pump_task(); }; auto bulkw = [this](void*) -> void { #ifdef BIND_CPU std::vector cpu; for(int i = 0; i < CPU_MAJOR_CNT; ++i) cpu.push_back(CPU_MAJOR_0 + i); utils::set_cpu_affinity(&cpu[0], cpu.size()); #endif while(run_) { wait_in_.wait(); if(!run_) break; int fd = -1, err = dev_->open_endpoint(EP_IND_BULK_IN, &fd); if(err == 0) { thread_write_bulk(fd); if(run_) dev_->close_endpoint(EP_IND_BULK_IN); } } }; auto bulkr = [this](void*) -> void { while(run_) { wait_out_.wait(); if(!run_) break; int fd = -1, err = dev_->open_endpoint(EP_IND_BULK_OUT, &fd); if(err == 0) { thread_read_bulk(fd); if(run_) dev_->close_endpoint(EP_IND_BULK_OUT); } } }; auto excep = [&](const char* thread_name, void* param) -> void { threads_.stop(thread_name); // threads_.start(task, "thread_pump_task"); }; threads_.set_exception_handler(excep); unit_in_ = unit_out_ = global_info::page_size; // dev_->get_max_packet(); // allocate 1MB for IO #ifdef MEM_POOL io_buf_ = new dyn_mem_pool(SIZE_MB(1) / unit_out_, unit_out_); utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_->count() * unit_out_, io_buf_->count(), unit_in_); #else for(int i = 0; i < SIZE_MB(1) / unit_out_; ++i) { dyn_mem_ptr buf = dyn_mem::memory(unit_out_); if(buf) io_buf_.save(buf); } utils::to_log(LOG_LEVEL_DEBUG, "Prepare %u(%u * %u) for IO.\n", io_buf_.size() * unit_out_, io_buf_.size(), unit_in_); #endif threads_.start(task, nullptr, SIZE_MB(4), "thread_pump_task", (void*)&async_usb_gadget::thread_pump_task); threads_.start(bulkw, nullptr, SIZE_MB(2), "thread_write_bulk", (void*)&async_usb_gadget::thread_write_bulk); threads_.start(bulkr, nullptr, SIZE_MB(2), "thread_read_bulk", (void*)&async_usb_gadget::thread_read_bulk); threads_.start(ep0, nullptr, SIZE_MB(1), "thread_read_ep0", (void*)&async_usb_gadget::thread_read_ep0); } async_usb_gadget::~async_usb_gadget() { stop(); #ifdef MEM_POOL io_buf_->release(); #else dyn_mem_ptr buf = nullptr; while(io_buf_.take(buf, false)) { if(buf) buf->release(); } #endif } dyn_mem_ptr async_usb_gadget::get_io_buffer(void) { dyn_mem_ptr buf = nullptr; #ifdef MEM_POOL buf = io_buf_->take(); #else io_buf_.take(buf, true); #endif return buf; } void async_usb_gadget::free_io_buffer(dyn_mem_ptr buf) { buf->clear_data(); #ifdef MEM_POOL io_buf_->put(buf); #else io_buf_.save(buf, true); #endif } const char* async_usb_gadget::ep0_status_desc(int ep0_status, char* unk_buf/*>= 20 bytes*/) { RETURN_ENUM_STR(ep0_status, EP0_STATUS_IDLE); RETURN_ENUM_STR(ep0_status, EP0_STATUS_READ_DATA); RETURN_ENUM_STR(ep0_status, EP0_STATUS_READ_INVAL_DATA); RETURN_ENUM_STR(ep0_status, EP0_STATUS_HANDLE_CMD); sprintf(unk_buf, "Unknown status (%d)", ep0_status); return unk_buf; } int async_usb_gadget::wait_fd_event(int fd, int to_ms) { struct timeval timeout, *pto = NULL; fd_set read_set; int ret = 0; FD_ZERO(&read_set); FD_SET(fd, &read_set); if (to_ms != -1) { timeout.tv_sec = to_ms / 1000; timeout.tv_usec = (to_ms % 1000) * 1000; pto = &timeout; } ret = select(fd + 1, &read_set, NULL, NULL, pto); return ret; } uint16_t async_usb_gadget::get_buf_coefficient(void) { SIMPLE_LOCK(buf_coef_lock_); return buf_coef_; } void async_usb_gadget::set_buf_coefficient(int coef) { SIMPLE_LOCK(buf_coef_lock_); buf_coef_ = (coef <= 0 || coef > 10000) ? 1 : coef; } dyn_mem_ptr async_usb_gadget::handle_ctrl_message(dyn_mem_ptr data) { struct usb_functionfs_event* pev = (struct usb_functionfs_event*)data->ptr(); dyn_mem_ptr reply = nullptr; switch (pev->type) { case FUNCTIONFS_ENABLE: session_id_++; utils::to_log(LOG_LEVEL_DEBUG, "////////////// EP0 FFS ENABLE, start session %u ...\n", session_id_); online_ = true; cancel_io_ = false; wait_in_.trigger(); wait_out_.trigger(); if(dev_connect_) dev_connect_(online_); break; case FUNCTIONFS_DISABLE: online_ = false; cancel_io_ = true; if(dev_connect_) dev_connect_(online_); utils::to_log(LOG_LEVEL_DEBUG, "////////////// EP0 FFS DISABLE, end session %u ...\n\n", session_id_); break; case FUNCTIONFS_SETUP: reply = handle_ctrl_setup(data); break; case FUNCTIONFS_BIND: utils::to_log(LOG_LEVEL_ALL, "EP0 FFS BIND\n"); // utils::log_mem_info("EP0 FFS BIND:", pev, sizeof(*pev), LOG_LEVEL_DEBUG); break; case FUNCTIONFS_UNBIND: utils::to_log(LOG_LEVEL_ALL, "EP0 FFS UNBIND\n"); break; case FUNCTIONFS_SUSPEND: utils::to_log(LOG_LEVEL_ALL, "EP0 FFS SUSPEND\n"); break; case FUNCTIONFS_RESUME: utils::to_log(LOG_LEVEL_ALL, "EP0 FFS RESUME\n"); break; } if (pev->u.setup.bRequestType & USB_DIR_IN) { // if host need data back, and reply is null, we send nonsense data back ... if (!reply) { reply = dyn_mem::memory(pev->u.setup.wLength + sizeof(uint16_t)); reply->set_len(pev->u.setup.wLength); for (int i = 0; i <= pev->u.setup.wLength / sizeof(uint16_t); ++i) ((uint16_t*)reply->ptr())[i] = 0x0baad; utils::to_log(LOG_LEVEL_DEBUG, "fake control message reply(%u bytes) while request with USB_DIR_IN flag but no reply returned.\n", reply->get_rest()); } } else if(reply) { utils::to_log(LOG_LEVEL_DEBUG, "discard control message reply(%u bytes) while request without USB_DIR_IN flag.\n", reply->get_rest()); reply->release(); reply = nullptr; } return reply; } dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data) { struct usb_functionfs_event* pev = (struct usb_functionfs_event*)data->ptr(); bool handled = (pev->u.setup.bRequestType & USB_TYPE_MASK) == USB_TYPE_VENDOR; dyn_mem_ptr reply = nullptr; uint32_t err = 0; if(handled) { switch (pev->u.setup.bRequest) { case USB_REQ_EP0_HAND_SHAKE: if(pev->u.setup.bRequestType & USB_DIR_IN) { reply = dyn_mem::memory(sizeof(PEERCFG)); ((LPPEERCFG)reply->ptr())->ver = PROTOCOL_VER; ((LPPEERCFG)reply->ptr())->io_size = unit_out_; ((LPPEERCFG)reply->ptr())->pid = GetCurrentProcessId(); reply->set_len(sizeof(PEERCFG)); } else { host_pid_ = ((LPPEERCFG)(pev + 1))->pid; utils::to_log(LOG_LEVEL_DEBUG, "Host process %llu connected.\n", host_pid_); } break; case USB_REQ_EP0_GOOD_BYE: { host_pid_ = ((LPPEERCFG)(pev + 1))->pid; utils::to_log(LOG_LEVEL_DEBUG, "Host process %llu disconnected.\n", host_pid_); host_pid_ = 0; } break; case USB_REQ_EP0_GET_STATUS: reply = dyn_mem::memory(sizeof(struct _ep0_reply)); reply->set_len(sizeof(struct _ep0_reply)); { LPEP0REPLYSTATUS lps = (LPEP0REPLYSTATUS)reply->ptr(); get_ep_status(lps); if(pev->u.setup.wValue) utils::log_mem_info("threads status:", lps, sizeof(*lps), LOG_LEVEL_DEBUG); } break; case USB_REQ_EP0_CANCEL_IO: if (pev->u.setup.wLength == sizeof(uint32_t)) { uint32_t val = *(uint32_t*)&pev[1]; cancel_io_ = (val == CANCEL_IO_CANCEL); } break; case USB_REQ_EP0_SET_ENCRYPT: if (pev->u.setup.wLength == sizeof(PACK_BASE)) { LPPACK_BASE pack = (LPPACK_BASE)&pev[1]; enc_head_ = pack->enc_cmd; enc_payload_ = pack->encrypt; enc_data_ = pack->enc_data; utils::to_log(LOG_LEVEL_DEBUG, "Set encrypting method: command - %d; payload - %d\n", enc_head_, enc_payload_); } break; default: handled = false; } } if(!handled) { utils::to_log(LOG_LEVEL_ALL, "EP0 event(0x%x, 0x%x, 0x%x, 0x%x, 0x%x) has not handled by user business.\n" , pev->u.setup.bRequestType, pev->u.setup.bRequest, pev->u.setup.wValue, pev->u.setup.wIndex, pev->u.setup.wLength); } if(reply) reply->set_session_id(data->get_session_id()); return reply; } int async_usb_gadget::inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* len, uint32_t bulk_size, data_source_ptr prog) { int w = 0, to = 0, err = 0, total = *len, off = 0, size = *len; uint64_t all = *len, cur = 0; auto real_prog = [&](uint64_t total, uint64_t cur, uint32_t err) -> int { return prog->notify_progress(total, cur, err); }; auto empty_prog = [&](uint64_t total, uint64_t cur, uint32_t err) -> int { return 0; }; std::function progr = empty_prog; if (prog) progr = real_prog; while(!cancel_io_) { w = write(fd, buf + off, size); if(w == -1) { if(errno == ETIMEDOUT) { if (to++ > 3) { utils::to_log(LOG_LEVEL_DEBUG, "Write bulk failed at (%u/%u) for err: %d\n", total, *len, errno); break; } utils::to_log(LOG_LEVEL_DEBUG, "Write bulk timeout at (%u/%u), try again after %ums ...\n", off, total, to * 100); std::this_thread::sleep_for(std::chrono::milliseconds(to * 100)); continue; } err = errno; { cur = off; progr(all, cur, err); } break; } to = 0; off += w; want_bytes_in_ -= w; { cur = off; progr(all, cur, err); } if(off >= total) break; if(off + bulk_size < total) size = bulk_size; else size = total - off; } *len = off; return err; } int async_usb_gadget::inner_write_bulk(int fd, data_source_ptr data, dyn_mem_ptr mem, uint32_t bulk_size) { int ret = 0; uint32_t total = data->get_rest(); statu_in_ = WORKER_STATUS_BUSY; want_bytes_in_ = total; if(data->is_memory_block()) { ret = inner_write_bulk_memory(fd, data->ptr(), &total, bulk_size); } else { uint32_t len = bulk_size; while(data->fetch_data(mem->ptr(), &len) == 0) { ret = inner_write_bulk_memory(fd, mem->ptr(), &len, bulk_size); if(ret || data->get_rest() == 0) break; total += len; len = bulk_size; } } statu_in_ = ret ? WORKER_STATUS_ERROR : WORKER_STATUS_IDLE; // want_bytes_in_ -= total; return ret; } dyn_mem_ptr async_usb_gadget::handle_bulk_command(dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* pkd) { dyn_mem_ptr decrypt = packet_decrypt(data); if (decrypt) { data->release(); data = decrypt; return handle_cmd_(data, used, pkd); } else { *used = data->get_rest(); *pkd = nullptr; return nullptr; } } void async_usb_gadget::thread_read_ep0(void) { struct timeval timeout; int headl = sizeof(struct usb_functionfs_event), datal = 128, // gadget_->usb_config->dev_desc.bMaxPacketSize0, ret = 0, fd = dev_->get_device_fd(); dyn_mem_ptr mem = dyn_mem::memory(headl + datal); uint8_t* ptr = mem->ptr(); uint32_t recycles = 0; std::string msg(""); struct usb_functionfs_event* pe = (struct usb_functionfs_event*)ptr; ret = dev_->pull_up(&msg); while(run_) { ret = wait_fd_event(fd); if( !run_ || ret <= 0 ) { if(run_) utils::to_log(LOG_LEVEL_ALL, "select EP0(%d) failed: %d(%s)\n", fd, errno, strerror(errno)); break; } // timeout.tv_sec = 0; ret = read(fd, ptr, headl + datal); if (ret < 0) { utils::to_log(LOG_LEVEL_ALL, "read EP0 failed: %d(%s)\n", errno, strerror(errno)); break; } bool good = true; if(pe->u.setup.bRequestType & USB_DIR_IN) { mem->set_len(headl); } else { if (pe->u.setup.wLength > datal) { good = false; utils::to_log(LOG_LEVEL_ALL, "EP0 data too long(%d > %d), we will discard this packet(0x%x, 0x%x, 0x%x, 0x%x)!\n", pe->u.setup.wLength, datal, pe->u.setup.bRequestType, pe->u.setup.bRequest, pe->u.setup.wValue, pe->u.setup.wIndex); // we read-out whole data here ... int rest = pe->u.setup.wLength; while (rest > 0) { read(fd, ptr + headl, rest > datal ? datal : rest); rest -= datal; } } else { read(fd, ptr + headl, pe->u.setup.wLength); mem->set_len(headl + pe->u.setup.wLength); } } if (good) { dyn_mem_ptr reply = handle_ctrl_message(mem); if(reply) { write(fd, reply->ptr(), reply->get_rest()); reply->release(); } } } mem->release(); } void async_usb_gadget::thread_read_bulk(int fd) { size_t bulk_size = unit_out_; // * get_buf_coefficient(); uint32_t cnt_0 = 0; while(run_) { statu_out_ = WORKER_STATUS_WAIT_RESOURCE; dyn_mem_ptr buf = get_io_buffer(); int l = 0; if(!buf) { #ifdef MEM_POOL utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got at position %u!\n", io_buf_->take_pos()); #else utils::to_log(LOG_LEVEL_FATAL, "NO IO buffer got while %u elements in queue!\n", io_buf_.size()); #endif continue; } buf->set_session_id(session_id_); statu_out_ = WORKER_STATUS_IDLE; l = read(fd, buf->ptr(), bulk_size); statu_out_ = WORKER_STATUS_BUSY; if (!run_) { buf->release(); break; } else if(l <= 0) { free_io_buffer(buf); if(errno) { utils::to_log(LOG_LEVEL_FATAL, "read bulk(%d) failed: %d(%s)\n", l, errno, strerror(errno)); statu_out_ = WORKER_STATUS_ERROR; break; } else { cnt_0++; } } else { // if(!cancel_io_) // ensure to trigger task thread clean it's work { buf->set_len(l); cmd_que_.save(buf, true); } // else // { // free_io_buffer(buf); // utils::to_log(LOG_LEVEL_DEBUG, "Cancel read with %u bytes.\n", l); // } } } statu_out_ = WORKER_STATUS_NOT_START; } void async_usb_gadget::thread_write_bulk(int fd) { uint32_t bulk_size = unit_in_ * get_buf_coefficient(); dyn_mem_ptr mem = dyn_mem::memory(bulk_size); statu_in_ = WORKER_STATUS_IDLE; while(run_) { data_source_ptr data; int err = 0; if(sent_que_.take(data, true)) { want_bytes_in_ = data->get_rest(); if(!cancel_io_) { err = inner_write_bulk(fd, data, mem, bulk_size); { file_reader_ptr fr = dynamic_cast(data); if(fr) { utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err); } } } else { utils::to_log(LOG_LEVEL_DEBUG, "Cancel write with %u bytes.\n", data->get_rest()); } data->release(); if(err) { utils::to_log(LOG_LEVEL_FATAL, "write bulk failed: %d(%s)\n", errno, strerror(errno)); break; } } } mem->release(); statu_in_ = WORKER_STATUS_NOT_START; } void async_usb_gadget::thread_pump_task(void) { dyn_mem_ptr prev = nullptr, data = nullptr, reply = nullptr; bool in_err_statu = false; uint32_t required = 0, used = 0, restore_err_cnt = 0, max_que = 0; data_holder *dh = nullptr; LPPACK_BASE pack = nullptr; uint64_t total_size = 0; while(run_) { data = nullptr; statu_task_ = WORKER_STATUS_IDLE; task_cmd_ = 0; if(cmd_que_.take(data, true) && data) { bool pool = true; statu_task_ = WORKER_STATUS_BUSY; if(max_que < cmd_que_.size()) max_que = cmd_que_.size(); if(cancel_io_) { if(prev) { utils::to_log(LOG_LEVEL_DEBUG, "Cancel task previous packet with %u bytes.\n", prev->get_rest()); prev->release(); prev = nullptr; } if(dh) { utils::to_log(LOG_LEVEL_DEBUG, "Cancel task holder need %u bytes.\n", dh->get_required()); dh->cancel(); dh->release(); dh = nullptr; } if(reply) { reply->release(); reply = nullptr; } want_bytes_task_ = 0; utils::to_log(LOG_LEVEL_DEBUG, "Cancel task with %u bytes.\n", data->get_rest()); data->release(); continue; } if(prev) { if(prev->get_session_id() != data->get_session_id()) { utils::to_log(LOG_LEVEL_DEBUG, "Discard previous packet for the session ID(%u) is not equal to now(%u).\n", prev->get_session_id(), data->get_session_id()); prev->release(); } else { utils::to_log(LOG_LEVEL_ALL, "Combine partial packet with length %u and %u ...\n", prev->get_rest(), data->get_rest()); *prev += *data; free_io_buffer(data); data = prev; pool = false; } prev = nullptr; } if(dh && dh->get_session_id() != data->get_session_id()) { dh->cancel(); dh->release(); dh = nullptr; } if (!online_ || data->get_session_id() != session_id_) { utils::to_log(LOG_LEVEL_DEBUG, "Discard task for session ID(%u) is not equal now(%u) or is offline.\n", data->get_session_id(), session_id_); if(pool) free_io_buffer(data); else data->release(); continue; } do { packet_data_base_ptr pack_data = nullptr; data_source_ptr ds = nullptr; bool notify_reply_ok = false; if(dh == nullptr) { if (!online_ || data->get_rest() < sizeof(PACK_BASE)) break; else { pack = (LPPACK_BASE)data->ptr(); task_cmd_ = pack->cmd; task_packet_id_ = pack->pack_id; want_bytes_task_ = 0; used = data->get_rest(); reply = handle_bulk_command(data, &used, &pack_data); notify_reply_ok = used & (INT32_MAX + 1); used &= INT32_MAX; if(pack_data) { pack_data->set_session_id(data->get_session_id()); dh = dynamic_cast(pack_data); if(!dh) { ds = dynamic_cast(pack_data); if(!ds) pack_data->release(); else want_bytes_task_ = ds->get_rest(); } else { total_size = 0; want_bytes_task_ = dh->get_required(); } } } } else { uint32_t len = data->get_rest(); int err = dh->put_data(data->ptr(), &len); if(len < data->get_rest()) { utils::to_log(LOG_LEVEL_WARNING, "Put partial data %u/%u! at +%08X with error %d\n", len, data->get_rest(), total_size, err); } total_size += len; if(dh->is_complete() || err) { reply = dyn_mem::memory(sizeof(PACK_BASE)); reply->set_session_id(dh->get_session_id()); pack = (LPPACK_BASE)reply->ptr(); BASE_PACKET_REPLY(*pack, dh->get_packet_command() + 1, dh->get_packet_id(), err); reply->set_len(sizeof(PACK_BASE)); utils::to_log(LOG_LEVEL_ALL, "Received large data(%p) with error(Max queue size is %u): %d.\n", total_size, max_que, err); dh->release(); dh = nullptr; want_bytes_task_ = 0; } else { want_bytes_task_ = dh->get_required(); } used = len; if (err) { in_err_statu = true; restore_err_cnt = 0; } } // first reply the packet ... if(reply) { // encrypt ... dyn_mem_ptr enc = packet_encrypt(reply, enc_head_, enc_payload_, enc_data_); if (enc) { reply->release(); reply = enc; } reply->set_session_id(data->get_session_id()); write_bulk(dynamic_cast(reply)); if(notify_reply_ok) handle_bulk_command(data, nullptr, nullptr); reply->release(); reply = nullptr; } // second send appendix data ... if (ds) { write_bulk(ds); ds->release(); ds = nullptr; } data->used(used); }while(used && data->get_rest()); if(data->get_rest()) { if(pool) { prev = dyn_mem::memory(unit_out_); prev->put(data->ptr(), data->get_rest()); free_io_buffer(data); } else { prev = data; } } else { if(!dh) { task_cmd_ = task_packet_id_ = want_bytes_task_ = 0; } if(pool) free_io_buffer(data); else data->release(); } } } if(prev) prev->release(); if (data) data->release(); if (reply) reply->release(); if (dh) dh->release(); } int async_usb_gadget::stop(void) { run_ = false; if(dev_) { dev_->pull_down(); dev_->close_device(); dev_->release(); dev_ = nullptr; } #ifdef MEM_POOL io_buf_->stop(); #else io_buf_.trigger(); #endif wait_in_.trigger(); wait_out_.trigger(); cmd_que_.trigger(); sent_que_.trigger(); return 0; } int async_usb_gadget::write_bulk(data_source_ptr data) { if(data->get_session_id() != session_id_ || !online_) { utils::to_log(LOG_LEVEL_DEBUG, "Discard reply packet for the session ID(%u) is not equal current session(%u) or is offline.\n", data->get_session_id(), session_id_); // data->release(); return sent_que_.size(); } else { int quel = 0; data->add_ref(); quel = sent_que_.save(data, true); return quel; } } int async_usb_gadget::last_error(void) { return last_err_; } uint32_t async_usb_gadget::current_session(void) { return session_id_; } void async_usb_gadget::get_ep_status(LPEP0REPLYSTATUS status) { status->in_status = statu_in_; status->out_status = statu_out_; status->task_cnt = cmd_que_.size(); status->task_cmd = task_cmd_; status->task_pack_id = task_packet_id_; status->task_required_bytes = want_bytes_task_; status->packets_to_sent = sent_que_.size(); status->bytes_to_sent = want_bytes_in_; }