文章目录
- muduo网络库的核心代码模块
- 各模块功能解释
- Channel
- Poller
- EpollPoller
- EventLoop
- EventLoopThread
- EventLoopThreadPool
- TcpServer
- TcpConnection
- 从实际应用出发
muduo网络库的核心代码模块
1、channel
2、Poller 和它的子类 EpollPoller
3、EventLoop
4、Thread、EventLoopThread、EventLoopThreadPool
5、Sock、Acceptor
6、Buffer
7、TcpServer、TCPConnection
至于其他还有Logger模块,就不是重点了吧。
各模块功能解释
经过我三天的研究,以及之前的源码铺垫,整理出来了第一个版本,当然后面会持续更新,预计更新到国庆节回来,那个版本应该是能看了。
Channel
根据收到的事件,调用相应的回调。
一个channel绑定一个fd
生命周期:新连接产生 -> 该连接断开。
由poller管理,从属于loop,可配置epoll监听事件。
Poller
muduo中多路事件分发器的核心模块,包含了一个 channel 数组,同时也是一个抽象基类(我只继承了epoll模块),
可以说:One loop per poller.
EpollPoller
实现了:
epoll_create:构造函数
epoll_ctl:一堆的 enable 函数
epoll_wait:poll方法。
通过epoll_wait,将有事件的channel通过参数传递给EventLoop。
此处参数:events[i].data.ptr。(经验呐!!!我觉得有这么一点,这篇就亮了!!!还不止呢。)
void EpollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {//for(Channel* channel:activeChannels){//这样剧让不行了!!!//eventloop 即将拿到它的poller返回的所有发生事件列表for (int i = 0; i < numEvents; ++i) {Channel* channel = static_cast<Channel*>(events_[i].data.ptr); //666666666,这一行代码,N年的功力,你接得住吗?channel->set_revents(events_[i].events);activeChannels->push_back(channel);}
}void EpollPoller::update(int operation, Channel* channel) {epoll_event event;int fd = channel->fd();memset(&event, 0, sizeof(event));event.events = channel->events();event.data.ptr = channel; //上面那行,配合上这行看event.data.fd = fd;if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) {if (operation == EPOLL_CTL_DEL) {LOG_ERROR("epoll_ctl del error:%d\n", errno);}else {LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);}}
}
EventLoop
事件循环,One loop per thread,per poller,many channels,per wakeupchannel.
这个 wakeupchannel 是干嘛的呢?专门用于监听唤醒 eventfd 相应的 loop,这个事件通知机制没有见过吧,反正我是第一次见,基于文件描述符的,据说比 condition 要高档一些,condition都显得有点老了,这个比较年轻。
//通过轮询的方式唤醒channel
int createEventfd() {//创建一个能被用户应用程序用于时间等待唤醒机制的eventfd对象//eventfd 单纯的使用文件描述符实现的线程间的通知机制,可以很好的融入select、poll、epoll的I/O复用机制中int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0) {LOG_ERROR("Failed in eventfd%d\n", errno);}return evtfd;
}
每一个 EventLoop,都配备有一个wakeupfd,有一个wakeup channel 专门负责处理 wakeupfd 事件。
处理办法:随便读个数据,唤醒本 loop 起来干活了。(每一个 EventLoop 都监听了 wakeupchannel 的 EPOLL_IN 事件)
loop() 开始运行后,
1、通过poll函数进行epoll_wait,获取activeChannel。
2、唤醒相应channel。
3、执行doPendingFunction 方法(这里默认有子loop。)
这里又是个巧夺天工的设计:
void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;callingPendingFunctors_ = true;{std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor& functor : functors){functor();}callingPendingFunctors_ = false;
}
看这一行:functors.swap(pendingFunctors_);
这里为什么要把 pendingFunctors_ 置换出来?这个置换有意思吗?那可太有意思了。
如果不置换,直接拿着 pendingFunctors_ 去执行,这个资源是不是要被锁住?那接下来有新事件过来要放哪里?再开个pendingFunctors_ 2号吗?
这样一置换,相当于这些事件可以并发执行了。
有意思吧。
再看这个queueInLoop 和runInLoop:
void EventLoop::runInLoop(Functor cb) {if (isInLoopThread()) {cb();}else {queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb) {{std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_) {//callingPendingFunctors_:我是在本线程,而且我还在执行回调,有啥事情赶紧的拿过来,不然一会儿loop转一圈过去又阻塞了wakeup();}
}
如果有子loop,主loop只处理 Acceptor(runInLoop),剩下的全都在queueInLoop -> wakeup 子Loop->handleRead(或者事件已就绪)->唤醒 channel。
EventLoopThread
功能:用于绑定一个 loop 和一个thread。
类成员:一个 EventLoop 指针,一个thread 对象,锁、条件变量、回调等。
one loop per thread 在此处体现:
startloop:启动底层新线程,执行回调,配置 loop 并返回,创建一个独立的 loop,并开启事件循环。
EventLoopThreadPool
事件循环线程池。
包含一个baseloop的指针,第一个EventLoopThread的vector,以及一个EventLoop的vector。
start:创建一定数量的事件循环线程,添加到 std::vector<std::unique_ptr<EventLoopThread>>。并启动这些线程,添加到std::vector<EventLoop*>中。
GetNextLoop:如果工作在多线程中,baseloop 会默认以轮询的方式分配channel给subloop。
TcpServer
负责处理新连接。
Acceptor、EventLoopThreadPool、TCPConnection。
TcpServer(EventLoop* loop,const InetAddress& listenAddr,const std::string& nameArg,Option option = kNoReusePort);//设置底层subloop个数
void setThreadNum(int numThreads);//开启服务器监听
void start();
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const std::string& nameArg,Option option = kNoReusePort):loop_(CheckLoopNotNull(loop)),ipport_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Accept(loop, listenAddr, option == kReusePort)), //在这里对sock进行了初始化,不过还没有监听,更没有accept,关于accept在后续章节再提,快了//只有在 server start 之后才会listen,listen到才会去acceptthreadpool_(new EventLoopThreadPool(loop, name_)),//构建一个 EventLoopThreadPool (可以视为mainreactor)对象,//不过也就是构建一下,不干啥,关于EventLoopThreadPool的章节后面会提//start之后会创建制定数量的线程,并绑定新的loop,返回地址。connectionCallback_(),messageCallback_(),nextConnId_(1)
{//当有新用户连接时,会执行NewConnectionCallbackacceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));//在Acceptor 的handleread方法了被调用
}//开启服务器监听
void TcpServer::start() {if (started_ == 0) { //防止被多次startthreadpool_->start(threadInitCallback_); //EventLoopThreadPool的startloop_->runInLoop(std::bind(&Accept::listen, acceptor_.get())); //这里拿来run的loop就是mainloop++started_;}
}//当有新链接来的时候,acceptor会调用这个回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {//根据轮询算法,选择一个subloop,唤醒subloopEventLoop* ioloop = threadpool_->GetNextLoop();char buf[64] = { 0 };snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO("TcpConnnection::newConnection [%s] -new connection [%s] from %s \n",name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());//通过sockfd获取其本机IPsockaddr_in local;::bzero(&local, sizeof local);socklen_t addrlen = sizeof local;if (::getSockname(sockfd, (sockaddr*)&local, &addrlen) < 0) {//pass,日志打印,写漏了}InetAddress localAddr(::getLocalAddr(sockfd));//根据连接成功的fd,创建TCPConnection连接对象//一个连接对应一个 TCPConnectionptr 管理//关于TCPConnection的事情也是接下来展开TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafeioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));//把当前connfd封装成channel分发给subloop}//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {threadpool_->setThreadNum(numThreads);
}
TcpConnection
一个conn对应一个fd。
创建channel、绑定读、写、关闭、错误回调。
从实际应用出发
直接看这段代码:
int main(int argc, char **argv){EventLoop loop;InetAddress addr(ip, port);ChatServer server(&loop,addr,"ChatServer");server.start();loop.loop();return 0;
}
这里面先初始化了一个 loop 的对象,这是一个baseloop,设置wakeup回调类型,以及时间发生后回调操作,监听wakeup channel 的EPOLLIN事件。
然后是 ChatServer server(&loop,addr,“ChatServer”);
1、构建Acceptor,执行到bind之后,为acceptchannel设置 ReadCallback 回调,绑定了监听套接字,在回调函数中有 accept 和 NewConnectionCallBack 回调。
2、构建EventLoopThreadPool,啥也不干。
3、设置 newConnectionCallBack。
4、server.start(); 开启服务监听,将Acceptor::listen 函数绑定在 loop上,开启listen,配置channel enableReading。
EventLoopThreadPool.start 创建事件循环线程,并运行起来。
将线程和事件循环绑定起来。
5、loop.loop() 启动主loop。
再看这段代码:
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buff, Timestamp time){string buf = buff->retrieveAllAsString();json js = json::parse(buf);//通过msgid获取业务回调,进行网络模块和任务模块之间的解耦合auto msgHandler = ChatService::instance()->getHandle(js["msgid"].get<int>());//回调消息绑定好的事件处理器,执行相应的业务处理msgHandler(conn,js,time);//成功解耦
}
1、调用Buffer模块转码数据(这个模块的设计也很nice,可惜我还没把握住)。
2、调用JSON解析数据。
3、conn->send
在当前线程:sendInLoop;
不在当前线程:channel-> runInLoop->queueInLoop,(为啥不直接调用呢。。)
且先总结到此处,夜以深了,我该去想我该想的人了,哎、
















