目录
1、概述
2、Thread
2.1 Thread.h
3、EventLoopThread
3.1 EventLoopThread.h
3.2 EventLoopThread.cc
4、 EventLoopThreadPool
4.1 EventLoopThreadPool.h
4.2 EventLoopThreadPool.cc
1、概述
管理事件循环线程的调度的
打包了一个EventLoop和线程,绑定了一个loop跟thread,让这个loop运行在这个thread上,在这个thread里面创建一个loop(one loop per thread)
底层封装的线程
2、Thread
2.1 Thread.h
#pragma once
#include "noncopyable.h"
#include <functional>
#include <thread>
#include <memory>
#include <string>
#include <atomic>
class Thread:noncopyable
{
public:
using ThreadFunc=std::function<void()>;//线程函数的函数类型 绑定器和函数对象,就可以传参
explicit Thread(ThreadFunc,const std::string& name=std::string());//构造函数
~Thread();
void start();//启动当前线程
void join();//当前线程等待其他线程完了再运行下去
bool started()const {return started_;}
pid_t tid()const{return tid_;}
const std::string& name()const{return name_;}
static int numCreated(){return numCreated_;}
private:
void setDefaultName();//给线程设置默认的名称
bool started_;//启动当前线程
bool joined_;//当前线程等待其他线程完了再运行下去
std::shared_ptr<std::thread> thread_;//自己来掌控线程对象产生的时机
pid_t tid_;
ThreadFunc func_;//存储线程函数
std::string name_;//调试的时候打印
static std::atomic_int numCreated_;//对线程数量计数
};
我们使用C++结合lambda表达式 的方法来实现,非常方便。
3、EventLoopThread
3.1 EventLoopThread.h
#pragma once
#include "noncopyable.h"
#include "Thread.h"
#include <functional>
#include <mutex>
#include <condition_variable>
#include <string>
class EventLoop;
class EventLoopThread:noncopyable
{
public:
using ThreadInitCallback=std::function<void(EventLoop*)>;
EventLoopThread(const ThreadInitCallback& cb=ThreadInitCallback(), //线程初始化的回调
const std::string& name=std::string());
~EventLoopThread();
EventLoop* startLoop();//开启循环
private:
void threadFunc();//线程函数,创建loop
EventLoop* loop_;
bool exiting_;//是否退出循环
Thread thread_;
std::mutex mutex_;
std::condition_variable cond_;
ThreadInitCallback callback_;//初始化操作
};
3.2 EventLoopThread.cc
#include "EventLoopThread.h"
#include "EventLoop.h"
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb, //线程初始化的回调
const std::string& name)
:loop_(nullptr)
,exiting_(false)
,thread_(std::bind(&EventLoopThread::threadFunc,this),name)//绑定回调函数
,mutex_()
,cond_()
,callback_(cb)
{
}
EventLoopThread::~EventLoopThread()
{
exiting_=true;
if(loop_!=nullptr)
{
loop_->quit();
thread_.join();
}
}
EventLoop* EventLoopThread::startLoop()//开启循环
{
thread_.start();//启动底层的新线程
//启动后执行的是EventLoopThread::threadFunc
EventLoop* loop=nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
while(loop_==nullptr)//loop指针还没有初始化
{
cond_.wait(lock);挂起,等待
}
loop=loop_;
}
return loop;
}
//下面这个方法,是在单独的新线程里面运行的
void EventLoopThread::threadFunc()//线程函数,创建loop
{
EventLoop loop;//创建一个独立的eventloop,和上面的线程是一一对应的,one loop per thread
if(callback_)//如果有回调
{
callback_(&loop);//绑定loop做一些事情
}
{
std::unique_lock<std::mutex> lock(mutex_);
loop_=&loop;//就是运行在这个线程的loop对象,将这个对象初始化好之后(loop指针指向loop对象),才能唤醒(通知)
cond_.notify_one();//唤醒1个线程,被唤醒后去访问loop指针
}
loop.loop();//EventLoop loop=>Poller.poll
std::unique_lock<std::mutex> lock(mutex_);
loop_=nullptr;
}
4、 EventLoopThreadPool
这个很明显,是池的概念。是一个事件线程池,管理eventloop,eventloop绑定的就是一个线程。
用户最开始创建的loop,对应的是一个线程
4.1 EventLoopThreadPool.h
#pragma once
#include "noncopyable.h"
#include <functional>
#include <string>
#include <vector>
#include <memory>
class EventLoop;
class EventLoopThread;
class EventLoopThreadPool:noncopyable
{
public:
using ThreadInitCallback=std::function<void(EventLoop*)>;
EventLoopThreadPool(EventLoop* baseLoop,const std::string& nameArg);
~EventLoopThreadPool();
void setThreadNum(int numThreads){numThreads_=numThreads;}
void start(const ThreadInitCallback& cb=ThreadInitCallback());
//如果工作在多线程中,baseloop_默认以轮询的方式分配channel给subloop
EventLoop* getNextLoop();
std::vector<EventLoop*> getAllLoops();
bool started()const{return started_;}
const std::string name() const{return name_;}
private:
EventLoop* baseLoop_;//EventLoop loop;
std::string name_;
bool started_;
int numThreads_;
int next_;
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_;
};
4.2 EventLoopThreadPool.cc
#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"
#include <memory>
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop,const std::string& nameArg)
:baseLoop_(baseLoop)
,name_(nameArg)
,started_(false)
,numThreads_(0)
,next_(0)
{}
EventLoopThreadPool::~EventLoopThreadPool()
{}
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
started_=true;
for(int i=0;i<numThreads_;i++)
{
char buf[name_.size()+32];
snprintf(buf,sizeof buf,"%s%d",name_.c_str(),i);
EventLoopThread* t=new EventLoopThread(cb,buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
loops_.push_back(t->startLoop());//底层创建线程,绑定一个新的EventLoop,并返回该loop的地址
}
//整个服务端只有一个线程,运行着baseloop
if(numThreads_==0&&cb)
{
cb(baseLoop_);
}
}
//如果工作在多线程中,baseloop_默认以轮询的方式分配channel给subloop
//通过轮询的方式从子线程中取loop(循环)
//IO线程 baseloop 用作处理用户的连接事件
//工作线程 新创建的loop 用于处理用户的读写事件
EventLoop* EventLoopThreadPool::getNextLoop()
{
EventLoop* loop=baseLoop_;
if(!loops_.empty())//通过轮询获取下一个处理事件的loop
{
loop=loops_[next_];
next_++;
if(next_>=loops_.size())
{
next_=0;
}
}
return loop;
}
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
if(loops_.empty())
{
return std::vector<EventLoop*>(1,baseLoop_);
}
else
{
loops_;
}
}