[C++14]线程池 铜牌收录

线程池

概述

  • 线程池是一种数据处理形式,将一定数量的线程组合起来,用统一的方式调度,达到线程复用的效果,运行时将任务加入任务队列,线程自动调度执行

为什么使用线程池

  • 重复利用线程,减小创建和销毁线程带来的开销
  • 提高相应速度,任务无需等待线程创建(在线程池内线程不足时等待时间将延长,可用动态线程池解决)
  • 提高线程的可管理性

本文概述

本文实现的是一个最低可在 C++14 使用的线程池,其中,线程池的大小在运行时是固定的,不能动态改变,可能出现几个长时间任务阻塞线程池的情况

流程图

程序

头文件 ThreadPool.h

////////////////////////////////////////////
// 程序:线程池
// 作者:侧云<2047615717@qq.com>
// 编译环境:Visual Studio 2022,最低 C++14
// 编写日期:2023-7-15

#pragma once

#include <future>
#include <vector>
#include <set>

struct Thread_Task
{
	using TimePointT = typename ::std::chrono::steady_clock::time_point;	// 高精度时钟的时间点类型
	using TimeT = typename ::std::chrono::steady_clock::duration;			// 高精度时钟的时间段类型
	using PriorityT = int;													// 优先级类型
	using FuncionT = ::std::function<void()>;								// 任务函数类型

	TimePointT expireTime;													// 超过该时间点为超时
	PriorityT priority = 0;													// 优先级,在本文实现中,越大代表越先
	bool bExecuteOnTimeout = false;											// 为 true 时,超时后仍然执行
	FuncionT func;															// 任务函数
};
constexpr bool operator<(const Thread_Task& task1, const Thread_Task& task2) noexcept { return task1.priority < task2.priority; }
constexpr bool operator>(const Thread_Task& task1, const Thread_Task& task2) noexcept { return task1.priority > task2.priority; }
// 用于比较优先级大小

class ThreadPool
{
	public:
		using MutexT = ::std::mutex;												// 互斥锁
		using ConditionVariableT = ::std::condition_variable;						// 环境变量
		using ThreadT = ::std::thread;												// 线程
		using ThreadPtrT = ThreadT*;												// 线程指针
		using ThreadVectorT = ::std::vector<ThreadPtrT>;							// 线程指针向量
		using TaskT = Thread_Task;													// 任务
		using TaskSetT = ::std::multiset<TaskT, ::std::greater<TaskT>>;				// 任务队列,greater 代表越大越靠前
		using ThreadNumT = ::uint8_t;												// 线程数量的类型,因为线程一般不超过 256 个
		using AtomicThreadNumT = ::std::atomic<ThreadNumT>;							// 原子的线程数量的类型
		using TaskNumT = ::uint32_t;												// 任务数量的类型,因为任务一般不超过 4294967296 个
		using UniqueLockT = ::std::unique_lock<MutexT>;								// 可解锁的锁,用于保护数据和环境变量的等待
		using LockGuardT = ::std::lock_guard<MutexT>;								// 不可解锁的锁,用于保护数据
		template<class _Ret> using PackagedTaskT = ::std::packaged_task<_Ret()>;	// 异步提供程序,使运行与结果分离
		using TimePointT = typename TaskT::TimePointT;								// 时间点类型
		using TimeT = typename TaskT::TimeT;										// 时间段类型
		using PriorityT = typename TaskT::PriorityT;								// 优先度类型
		using FuncionT = typename TaskT::FuncionT;									// 任务函数类型
	public:
		ThreadPool() noexcept {}
		ThreadPool(const ThreadPool&) = delete;										// 不可复制构造
		ThreadPool& operator=(const ThreadPool&) = delete;
		~ThreadPool() noexcept { exit(); }
	protected:																		// 以下为不检查,提供给内部的程序
		
		void init_unchecked(ThreadNumT num) noexcept								// 设置线程数
		{
			m_threadNum = num;
		}
		void start_unchecked() noexcept												// 线程池开始运行
		{
			m_bRunning = true;														// 设置正在运行
			m_bTerminate = false;													// 设置没有退出
			for (ThreadNumT i = 0; i < m_threadNum; ++i)							// 循环将线程加入向量,线程调用任务调度函数
				m_threads.push_back(new ThreadT(&ThreadPool::dispatch_task, this));
		}
		void resume_unchecked() noexcept							// 继续运行线程池
		{
			m_bRunning = true;										// 设置正在运行	
			m_pauseCondition.notify_all();							// 唤醒暂停的线程
		}
		void pause_no_wait_unchecked() noexcept						// 暂停线程池,不等待线程执行完当前任务
		{
			m_bRunning = false;										// 设置不在运行
			m_taskCondition.notify_all();							// 唤醒等待任务的线程,此时线程会进入 m_pauseCondition 的等待
		}
		void pause_unchecked() noexcept								// 暂停线程池,等待线程执行完当前任务
		{
			pause_no_wait_unchecked();
			UniqueLockT lock(m_mutex);
			if (m_runningNum)										// 若有运行中的线程,则等待
				m_waitCondition.wait(lock);
		}
		void clear_unchecked() noexcept								// 清除线程和任务数据,用于 exit
		{
			m_threads.clear();
			m_tasks.clear();
			m_threadNum = 0;
		}
		TaskNumT get_task_num_unchecked() const noexcept			// 获得剩余任务数量
		{
			return (TaskNumT)m_tasks.size();
		}
		bool is_running_unchecked() const noexcept					// 判断线程池是否处于运行状态
		{
			return m_bRunning && !m_bTerminate;
		}
		bool is_tasking_unchecked() const noexcept					// 判断线程池是否有任务没处理
		{
			return m_tasks.size();
		}
		bool is_all_done_unchecked() const noexcept					// 判断线程池是否已经完成所以任务
		{
			return m_tasks.empty();
		}
	public:															// 以下为检查,提供给外部的程序
		
		bool init(ThreadNumT num) noexcept
		{
			LockGuardT lock(m_mutex);								// 保护数据,防止多个线程同时运行
			if (!m_threads.empty())									// 只有线程池为空时可执行
				return false;
			init_unchecked(num);
			return true;
		}
		bool start() noexcept
		{
			LockGuardT lock(m_mutex);
			if (!m_threads.empty())									// 只有线程池为空时可执行
				return false;
			start_unchecked();
			return true;
		}
		void exit_no_wait() noexcept
		{
			m_bTerminate = true;									// 设置已退出
			pause_no_wait_unchecked();								// 原为暂停且不等待任务完成,但此时 m_bTerminate 会使线程结束而不是进入暂停状态
			clear_unchecked();										// 清除数据
		}
		void exit() noexcept
		{
			m_bTerminate = true;
			pause_unchecked();										// 原为暂停且等待任务完成,但此时 m_bTerminate 会使线程结束而不是进入暂停状态
			m_threadNum = 0;
			clear_unchecked();
		}
		bool resume() noexcept
		{
			if (m_bRunning || m_bTerminate)							// 若正在运行或已经退出,则失败
				return false;
			resume_unchecked();
			return true;
		}
		bool pause_no_wait() noexcept
		{
			if (!m_bRunning || m_bTerminate)						// 若不在运行或已经退出,则失败
				return false;
			pause_no_wait_unchecked();
			return true;
		}
		bool pause() noexcept
		{
			if (!m_bRunning || m_bTerminate)
				return false;
			pause_unchecked();
			return true;
		}
		ThreadNumT get_running_thread_num() const noexcept
		{
			return m_runningNum;
		}
		ThreadNumT get_thread_num() const noexcept
		{
			return m_threadNum;
		}
		TaskNumT get_task_num() noexcept
		{
			LockGuardT lock(m_mutex);			// 可能与其他改变 m_tasks 的函数同时运行,要加锁
			return get_task_num_unchecked();
		}
		TaskNumT get_task_num() const noexcept
		{
			return get_task_num_unchecked();
		}
		const MutexT& get_mutex() const noexcept
		{
			return m_mutex;
		}
		const TaskSetT& get_tasks() const noexcept
		{
			return m_tasks;
		}
		const ThreadVectorT& get_threads() const noexcept
		{
			return m_threads;
		}
		// 判断是否退出
		bool is_terminate() const noexcept							
		{
			return m_bTerminate;
		}
		bool is_running() noexcept
		{
			LockGuardT lock(m_mutex);
			return is_running_unchecked();
		}
		bool is_running() const noexcept
		{
			return is_running_unchecked();
		}
		bool is_tasking() noexcept
		{
			LockGuardT lock(m_mutex);
			return is_tasking_unchecked();
		}
		bool is_tasking() const noexcept
		{
			return is_tasking_unchecked();
		}
		bool is_all_done() noexcept
		{
			LockGuardT lock(m_mutex);
			return is_all_done_unchecked();
		}
		bool is_all_done() const noexcept
		{
			return is_all_done_unchecked();
		}
		// 以下为提供给用户派发任务的函数
		template<class _TFunc, class..._TArgs> auto execute(const TimeT& timeoutMs, PriorityT priority, bool bExecuteOnTimeout, _TFunc&& func, _TArgs&&... args) noexcept
		// -> ::std::future<decltype(::std::forward<_TFunc>(func)(::std::forward<_TArgs>(args)...))>
		// 在 C++14 以下,auto 需要在添加以上内容
		{
			using _RetType = decltype(::std::forward<_TFunc>(func)(::std::forward<_TArgs>(args)...));
			// 推导函数的返回值类型
			auto task = ::std::make_shared<PackagedTaskT<_RetType>>(::std::bind(::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...));
			// bind 允许函数与其参数绑定,使用该返回值调用函数无需传参(使用 placeholders 是例外)
			// package_task 允许将函数与结果异步进行
			LockGuardT lock(m_mutex);
			m_tasks.insert({ bExecuteOnTimeout ? TimePointT{} : ::std::chrono::steady_clock::now() + timeoutMs, priority, bExecuteOnTimeout, [task]() { (*task)(); } });
			// 任务队列中插入新任务,其中
			// 超时时间点为 bExecuteOnTimeout ? TimePointT{} : ::std::chrono::steady_clock::now() + timeoutMs,仅在超时后不执行时计算超时时间点
			// 优先级为 priority
			// 超时是否执行为 bExecuteOnTimeout
			// 任务函数为 [task]() { (*task)(); } },一个 lambda 表达式,省去声明函数
			m_taskCondition.notify_one();
			// 唤醒一个线程来执行任务,也有可能都唤醒过了,但这不重要
			return task->get_future();
			// 返回储存结果的 future,任务执行完后即可从 future 中获得返回值
		}
		// 默认在填写超时时间时,超时后不执行
		template<class _TFunc, class..._TArgs> auto execute(const TimeT& timeoutMs, PriorityT priority, _TFunc&& func, _TArgs&&... args) noexcept
		{
			return execute(timeoutMs, priority, false, ::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...);
		}
		// 超时时间变为超时时间点
		template<class _TFunc, class..._TArgs> auto execute(const TimePointT& timeout, PriorityT priority, bool bExecuteOnTimeout, _TFunc&& func, _TArgs&&... args) noexcept
		{
			using _RetType = decltype(::std::forward<_TFunc>(func)(::std::forward<_TArgs>(args)...));
			auto task = ::std::make_shared<PackagedTaskT<_RetType>>(::std::bind(::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...));
			LockGuardT lock(m_mutex);
			m_tasks.insert({ timeout, priority, bExecuteOnTimeout, [task]() { (*task)(); } });
			m_taskCondition.notify_one();
			return task->get_future();
		}
		// 默认在填写超时时间点时,超时后不执行
		template<class _TFunc, class..._TArgs> auto execute(const TimePointT& timeout, PriorityT priority, _TFunc&& func, _TArgs&&... args) noexcept
		{
			return execute(timeout, priority, false, ::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...);
		}
		// 默认在填写优先级时,超时后执行
		template<class _TFunc, class..._TArgs> auto execute(PriorityT priority, _TFunc&& func, _TArgs&&... args) noexcept
		{
			return execute(TimePointT{}, priority, true, ::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...);
		}
		template<class _TDuration = ::std::chrono::milliseconds> bool wait_for_all_done(const _TDuration& time = _TDuration(-1)) noexcept
		// 缺省情况下,认为一直等待
		{
			UniqueLockT lock(m_mutex);
			if (time < _TDuration(0))																		// 若为一直等待
				return m_waitCondition.wait(lock, [this]() { return m_tasks.empty(); }), true;				// 逗号表达式,等待完成后返回 true
			else
				return !(bool)m_waitCondition.wait_for(lock, time, [this]() { return m_tasks.empty(); });	// 等待一段时间
		}
		template<class _TTimePoint = ::std::chrono::steady_clock::time_point> bool wait_until_all_done(const _TTimePoint& time = _TTimePoint {}) noexcept
		// 缺省情况下,认为一直等待
		{
			UniqueLockT lock(m_mutex);
			if (time <= _TTimePoint{})																		// 若为一直等待
				return m_waitCondition.wait(lock, [this]() { return m_tasks.empty(); }), true;
			else
				return !(bool)m_waitCondition.wait_until(lock, time, [this]() { return m_tasks.empty(); });	// 等待至时间点
		}
	protected:
		bool get_task(TaskT& task) noexcept									// 调度函数获取任务的函数
		{
			if (m_bTerminate || !m_bRunning)								// 仅在不暂停、不退出时运行,写在这里可以不用 lock
				return false;
			UniqueLockT lock(m_mutex);
			if (m_tasks.empty())											// 若当前没有任务,则进入等待,在派发任务后随机唤醒等待中的一个线程
				m_taskCondition.wait(lock, [this]() { return m_bTerminate || !m_bRunning || !m_tasks.empty(); });	// 若因退出或暂停或有任务,都暂停等待
			if (m_bTerminate || !m_bRunning)								// 若因退出或暂停,则函数失败
				return false;
			auto ptr = m_tasks.begin();										// 获取任务列表中排列第一的任务
			task = ::std::move(const_cast<TaskT&>(*ptr));					// 将任务移动赋值到 task,其中 multiset 中是 const 类型,使用 const_cast 解除
			m_tasks.erase(ptr);												// 获取后删除
			return true;
		}
		void dispatch_task() noexcept
		{
			while (!m_bTerminate)													// 仅在不退出时运行,设置退出后会退出循环,既销毁线程
			{
				if (!m_bRunning)													// 如果暂停,则进入等待,此时派发任务不会唤醒
				{
					UniqueLockT lock(m_mutex);
					m_pauseCondition.wait(lock);
				}
				TaskT task;
				bool got = get_task(task);											// 获取任务
				if (got)															// 若成功,则执行任务
				{
					++m_runningNum;													// 增加正在运行的线程数
					TimePointT expireTime = ::std::chrono::steady_clock::now();
					if (task.bExecuteOnTimeout || expireTime <= task.expireTime)
						task.func();
					--m_runningNum;
					if (m_runningNum == 0 && m_tasks.empty() || !m_bRunning)		// 若任务已经空,或暂停运行,则唤醒
						m_waitCondition.notify_all();
				}
			}
		}
	protected:
		MutexT m_mutex;								// 互斥锁,用于防止多线程同时运行导致数据出错
		ConditionVariableT m_taskCondition;			// 用于任务的环境变量,在派发任务时唤醒一个
		ConditionVariableT m_waitCondition;			// 用于等待的环境变量,在任务完成时唤醒所有
		ConditionVariableT m_pauseCondition;			// 用于暂停的环境变量,在设置运行时唤醒所有
		ThreadVectorT m_threads;					// 储存线程的向量
		TaskSetT m_tasks;							// 储存任务的队列
		AtomicThreadNumT m_runningNum = 0;			// 用于记录正在运行的线程数量的原子变量
		ThreadNumT m_threadNum = 0;					// 用于记录线程总数的变量
		bool m_bTerminate = false;					// 用于记录是否退出的变量
		bool m_bRunning = false;					// 用于记录是否运行的变量
};

测试程序

#include "ThreadPool.h"
#include <iostream>

using namespace std;

void func1(int i)
{
	printf("func1 %d\n", i);
}

int func2(int i)
{
	return i * i;
}

void func3()
{
	this_thread::sleep_for(1ms);			// 代替运行的时间
	printf("func3 ok\n");
}

int main()
{
	ThreadPool pool;
	pool.init(1);							// 设置线程数量,设置单线程,体现优先级
	for (int i = 0; i < 5; ++i)
		pool.execute(i, func1, i);
	pool.start();							// 开始线程池,若在加入任务前开启,则可能先执行了先加入的任务
	pool.wait_for_all_done();				// 等待
	printf("\n");

	auto res = pool.execute(0, func2, 3);	// 体现异步程序
	pool.wait_until_all_done();				// 等待异步程序执行完毕,此句可省略,res.get() 会自动等待
	printf("result is %d\n\n", res.get());	// 获取结果
	pool.exit();							// 退出,为了重设线程数

	// 以下程序有随机性,提供可能的结果
	pool.init(3);							// 设置线程数量,设置 3 线程,体现并发
	pool.start();
	for (int i = 0; i < 10; ++i)
		pool.execute(0, func3);
	this_thread::sleep_for(2ms);			// 等待两毫秒
	pool.pause();							// 暂停
	printf("pause here\n");	
	pool.resume();							// 继续运行
	pool.wait_for_all_done();				// 等待运行完毕后退出
	pool.exit();

	(void)getchar();
	return 0;
};

可能的结果

func1 4
func1 3
func1 2
func1 1
func1 0

result is 9

func3 ok
func3 ok
func3 ok
func3 ok
pause here
func3 ok
func3 ok
func3 ok
func3 ok
func3 ok
func3 ok

添加评论