1、总体流程

1. acceptor 进行listen阶段后, 往channel中注册可读事件。
2. acceptor可读处理中生成TcpConnection指针,通过EventloopThreadPool
轮询出其中一个线程的eventloop, 并将此TcpConnection的可读、可写等事件注册到自己Channel(eventLoop)中。
3. 每个EventLoop监听到具体channel的事件后,分析是哪种事件并在当前线程执行绑定的TcpConnection的具体事件处理函数。
4.Channel是创建与TcpConnection和acceptor 中,但是在Eventloop中的poller保存了一份。不用管什么时候保存的。因为是在channel具体添加事件或者更新事件的时候发现不在容器中,顺便添加进去的。
5.于一些阻塞型或者耗时型的任务,例如 MySQL 操作等。这些显然是不能放在 IO 线程(即 EventLoop 所在的线程)中运行的,因为会严重影响 EventLoop 的正常运行。这些需要用户自己启用worker线程。muduo中有ThreadPool可以使用。

2、IO线程池的实现
类似于业务线程池ThreadPool, 这里只列出不同的部分,参考:worker线程剖析
成员变量
EventLoop* baseLoop_;string name_; bool started_;int numThreads_;int next_; //在getNextLoop中使用,为新的socket连接分配EventLoopstd::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop*> loops_;
主要区别在:1. IO线程池没有任务队列, 主要是创建IO线程,并在IO线程中创建线程私有的EventLoop, 并保存在IO线程池的eventloop容器中,供acceptor分配socket连接使用。
IO线程池运行
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);EventLoopThread* t = new EventLoopThread(cb, buf); //创建IO线程threads_.push_back(std::unique_ptr<EventLoopThread>(t)); //保存IO线程loops_.push_back(t->startLoop()); //保存IO线程私有的Eventloop到线程池容器中。供acceptor分配socket连接使用}if (numThreads_ == 0 && cb){cb(baseLoop_);}
}
提供Eventloop:以轮询的方式供accepor分配tcpconnection给不同的线程eventloop。
EventLoop* EventLoopThreadPool::getNextLoop()
{baseLoop_->assertInLoopThread();assert(started_);EventLoop* loop = baseLoop_; //默认是主loop,loops不为空时loop是线程池中的loop。if (!loops_.empty()){// round-robinloop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()) //这里implicit_cast是muduo自己封装的转换{next_ = 0;}}return loop;
}
3. Acceptor实现,在主反应堆中实现
建立一个简单TCP服务需要四步骤:
- 步骤 1. socket() // 调用 socket 函数建立监听 socket
- 步骤 2. bind() // 绑定地址和端口
- 步骤 3. listen() // 开始监听端口
- 步骤 4. accept() // 返回新建立连接的 fd
构造Acceptro时,完成了步骤1和2.
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), //创建socket并设置为非阻塞状态acceptChannel_(loop, acceptSocket_.fd()),listening_(false),idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{assert(idleFd_ >= 0);acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(reuseport); acceptSocket_.bindAddress(listenAddr); //绑定地址acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
TcpServer start的时候acceptor完成了步骤3 listening, 并将listen socket 可读事件注册到Eventloop中。
void Acceptor::listen()
{loop_->assertInLoopThread();listening_ = true;acceptSocket_.listen(); //开启监听acceptChannel_.enableReading(); //往事件循环注册可读事件
}
步骤四,在eventloop进行事件循环时,通过channel返回给acceptor可读事件已发生。此时进行accept.
//监听socket有事件发生
void Acceptor::handleRead()
{loop_->assertInLoopThread();InetAddress peerAddr;//FIXME loop until no moreint connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0){// string hostport = peerAddr.toIpPort();// LOG_TRACE << "Accepts of " << hostport;if (newConnectionCallback_){newConnectionCallback_(connfd, peerAddr); //生成TCPConnection,并注册连接、断开、可读、写完成事件.}else{sockets::close(connfd);}}else{LOG_SYSERR << "in Acceptor::handleRead";// Read the section named "The special problem of// accept()ing when you can't" in libev's doc.// By Marc Lehmann, author of libev.if (errno == EMFILE){::close(idleFd_);idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);::close(idleFd_);idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);}}
}
最后,newConnectionCallback_是TCpServer注册的函数。所以执行此函数就是在TCPServer中连接IO线程池和acceptor这两者进行socket 连接的分配。
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop(); //为新连接分配IO线程char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessaryTcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe/*连接建立后,放到属于此tcpconnect的eventloop中执行TcpConnection::connectEstablished函数。connectEstablished函数中,监听此tcp连接的可读事件。并通知用户的onConnection*/ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }
未完待续。。。。。。















