diff --git a/dlib/all/source.cpp b/dlib/all/source.cpp index 9e53db416..e997c81b4 100644 --- a/dlib/all/source.cpp +++ b/dlib/all/source.cpp @@ -41,6 +41,7 @@ #include "../threads/threads_kernel_1.cpp" #include "../threads/threads_kernel_2.cpp" #include "../threads/threads_kernel_shared.cpp" +#include "../threads/thread_pool_extension.cpp" #include "../timer/timer_kernel_2.cpp" #include "../stack_trace.cpp" diff --git a/dlib/threads.h b/dlib/threads.h index e91cb06cb..319c758ff 100644 --- a/dlib/threads.h +++ b/dlib/threads.h @@ -14,6 +14,7 @@ #include "threads/threaded_object_extension.h" #include "threads/thread_specific_data_extension.h" #include "threads/thread_function_extension.h" +#include "threads/thread_pool_extension.h" #endif // DLIB_THREADs_ diff --git a/dlib/threads/thread_pool_extension.cpp b/dlib/threads/thread_pool_extension.cpp new file mode 100644 index 000000000..0df2c8888 --- /dev/null +++ b/dlib/threads/thread_pool_extension.cpp @@ -0,0 +1,221 @@ +// Copyright (C) 2008 Davis E. King (davisking@users.sourceforge.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREAD_POOl_CPP__ +#define DLIB_THREAD_POOl_CPP__ + +#include "thread_pool_extension.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + thread_pool:: + thread_pool ( + unsigned long num_threads + ) : + task_done_signaler(m), + task_ready_signaler(m), + we_are_destructing(false) + { + tasks.expand(num_threads); + for (unsigned long i = 0; i < num_threads; ++i) + { + register_thread(*this, &thread_pool::thread); + } + + start(); + } + +// ---------------------------------------------------------------------------------------- + + thread_pool:: + ~thread_pool() + { + {auto_mutex M(m); + we_are_destructing = true; + task_ready_signaler.broadcast(); + } + + wait(); + } + +// ---------------------------------------------------------------------------------------- + + unsigned long thread_pool:: + num_threads_in_pool ( + ) const + { + return tasks.size(); + } + +// ---------------------------------------------------------------------------------------- + + void thread_pool:: + wait_for_task ( + uint64 task_id + ) const + { + auto_mutex M(m); + const unsigned long idx = task_id_to_index(task_id); + while (tasks[idx].task_id == task_id) + task_done_signaler.wait(); + } + +// ---------------------------------------------------------------------------------------- + + void thread_pool:: + wait_for_all_tasks ( + ) const + { + const thread_id_type thread_id = get_thread_id(); + + auto_mutex M(m); + bool found_task = true; + while (found_task) + { + found_task = false; + for (unsigned long i = 0; i < tasks.size(); ++i) + { + // If task bucket i has a task that is currently supposed to be processed + // and it originated from the calling thread + if (tasks[i].is_empty() == false && tasks[i].thread_id == thread_id) + { + found_task = true; + break; + } + } + + if (found_task) + task_done_signaler.wait(); + } + } + +// ---------------------------------------------------------------------------------------- + + bool thread_pool:: + is_worker_thread ( + const thread_id_type id + ) const + { + for (unsigned long i = 0; i < worker_thread_ids.size(); ++i) + { + if (worker_thread_ids[i] == id) + return true; + } + return false; + } + +// ---------------------------------------------------------------------------------------- + + void thread_pool:: + thread ( + ) + { + { + // save the id of this worker thread into worker_thread_ids + auto_mutex M(m); + thread_id_type id = get_thread_id(); + worker_thread_ids.push_back(id); + } + + task_state_type task; + while (we_are_destructing == false) + { + long idx = 0; + + // wait for a task to do + { auto_mutex M(m); + while ( (idx = find_ready_task()) == -1 && we_are_destructing == false) + task_ready_signaler.wait(); + + if (we_are_destructing) + break; + + tasks[idx].is_being_processed = true; + task = tasks[idx]; + } + + // now do the task + if (task.mfp0) + task.mfp0(); + else if (task.mfp1) + task.mfp1(task.arg1); + else if (task.mfp2) + task.mfp2(task.arg1, task.arg2); + + // Now let others know that we finished the task. We do this + // by clearing out the state of this task + { auto_mutex M(m); + tasks[idx].is_being_processed = false; + tasks[idx].task_id = 0; + tasks[idx].mfp0.clear(); + tasks[idx].mfp1.clear(); + tasks[idx].mfp2.clear(); + tasks[idx].arg1 = 0; + tasks[idx].arg2 = 0; + task_done_signaler.broadcast(); + } + + } + } + +// ---------------------------------------------------------------------------------------- + + long thread_pool:: + find_empty_task_slot ( + ) const + { + for (unsigned long i = 0; i < tasks.size(); ++i) + { + if (tasks[i].is_empty()) + return i; + } + + return -1; + } + +// ---------------------------------------------------------------------------------------- + + long thread_pool:: + find_ready_task ( + ) const + { + for (unsigned long i = 0; i < tasks.size(); ++i) + { + if (tasks[i].is_ready()) + return i; + } + + return -1; + } + +// ---------------------------------------------------------------------------------------- + + uint64 thread_pool:: + make_next_task_id ( + long idx + ) + { + uint64 id = tasks[idx].next_task_id * tasks.size() + idx; + tasks[idx].next_task_id += 1; + return id; + } + +// ---------------------------------------------------------------------------------------- + + unsigned long thread_pool:: + task_id_to_index ( + uint64 id + ) const + { + return id%tasks.size(); + } + +// ---------------------------------------------------------------------------------------- + +} + + +#endif // DLIB_THREAD_POOl_CPP__ + diff --git a/dlib/threads/thread_pool_extension.h b/dlib/threads/thread_pool_extension.h new file mode 100644 index 000000000..2669faa2c --- /dev/null +++ b/dlib/threads/thread_pool_extension.h @@ -0,0 +1,319 @@ +// Copyright (C) 2008 Davis E. King (davisking@users.sourceforge.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREAD_POOl_H__ +#define DLIB_THREAD_POOl_H__ + +#include "thread_pool_extension_abstract.h" +#include "dlib/member_function_pointer.h" +#include "threads_kernel.h" +#include "auto_mutex_extension.h" +#include "multithreaded_object_extension.h" +#include "../uintn.h" +#include "dlib/array.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class thread_pool : private multithreaded_object + { + /*! + CONVENTION + - num_threads_in_pool() == tasks.size() + - if (the destructor has been called) then + - we_are_destructing == true + - else + - we_are_destructing == false + + - m == the mutex used to protect everything in this object + - worker_thread_ids == an array that contains the thread ids for + all the threads in the thread pool + !*/ + + public: + explicit thread_pool ( + unsigned long num_threads + ); + + ~thread_pool( + ); + + void wait_for_task ( + uint64 task_id + ) const; + + unsigned long num_threads_in_pool ( + ) const; + + void wait_for_all_tasks ( + ) const; + + template + uint64 add_task ( + T& obj, + void (T::*funct)() + ) + { + auto_mutex M(m); + const thread_id_type my_thread_id = get_thread_id(); + + // find a thread that isn't doing anything + long idx = find_empty_task_slot(); + if (idx == -1 && is_worker_thread(my_thread_id)) + { + // this function is being called from within a worker thread and there + // aren't any other worker threads free so just perform the task right + // here + + m.unlock(); + (obj.*funct)(); + + // return a task id that is both non-zero and also one + // that is never normally returned. This way calls + // to wait_for_task() will never block given this id. + return 1; + } + + // wait until there is a thread that isn't doing anything + while (idx == -1) + { + task_done_signaler.wait(); + idx = find_empty_task_slot(); + } + + tasks[idx].thread_id = my_thread_id; + tasks[idx].task_id = make_next_task_id(idx); + tasks[idx].mfp0.set(obj,funct); + + task_ready_signaler.signal(); + + return tasks[idx].task_id; + } + + template + uint64 add_task ( + T& obj, + void (T::*funct)(long), + long arg1 + ) + { + auto_mutex M(m); + const thread_id_type my_thread_id = get_thread_id(); + + // find a thread that isn't doing anything + long idx = find_empty_task_slot(); + if (idx == -1 && is_worker_thread(my_thread_id)) + { + // this function is being called from within a worker thread and there + // aren't any other worker threads free so just perform the task right + // here + + m.unlock(); + (obj.*funct)(arg1); + + // return a task id that is both non-zero and also one + // that is never normally returned. This way calls + // to wait_for_task() will never block given this id. + return 1; + } + + // wait until there is a thread that isn't doing anything + while (idx == -1) + { + task_done_signaler.wait(); + idx = find_empty_task_slot(); + } + + tasks[idx].thread_id = my_thread_id; + tasks[idx].task_id = make_next_task_id(idx); + tasks[idx].mfp1.set(obj,funct); + tasks[idx].arg1 = arg1; + + task_ready_signaler.signal(); + + return tasks[idx].task_id; + } + + template + uint64 add_task ( + T& obj, + void (T::*funct)(long,long), + long arg1, + long arg2 + ) + { + auto_mutex M(m); + const thread_id_type my_thread_id = get_thread_id(); + + // find a thread that isn't doing anything + long idx = find_empty_task_slot(); + if (idx == -1 && is_worker_thread(my_thread_id)) + { + // this function is being called from within a worker thread and there + // aren't any other worker threads free so just perform the task right + // here + + m.unlock(); + (obj.*funct)(arg1, arg2); + + // return a task id that is both non-zero and also one + // that is never normally returned. This way calls + // to wait_for_task() will never block given this id. + return 1; + } + + // wait until there is a thread that isn't doing anything + while (idx == -1) + { + task_done_signaler.wait(); + idx = find_empty_task_slot(); + } + + tasks[idx].thread_id = my_thread_id; + tasks[idx].task_id = make_next_task_id(idx); + tasks[idx].mfp2.set(obj,funct); + tasks[idx].arg1 = arg1; + tasks[idx].arg2 = arg2; + + task_ready_signaler.signal(); + + return tasks[idx].task_id; + } + + private: + + bool is_worker_thread ( + const thread_id_type id + ) const; + /*! + requires + - m is locked + ensures + - if (thread with given id is one of the thread pool's worker threads) then + - returns true + - else + - returns false + !*/ + + void thread ( + ); + /*! + this is the function that executes the threads in the thread pool + !*/ + + long find_empty_task_slot ( + ) const; + /*! + requires + - m is locked + ensures + - if (there is currently a empty task slot) then + - returns the index of that task slot in tasks + - there is a task slot + - else + - returns -1 + !*/ + + long find_ready_task ( + ) const; + /*! + requires + - m is locked + ensures + - if (there is currently a task to do) then + - returns the index of that task in tasks + - else + - returns -1 + !*/ + + uint64 make_next_task_id ( + long idx + ); + /*! + requires + - m is locked + - 0 <= idx < tasks.size() + ensures + - returns the next index to be used for tasks that are placed in + tasks[idx] + !*/ + + unsigned long task_id_to_index ( + uint64 id + ) const; + /*! + requires + - m is locked + ensures + - returns the index in tasks corresponding to the given id + !*/ + + struct task_state_type + { + task_state_type() : is_being_processed(false), task_id(0), next_task_id(2), arg1(0), arg2(0) {} + + bool is_ready () const + /*! + ensures + - if (is_empty() == false && no thread is currently processing this task) then + - returns true + - else + - returns false + !*/ + { + return !is_being_processed && !is_empty(); + } + + bool is_empty () const + /*! + ensures + - if (this task state is empty. i.e. it doesn't contain a task to be processed) then + - returns true + - else + - returns false + !*/ + { + return task_id == 0; + } + + bool is_being_processed; // true when a thread is working on this task + uint64 task_id; // the id of this task. 0 means this task is empty + thread_id_type thread_id; // the id of the thread that requested this task + + uint64 next_task_id; + + long arg1; + long arg2; + + member_function_pointer<>::kernel_1a_c mfp0; + member_function_pointer::kernel_1a_c mfp1; + member_function_pointer::kernel_1a_c mfp2; + + }; + + array::expand_1c_c tasks; + array::expand_1c_c worker_thread_ids; + + mutex m; + signaler task_done_signaler; + signaler task_ready_signaler; + bool we_are_destructing; + + // restricted functions + thread_pool(thread_pool&); // copy constructor + thread_pool& operator=(thread_pool&); // assignment operator + + }; + +} + +// ---------------------------------------------------------------------------------------- + +#ifdef NO_MAKEFILE +#include "thread_pool_extension.cpp" +#endif + +#endif // DLIB_THREAD_POOl_H__ + + diff --git a/dlib/threads/thread_pool_extension_abstract.h b/dlib/threads/thread_pool_extension_abstract.h new file mode 100644 index 000000000..4fc2bbb13 --- /dev/null +++ b/dlib/threads/thread_pool_extension_abstract.h @@ -0,0 +1,151 @@ +// Copyright (C) 2008 Davis E. King (davisking@users.sourceforge.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_THREAD_POOl_ABSTRACT_H__ +#ifdef DLIB_THREAD_POOl_ABSTRACT_H__ + +#include "threads_kernel_abstract.h" +#include "../uintn.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class thread_pool + { + /*! + WHAT THIS OBJECT REPRESENTS + This object represents a group of threads which you can + submit tasks to and then wait for those tasks to be completed. + !*/ + + public: + explicit thread_pool ( + unsigned long num_threads + ); + /*! + ensures + - num_threads_in_pool() == num_threads + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + ~thread_pool( + ); + /*! + ensures + - all resources allocated by *this have been freed. + !*/ + + unsigned long num_threads_in_pool ( + ) const; + /*! + ensures + - returns the number of threads contained in this thread pool. That is, returns + the maximum number of tasks that this object will process concurrently. + !*/ + + template + uint64 add_task ( + T& obj, + void (T::*funct)() + ); + /*! + requires + - funct == a valid member function pointer for class T + ensures + - if (the thread calling this function is actually one of the threads in the + thread pool and there aren't any free threads available) then + - calls (obj.*funct)() within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.funct)() + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template + uint64 add_task ( + T& obj, + void (T::*funct)(long), + long arg1 + ); + /*! + requires + - funct == a valid member function pointer for class T + ensures + - if (the thread calling this function is actually one of the threads in the + thread pool and there aren't any free threads available) then + - calls (obj.*funct)(arg1) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.funct)(arg1) + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template + uint64 add_task ( + T& obj, + void (T::*funct)(long,long), + long arg1, + long arg2 + ); + /*! + requires + - funct == a valid member function pointer for class T + ensures + - if (the thread calling this function is actually one of the threads in the + thread pool and there aren't any free threads available) then + - calls (obj.*funct)(arg1,arg2) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.funct)(arg1,arg2) + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + void wait_for_task ( + uint64 task_id + ) const; + /*! + ensures + - if (there is currently a task with the given id being executed in the thread pool) then + - the call to this function blocks until the task with the given id is complete + - else + - the call to this function returns immediately + !*/ + + void wait_for_all_tasks ( + ) const; + /*! + ensures + - the call to this function blocks until all tasks which were submitted + to the thread pool by the thread that is calling this function have + finished. + !*/ + + private: + + // restricted functions + thread_pool(thread_pool&); // copy constructor + thread_pool& operator=(thread_pool&); // assignment operator + }; + +} + +// ---------------------------------------------------------------------------------------- + +#endif // DLIB_THREAD_POOl_ABSTRACT_H__ + + +