// thread.h : the base utility for thread pool management // // Author: Gongbing // // Create: 2017-05-20 #pragma once #ifndef _INCLUDED_REF_ #define _INCLUDED_REF_ #include "../ref/ref.h" #endif #pragma warning(disable: 4996) //////////////////////////////////////////////////////////////////////////////////////////// // thread ... // // purpose: if you CreateThrad and TerminateThread then, the new thread maybe occupying the global // RTL_CRITICAL_SECTION variable in ntdll.dll module. In this situation, all threads that // created by CreateThread will no longer have opportunity to run!!! // This class is providing you to TerminateThread safely namespace thread_util { enum _io_direction { IO_WRITE = 1, IO_READ, IO_CONTROL, IO_USER_TYPE_BASE = 10, }; #pragma pack(push) #pragma pack(1) typedef struct _ovl_extension : OVERLAPPED { int io_direction; // _io_direction size_t buf_len; // bytes of payload buffer size_t data_len; // valid bytes of payload size_t io_pos; // current IO position char payload[8]; // payload data buffer char* user_payload(void) { return payload; } }OVLEX, *LPOVLEX; #pragma pack(pop) __declspec(novtable) struct ILock : public ref_util::IRef { COM_API_DECLARE(void, lock(void)); COM_API_DECLARE(bool, try_lock(void)); COM_API_DECLARE(void, unlock(void)); }; __declspec(novtable) struct IThreadWorker : public ref_util::IRef { COM_API_DECLARE(void*/*void* tls*/, thread_enter(void)); // you can initialize tls in this method COM_API_DECLARE(void, thread_work(void*/*bind data*/ task_key, LPOVLEX task_data, unsigned data_size, DWORD err_code, void* enter_data)); COM_API_DECLARE(void, thread_exit(void* enter_data/*tls*/)); }; __declspec(novtable) struct IThreadPool : public ref_util::IRef { COM_API_DECLARE(long, add_task(void* task_key, void* task_data, unsigned data_size)); // return 0 when success COM_API_DECLARE(long, stop(void)); COM_API_DECLARE(long, worker_threads(void)); COM_API_DECLARE(long, busy_threads(void)); COM_API_DECLARE(long, waiting_task(void)); }; __declspec(novtable) struct IEventDispatchThreadPool : public IThreadPool // dispach thread pool through by hardware-event mechanism { COM_API_DECLARE(long, bind_object(void* object, void* bind_key)); COM_API_DECLARE(long, remove_object(void* object, void* bind_key)); }; class simple_lock { ILock *lck_; public: simple_lock(ILock* lck) { lck_ = lck; if (lck_) { //lck_->add_ref(); lck_->lock(); } } ~simple_lock() { if (lck_) { lck_->unlock(); //lck_->release(); } } }; // for reducing the times of memory copying, request memory from inner - added on 2019-07-28 PORT_API(LPOVLEX) inner_buffer_to_OVLEX_head(void* buf/*'buf' is returned by request_inner_buffer*/); PORT_API(char*) request_inner_buffer(size_t bytes); PORT_API(void) free_inner_buffer(void* buf); // free the buffer returned by 'request_inner_buffer' PORT_API(void) free_inner_buffer(LPOVLEX buf); // free the buffer returned by 'request_inner_buffer' PORT_API(HANDLE) create_thread(unsigned(__stdcall* thread)(void*), void* param, bool suspend = false, unsigned allow_cpu_mask = -1); // return thread id PORT_API(ILock*) create_lock(void); PORT_API(IThreadPool*) create_thread_pool(IThreadWorker* worker, int desired_threads = 0, unsigned cpu_allow_mask = -1); PORT_API(IEventDispatchThreadPool*) create_event_thread_pool(IThreadWorker* worker, int desired_threads = 0, unsigned cpu_allow_mask = -1); // parameter of result: // data: (wchar_t*)thread name, maybe null // len: thread id // total: start address PORT_API(void) enum_threads(DWORD proc_id, inter_module_data::set_data result, void* param); PORT_API(LPVOID) get_thread_start_address(DWORD thread_id); PORT_API(DWORD) get_call_stack(DWORD ebp, DWORD *stack/*caller allocate the buffer*/, int depth = 6, void* ensure_module = NULL/*ensure the module which contains the address ensure_module in the stack, omit if stack_data was valid*/ , unsigned char* stack_data = NULL, int stack_bytes = 0); // return depth in stack PORT_API(DWORD) get_thread_call_stack(DWORD thread_id, DWORD *stack/*caller allocate the buffer*/, int depth = 6, void* ensure_module = NULL/*ensure the module which contains the address ensure_module in the stack*/); // return depth in stack PORT_API(void) set_thread_name(DWORD thread_id, const char* thread_name); // the parameter 'allow_cpu_mask' is is a bit vector in which each bit represents the processors that a thread is allowed to run on // 1 represent CPU0, 5 represent CPU0 and CPU2, and so on... enum _ict_state { ICT_STATE_NONE = 0, ICT_STATE_START_FAIL, ICT_STATE_RUNNING, ICT_STATE_COMPLETE, }; template // NOTE: class T must derived from ref_util::IRef !!! class invoke_class_thread : public ref_util::refer { void(__stdcall T::* handle_func)(void*); void *param_; // dynamic parameter should free in handle_func !!! T *obj_; HANDLE wait_; HANDLE thread_; unsigned thread_id_; unsigned affinity_mask_; _ict_state state_; unsigned start_tick_; // elapse milliseconds if state_ == ICT_STATE_COMPLETE, start time tick counts in others public: invoke_class_thread(T* obj, void(__stdcall T::* thread_func)(void*), void* param, unsigned allow_cpu_mask = -1) : state_(ICT_STATE_NONE), start_tick_(0) { if (obj) obj->add_ref(); obj_ = obj; param_ = param; handle_func = thread_func; wait_ = CreateEvent(NULL, TRUE, FALSE, NULL); thread_ = thread_util::create_thread(invoke_thread, this, false, allow_cpu_mask); if (thread_ == NULL) state_ = ICT_STATE_START_FAIL; else { thread_id_ = GetThreadId(thread_); WaitForSingleObject(wait_, INFINITE); } CloseHandle(wait_); wait_ = NULL; } protected: ~invoke_class_thread() { if (thread_) CloseHandle(thread_); if (obj_) obj_->release(); } private: static unsigned WINAPI invoke_thread(void* lp) { invoke_class_thread *obj = (invoke_class_thread*)lp; obj->add_ref(); obj->state_ = ICT_STATE_RUNNING; obj->start_tick_ = GetTickCount(); SetEvent(obj->wait_); (obj->obj_->*obj->handle_func)(obj->param_); obj->state_ = ICT_STATE_COMPLETE; obj->start_tick_ = GetTickCount() - obj->start_tick_; obj->release(); return 0; } public: unsigned thread_id(void) { return thread_id_; } HANDLE thread(bool take = false) { HANDLE h = thread_; if (take) thread_ = NULL; return h; } enum _ict_state thread_state(void) { return state_; } unsigned start_tick_count(bool elapse = false) { return elapse && state_ != ICT_STATE_COMPLETE ? GetTickCount() - start_tick_ : start_tick_; } int terminate(int reason = 0) { HANDLE h = OpenThread(THREAD_ALL_ACCESS, FALSE, thread_id_); int err = GetLastError(); if (h) { err = TerminateThread(h, reason) ? ERROR_SUCCESS : GetLastError(); CloseHandle(h); } return err; } }; }