资源监控在扫描线程中处理;线程页面化参数使用宏定义控制;减少CIS缓冲区数量(至4个)

This commit is contained in:
gb 2024-03-08 10:22:47 +08:00
parent e264cd6556
commit ca0ab06dcc
9 changed files with 160 additions and 60 deletions

View File

@ -42,7 +42,7 @@ protected:
size_t length;
};
const int v4l_buffer_count = 6;
const int v4l_buffer_count = 4;
int buf_size_ = 0;
int v4l_width = 3100;
int v4l_height = 3100;

View File

@ -36,9 +36,16 @@ imgproc_mgr::imgproc_mgr(device_option* devopts, void(*sender)(SENDER_PROTO), vo
)
: opts_(devopts), img_sender_(sender), sender_param_(sp)
, res_(res), res_param_(rp)
#ifndef USE_THREAD_PAGED_DATA
, raw_("cis-img")
#endif
{
ADD_THIS_JSON();
#ifndef USE_THREAD_PAGED_DATA
raw_.enable_wait_log(false);
#endif
if (opts_)
opts_->add_ref();
else
@ -119,7 +126,11 @@ void imgproc_mgr::process(image_processor* prc, std::vector<PROCIMGINFO>* in, st
void imgproc_mgr::send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info, size_t info_l, bool last)
{
PACKIMAGE h(*head);
#ifdef USE_THREAD_PAGED_DATA
std::shared_ptr<std::vector<uchar>> compd(obj->mean.encoder->encode(&h, mat));
#else
std::shared_ptr<std::vector<uchar>> compd(last_->encode(&h, mat));
#endif
image_packet_ptr ptr = nullptr;
if(last)
@ -131,14 +142,21 @@ void imgproc_mgr::send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, v
h.prc_stage = head->prc_stage;
h.prc_time = head->prc_time;
}
ptr = new image_packet(&h, compd, obj->mean.scan_id, info, info_l);
#ifdef USE_THREAD_PAGED_DATA
ptr = new image_packet(&h, compd, obj->mean.scan_id, info, info_l);
ptr->set_session_id(obj->mean.session);
obj->mean.sender(ptr, obj->mean.sender_param);
#else
ptr = new image_packet(&h, compd, scan_id_, info, info_l);
ptr->set_session_id(session_id_);
img_sender_(ptr, sender_param_);
#endif
ptr->release();
}
void imgproc_mgr::real_dump_image(DUMP_PROTO)
{
#ifdef USE_THREAD_PAGED_DATA
if(arr)
{
for(auto& v: *arr)
@ -148,6 +166,16 @@ void imgproc_mgr::real_dump_image(DUMP_PROTO)
{
imgproc_mgr::send_image(obj, info, *mat, infoex, infexl, last);
}
#else
if(arr)
{
((imgproc_mgr*)param)->send_image(nullptr, *arr, last);
}
else
{
((imgproc_mgr*)param)->send_image(obj, info, *mat, infoex, infexl, last);
}
#endif
}
void imgproc_mgr::empty_dump_image(DUMP_PROTO)
{}
@ -156,8 +184,10 @@ void imgproc_mgr::start_workers(int cnt)
{
workers_.stop(nullptr);
#ifdef USE_THREAD_PAGED_DATA
for(auto& v: params_)
v->free();
#endif
auto thrd = [this](void* param) -> void
{
@ -178,7 +208,8 @@ void imgproc_mgr::start_workers(int cnt)
for(int i = 0; i < cnt; ++i)
{
LPPAGEDPARAM param = nullptr;
LPPAGEDPARAM param = (LPPAGEDPARAM)i;
#ifdef USE_THREAD_PAGED_DATA
if(i < params_.size())
param = params_[i];
else
@ -219,6 +250,8 @@ void imgproc_mgr::start_workers(int cnt)
param->mean.processors[ind++] = prc;
param->mean.processors[ind] = nullptr;
}
#endif
char n[40] = {0};
sprintf(n, "thread_worker%d", i + 1);
@ -234,14 +267,22 @@ uint32_t imgproc_mgr::add_busy_worker(int inc)
}
void imgproc_mgr::thread_worker(void* param)
{
LPPAGEDPARAM para = (LPPAGEDPARAM)param;
RAWIMG img;
int cpu = (para->mean.ind % (CPU_CORES - CPU_MAJOR_CNT)) + CPU_MINOR_0;
#ifdef USE_THREAD_PAGED_DATA
LPPAGEDPARAM para = (LPPAGEDPARAM)param;
int ind = para->mean.ind;
#else
int ind = (int)(long)param;
void(*dump)(DUMP_PROTO) = dump_img_ ? &imgproc_mgr::real_dump_image : &imgproc_mgr::empty_dump_image;
LPPAGEDPARAM para = (LPPAGEDPARAM)(void*)dump;
#endif
int cpu = (ind % (CPU_CORES - CPU_MAJOR_CNT)) + CPU_MINOR_0;
utils::to_log(LOG_LEVEL_DEBUG, "set image process thread %d to CPU %d = %d\n"
, para->mean.ind + 1, cpu, utils::set_cpu_affinity(cpu));
, ind + 1, cpu, utils::set_cpu_affinity(cpu));
add_busy_worker();
#ifdef USE_THREAD_PAGED_DATA
while(para->mean.run)
{
if(para->mean.res)
@ -265,9 +306,34 @@ void imgproc_mgr::thread_worker(void* param)
process(&img, para);
}
}
#else
while(run_)
{
if(res_)
{
while(!res_(TASK_IMG_PROCESSOR, true, 3000, res_param_))
{
if(!run_)
break;
}
if(!run_)
break;
}
if(raw_.take(img, true))
{
#ifdef REBUILD_IN_CIS_THREAD
if(img.img && img.imgs.size() == 0)
#else
if(img.img && !img.data)
#endif
break;
process(&img, para, ind);
}
}
#endif
add_busy_worker(-1);
}
void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param)
void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param, int thrd_sn)
{
if(img->img)
{
@ -275,20 +341,35 @@ void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param)
chronograph watch;
#ifdef REBUILD_IN_CIS_THREAD
void(*dump)(DUMP_PROTO) = (void(*)(DUMP_PROTO))param;
src = &img->imgs;
utils::to_log(LOG_LEVEL_ALL, "Rebuild paper %d spend %u milliseconds.\n", img->imgs[0].info.pos.paper_ind, img->imgs[0].info.prc_time);
#else
#ifdef USE_THREAD_PAGED_DATA
if(param->mean.dumpi)
#else
if(dump_img_)
#endif
{
cv::Mat mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr());
send_image(param, &img->info, mat, nullptr, 0, false);
}
#ifdef USE_THREAD_PAGED_DATA
if(param->mean.rebld)
{
param->mean.rebld->do_rebuild(&img->info, img->data->ptr(), in);
utils::to_log(LOG_LEVEL_ALL, "Thread %d Rebuild paper %d spend %llu milliseconds.\n", param->mean.ind + 1, img->info.pos.paper_ind, watch.elapse_ms());
utils::to_log(LOG_LEVEL_ALL, "Thread %d Rebuild paper %d spend %llu milliseconds.\n", thrd_sn + 1, img->info.pos.paper_ind, watch.elapse_ms());
param->mean.dump(param, &in, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param);
}
#else
if(do_rebuild_)
{
first_->do_rebuild(&img->info, img->data->ptr(), in);
utils::to_log(LOG_LEVEL_ALL, "Thread %d Rebuild paper %d spend %llu milliseconds.\n", thrd_sn + 1, img->info.pos.paper_ind, watch.elapse_ms());
dump(nullptr, &in, nullptr, nullptr, nullptr, 0, false, this);
}
#endif
else
{
PROCIMGINFO i;
@ -299,6 +380,7 @@ void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param)
img->data->release(); // page fault
#endif
#ifdef USE_THREAD_PAGED_DATA
for(auto& v: param->mean.processors)
{
if(!v)
@ -315,6 +397,20 @@ void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param)
param->mean.dump(param, src, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param);
}
}
#else
for(auto& v: processors_)
{
if(v->is_enable())
{
process(v, src, dst);
src->clear();
swp = src;
src = dst;
dst = swp;
dump(param, src, nullptr, nullptr, nullptr, 0, false, this);
}
}
#endif
send_image(param, *src, true);
}
@ -331,14 +427,19 @@ void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param)
over.data = nullptr;
#endif
over.img = true;
#ifdef USE_THREAD_PAGED_DATA
for(auto& v: params_)
v->mean.que->save(over, true);
#else
for(int i = 0; i < working_cnt_; ++i)
raw_.save(over, true);
#endif
ptr->set_session_id(session_id_);
while((que = add_busy_worker(0)) > 1)
{
if(wait++ == 0)
utils::to_log(LOG_LEVEL_DEBUG, "Received scan completed (in thread %d) event while processing %u paper(s), wait ...\n", param->mean.ind + 1, que - 1);
utils::to_log(LOG_LEVEL_DEBUG, "Received scan completed (in thread %d) event while processing %u paper(s), wait ...\n", thrd_sn + 1, que - 1);
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
@ -415,9 +516,11 @@ int imgproc_mgr::clear(void)
v->release();
processors_.clear();
#ifdef USE_THREAD_PAGED_DATA
for(auto& v: params_)
delete v;
params_.clear();
#endif
return 0;
}
@ -434,20 +537,23 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img)
#ifdef REBUILD_IN_CIS_THREAD
if(img)
{
LPPAGEDPARAM paged = nullptr;
#ifdef USE_THREAD_PAGED_DATA
paged = params_[0];
#endif
if(dump_img_)
{
cv::Mat mat(info->width, info->height, CV_8UC1, data->ptr());
send_image(params_[0], info, mat, nullptr, 0, false);
send_image(paged, info, mat, nullptr, 0, false);
}
if(do_rebuild_)
{
chronograph watch;
first_->do_rebuild(info, data->ptr(), ri.imgs);
rebuild_cis += watch.elapse_ms();
rebuild_cis += ri.imgs[0].info.prc_time;
scan_count++;
utils::to_log(LOG_LEVEL_ALL, "Rebuild paper %d spend %llu milliseconds.\n", info->pos.paper_ind, watch.elapse_ms());
if(dump_img_)
send_image(params_[0], ri.imgs, false);
send_image(paged, ri.imgs, false);
}
else
{
@ -476,8 +582,11 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img)
#endif
ri.img = img;
// if(!img)
#ifdef USE_THREAD_PAGED_DATA
params_[ind]->mean.que->save(ri, true);
#else
raw_.save(ri, true);
#endif
++put_ind_;
return ret;
@ -486,12 +595,17 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img)
void imgproc_mgr::stop(void)
{
run_ = false;
#ifdef USE_THREAD_PAGED_DATA
for(auto& v: params_)
{
v->mean.run = false;
if(v->mean.que)
v->mean.que->trigger();
}
#else
for(int i = 0; i < working_cnt_; ++i)
raw_.trigger();
#endif
workers_.stop(nullptr);
}

View File

@ -13,6 +13,7 @@
#include <base/data.h>
#include <vector>
#define DUMP_PROTO union _page_thrd_data_4k* obj, std::vector<PROCIMGINFO>* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last, void* param
class device_option;
@ -74,7 +75,11 @@ class imgproc_mgr : public sane_opt_provider
}
void free(void);
}PAGEDPARAM, *LPPAGEDPARAM;
#ifdef USE_THREAD_PAGED_DATA
std::vector<LPPAGEDPARAM> params_;
#else
safe_fifo<RAWIMG> raw_;
#endif
refer_guard<rebuild> first_;
refer_guard<img_encoder> last_;
@ -101,14 +106,17 @@ class imgproc_mgr : public sane_opt_provider
static bool sort_image_packet(image_packet_ptr l, image_packet_ptr r);
static data_source_ptr scan_finished_packet(uint32_t scanid, uint32_t err = 0);
static void process(image_processor* prc, std::vector<PROCIMGINFO>* in, std::vector<PROCIMGINFO>* out);
static void send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info = nullptr, size_t info_l = 0, bool last = true);
#ifdef USE_THREAD_PAGED_DATA
static
#endif
void send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info = nullptr, size_t info_l = 0, bool last = true);
static void real_dump_image(DUMP_PROTO);
static void empty_dump_image(DUMP_PROTO);
void start_workers(int cnt);
uint32_t add_busy_worker(int inc = 1);
void thread_worker(void* param);
void process(RAWIMG* img, LPPAGEDPARAM param);
void process(RAWIMG* img, LPPAGEDPARAM param, int thrd_sn);
void send_image(LPPAGEDPARAM obj, std::vector<PROCIMGINFO>& imgs, bool last);
public:

View File

@ -593,7 +593,6 @@ dyn_mem_ptr async_scanner::handle_scan_start(LPPACK_BASE pack, uint32_t* used, p
scan_id_ = pack->pack_id;
scan_err_ = 0;
reply_start_ = false;
res_mgr_->start();
auto receiver = [this](dyn_mem_ptr data, bool img, LPPACKIMAGE lpinfo) -> void
{
@ -618,7 +617,10 @@ dyn_mem_ptr async_scanner::handle_scan_start(LPPACK_BASE pack, uint32_t* used, p
cis_->close();
}
else
{
system("echo performance > /sys/devices/system/cpu/cpu5/cpufreq/scaling_governor");
res_mgr_->start();
}
BASE_PACKET_REPLY(*((LPPACK_BASE)reply->ptr()), pack->cmd + 1, pack->pack_id, scan_err_);
((LPPACK_BASE)reply->ptr())->payload_len = config.length() + 2;
strcpy(((LPPACK_BASE)reply->ptr())->payload, config.c_str());

View File

@ -2,7 +2,9 @@
resource_mgr::resource_mgr()
{
#ifdef USE_MONITOR_THREAD
thread_.reset(new std::thread(&resource_mgr::thread_monitor, this));
#endif
}
resource_mgr::~resource_mgr()
{
@ -47,6 +49,7 @@ std::string resource_mgr::task_type(_task task)
void resource_mgr::start(void)
{
printf("\tstart resource monitor with mem limit = %llu\n", mem_limit_);
monitor_ = true;
}
void resource_mgr::stop(void)
@ -59,6 +62,11 @@ void resource_mgr::set_memory_limit(uint64_t max_size)
}
bool resource_mgr::is_resource_enable(_task task, bool wait, int to_ms)
{
#ifndef USE_MONITOR_THREAD
if(task == TASK_CAPTURER)
utils::get_memory_usage(nullptr, (uint64_t*)&mem_now_, nullptr);
#endif
if(wait && mem_now_ > mem_limit_)
{
chronograph watch;

View File

@ -8,12 +8,14 @@
#include <base/utils.h>
#include <base/packet.h>
// #define USE_MONITOR_THREAD
class resource_mgr
{
volatile bool run_ = true;
volatile bool monitor_ = false;
uint64_t mem_limit_ = SIZE_GB(2);
uint64_t mem_now_ = 0;
uint64_t mem_limit_ = SIZE_GB(3);
volatile uint64_t mem_now_ = 0;
std::unique_ptr<std::thread> thread_;
void thread_monitor(void);
@ -31,38 +33,3 @@ public:
bool is_resource_enable(_task task, bool wait = false, int to_ms = 0);
};
// watch.reset();
// bool first = true;
// while(scanning_)
// {
// if(res_(TASK_CAPTURER, true, 3))
// {
// if(!first)
// {
// uint64_t now = 0;
// utils::get_memory_usage(nullptr, &now, nullptr);
// utils::to_log(LOG_LEVEL_DEBUG, "Resources OK: %lld\n", now);
// }
// break;
// }
// if(first)
// {
// uint64_t now = 0;
// first = false;
// utils::get_memory_usage(nullptr, &now, nullptr);
// utils::to_log(LOG_LEVEL_WARNING, "Resources have reached their maximum limit: %lld\n", now);
// }
// if(watch.elapse_s() >= 3)
// {
// uint64_t now = 0;
// utils::get_memory_usage(nullptr, &now, nullptr);
// utils::to_log(LOG_LEVEL_FATAL, "Resources is not enough(memory = %lld) for continue scanning, exit scanning now.\n", now);
// auto_scan_ = scanning_ = false;
// err = SCANNER_ERR_INSUFFICIENT_MEMORY;
// break;
// }
// std::this_thread::sleep_for(std::chrono::milliseconds(3));
// }

View File

@ -402,6 +402,7 @@ public:
#define FUNCTION_PROTO_PARAMETERS dyn_mem_ptr, uint32_t*, packet_data_base_ptr*
#define FUNCTION_PROTO_COMMAND_HANDLE dyn_mem_ptr(FUNCTION_PROTO_PARAMETERS)
// #define USE_THREAD_PAGED_DATA
#define REBUILD_IN_CIS_THREAD
#define SENDER_PROTO data_source_ptr ptr, void* param

View File

@ -8,9 +8,9 @@
#include <math.h> // for fabs
#define SIZE_KB(n) ((n) * 1024)
#define SIZE_MB(n) SIZE_KB((n) * 1024)
#define SIZE_GB(n) SIZE_MB((n) * 1024)
#define SIZE_KB(n) ((long)(n) * 1024)
#define SIZE_MB(n) SIZE_KB((long)(n) * 1024)
#define SIZE_GB(n) SIZE_MB((long)(n) * 1024)
#define USEC_2_NS(us) ((long)(us) * 1000)
#define MSEC_2_US(ms) ((long)(ms) * 1000)

View File

@ -60,8 +60,8 @@ add_packagedirs("sdk")
add_defines("BUILD_AS_DEVICE")
add_defines("VER_MAIN=2")
add_defines("VER_FAMILY=200")
add_defines("VER_DATE=20240301")
add_defines("VER_BUILD=20")
add_defines("VER_DATE=20240308")
add_defines("VER_BUILD=8")
target("conf")
set_kind("phony")