从实例看muduo网络库各模块交互过程

article/2025/11/6 18:22:12

请添加图片描述

文章目录

    • muduo网络库的核心代码模块
    • 各模块功能解释
      • Channel
      • Poller
        • EpollPoller
      • EventLoop
      • EventLoopThread
      • EventLoopThreadPool
      • TcpServer
      • TcpConnection
    • 从实际应用出发

muduo网络库的核心代码模块

1、channel
2、Poller 和它的子类 EpollPoller
3、EventLoop
4、Thread、EventLoopThread、EventLoopThreadPool
5、Sock、Acceptor
6、Buffer
7、TcpServer、TCPConnection

至于其他还有Logger模块,就不是重点了吧。


各模块功能解释

经过我三天的研究,以及之前的源码铺垫,整理出来了第一个版本,当然后面会持续更新,预计更新到国庆节回来,那个版本应该是能看了。


Channel

根据收到的事件,调用相应的回调。

一个channel绑定一个fd
生命周期:新连接产生 -> 该连接断开。

由poller管理,从属于loop,可配置epoll监听事件。


Poller

muduo中多路事件分发器的核心模块,包含了一个 channel 数组,同时也是一个抽象基类(我只继承了epoll模块),
可以说:One loop per poller.

EpollPoller

实现了:
epoll_create:构造函数
epoll_ctl:一堆的 enable 函数
epoll_wait:poll方法。
通过epoll_wait,将有事件的channel通过参数传递给EventLoop。

此处参数:events[i].data.ptr。(经验呐!!!我觉得有这么一点,这篇就亮了!!!还不止呢。)

void EpollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {//for(Channel* channel:activeChannels){//这样剧让不行了!!!//eventloop 即将拿到它的poller返回的所有发生事件列表for (int i = 0; i < numEvents; ++i) {Channel* channel = static_cast<Channel*>(events_[i].data.ptr);  //666666666,这一行代码,N年的功力,你接得住吗?channel->set_revents(events_[i].events);activeChannels->push_back(channel);}
}void EpollPoller::update(int operation, Channel* channel) {epoll_event event;int fd = channel->fd();memset(&event, 0, sizeof(event));event.events = channel->events();event.data.ptr = channel;           //上面那行,配合上这行看event.data.fd = fd;if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) {if (operation == EPOLL_CTL_DEL) {LOG_ERROR("epoll_ctl del error:%d\n", errno);}else {LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);}}
}

EventLoop

事件循环,One loop per thread,per poller,many channels,per wakeupchannel.

这个 wakeupchannel 是干嘛的呢?专门用于监听唤醒 eventfd 相应的 loop,这个事件通知机制没有见过吧,反正我是第一次见,基于文件描述符的,据说比 condition 要高档一些,condition都显得有点老了,这个比较年轻。

//通过轮询的方式唤醒channel
int createEventfd() {//创建一个能被用户应用程序用于时间等待唤醒机制的eventfd对象//eventfd 单纯的使用文件描述符实现的线程间的通知机制,可以很好的融入select、poll、epoll的I/O复用机制中int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0) {LOG_ERROR("Failed in eventfd%d\n", errno);}return evtfd;
}

每一个 EventLoop,都配备有一个wakeupfd,有一个wakeup channel 专门负责处理 wakeupfd 事件。
处理办法:随便读个数据,唤醒本 loop 起来干活了。(每一个 EventLoop 都监听了 wakeupchannel 的 EPOLL_IN 事件)


loop() 开始运行后,
1、通过poll函数进行epoll_wait,获取activeChannel。
2、唤醒相应channel。
3、执行doPendingFunction 方法(这里默认有子loop。)
这里又是个巧夺天工的设计:

void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;callingPendingFunctors_ = true;{std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor& functor : functors){functor();}callingPendingFunctors_ = false;
}

看这一行:functors.swap(pendingFunctors_);

这里为什么要把 pendingFunctors_ 置换出来?这个置换有意思吗?那可太有意思了。

如果不置换,直接拿着 pendingFunctors_ 去执行,这个资源是不是要被锁住?那接下来有新事件过来要放哪里?再开个pendingFunctors_ 2号吗?

这样一置换,相当于这些事件可以并发执行了。

有意思吧。


再看这个queueInLoop 和runInLoop:

void EventLoop::runInLoop(Functor cb) {if (isInLoopThread()) {cb();}else {queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb) {{std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_) {//callingPendingFunctors_:我是在本线程,而且我还在执行回调,有啥事情赶紧的拿过来,不然一会儿loop转一圈过去又阻塞了wakeup();}
}

如果有子loop,主loop只处理 Acceptor(runInLoop),剩下的全都在queueInLoop -> wakeup 子Loop->handleRead(或者事件已就绪)->唤醒 channel。


EventLoopThread

功能:用于绑定一个 loop 和一个thread。
类成员:一个 EventLoop 指针,一个thread 对象,锁、条件变量、回调等。

one loop per thread 在此处体现:

startloop:启动底层新线程,执行回调,配置 loop 并返回,创建一个独立的 loop,并开启事件循环。


EventLoopThreadPool

事件循环线程池。
包含一个baseloop的指针,第一个EventLoopThread的vector,以及一个EventLoop的vector。

start:创建一定数量的事件循环线程,添加到 std::vector<std::unique_ptr<EventLoopThread>>。并启动这些线程,添加到std::vector<EventLoop*>中。
GetNextLoop:如果工作在多线程中,baseloop 会默认以轮询的方式分配channel给subloop。


TcpServer

负责处理新连接。
Acceptor、EventLoopThreadPool、TCPConnection。

TcpServer(EventLoop* loop,const InetAddress& listenAddr,const std::string& nameArg,Option option = kNoReusePort);//设置底层subloop个数
void setThreadNum(int numThreads);//开启服务器监听
void start();
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const std::string& nameArg,Option option = kNoReusePort):loop_(CheckLoopNotNull(loop)),ipport_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Accept(loop, listenAddr, option == kReusePort)),	//在这里对sock进行了初始化,不过还没有监听,更没有accept,关于accept在后续章节再提,快了//只有在 server start 之后才会listen,listen到才会去acceptthreadpool_(new EventLoopThreadPool(loop, name_)),//构建一个 EventLoopThreadPool (可以视为mainreactor)对象,//不过也就是构建一下,不干啥,关于EventLoopThreadPool的章节后面会提//start之后会创建制定数量的线程,并绑定新的loop,返回地址。connectionCallback_(),messageCallback_(),nextConnId_(1)
{//当有新用户连接时,会执行NewConnectionCallbackacceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));//在Acceptor 的handleread方法了被调用
}//开启服务器监听
void TcpServer::start() {if (started_ == 0) {  //防止被多次startthreadpool_->start(threadInitCallback_);	//EventLoopThreadPool的startloop_->runInLoop(std::bind(&Accept::listen, acceptor_.get()));	//这里拿来run的loop就是mainloop++started_;}
}//当有新链接来的时候,acceptor会调用这个回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {//根据轮询算法,选择一个subloop,唤醒subloopEventLoop* ioloop = threadpool_->GetNextLoop();char buf[64] = { 0 };snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO("TcpConnnection::newConnection [%s] -new connection [%s] from %s \n",name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());//通过sockfd获取其本机IPsockaddr_in local;::bzero(&local, sizeof local);socklen_t addrlen = sizeof local;if (::getSockname(sockfd, (sockaddr*)&local, &addrlen) < 0) {//pass,日志打印,写漏了}InetAddress localAddr(::getLocalAddr(sockfd));//根据连接成功的fd,创建TCPConnection连接对象//一个连接对应一个 TCPConnectionptr 管理//关于TCPConnection的事情也是接下来展开TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafeioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));//把当前connfd封装成channel分发给subloop}//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {threadpool_->setThreadNum(numThreads);
}

TcpConnection

一个conn对应一个fd。

创建channel、绑定读、写、关闭、错误回调。


从实际应用出发

直接看这段代码:

int main(int argc, char **argv){EventLoop loop;InetAddress addr(ip, port);ChatServer server(&loop,addr,"ChatServer");server.start();loop.loop();return 0;
}

这里面先初始化了一个 loop 的对象,这是一个baseloop,设置wakeup回调类型,以及时间发生后回调操作,监听wakeup channel 的EPOLLIN事件。

然后是 ChatServer server(&loop,addr,“ChatServer”);
1、构建Acceptor,执行到bind之后,为acceptchannel设置 ReadCallback 回调,绑定了监听套接字,在回调函数中有 accept 和 NewConnectionCallBack 回调。

2、构建EventLoopThreadPool,啥也不干。

3、设置 newConnectionCallBack。

4、server.start(); 开启服务监听,将Acceptor::listen 函数绑定在 loop上,开启listen,配置channel enableReading。
EventLoopThreadPool.start 创建事件循环线程,并运行起来。
将线程和事件循环绑定起来。

5、loop.loop() 启动主loop。


再看这段代码:

void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buff, Timestamp time){string buf = buff->retrieveAllAsString();json js = json::parse(buf);//通过msgid获取业务回调,进行网络模块和任务模块之间的解耦合auto msgHandler = ChatService::instance()->getHandle(js["msgid"].get<int>());//回调消息绑定好的事件处理器,执行相应的业务处理msgHandler(conn,js,time);//成功解耦
}

1、调用Buffer模块转码数据(这个模块的设计也很nice,可惜我还没把握住)。
2、调用JSON解析数据。
3、conn->send

在当前线程:sendInLoop;
不在当前线程:channel-> runInLoop->queueInLoop,(为啥不直接调用呢。。)


且先总结到此处,夜以深了,我该去想我该想的人了,哎、


http://chatgpt.dhexx.cn/article/V9sVC2GM.shtml

相关文章

muduo总结

本文重点在muduo TcpServer的启动&#xff0c;I/O线程池的启动&#xff0c;以及各种回调 文章目录 baseAsyncLogging.{h,cc}Atomic.hBlockinQueue.hBoundedBlockinQueue.hCondition.hcopyable.hCountDownLatch.{h,cc}Date.{h,cc}Exception.{h,cc}Logging.{h,cc}Mutex.hProcess…

muduo网络库——日志处理

测试程序 #include "muduo/base/AsyncLogging.h" #include "muduo/base/Logging.h" #include "muduo/base/Timestamp.h"#include <stdio.h> #include <sys/resource.h> #include <unistd.h>off_t kRollSize 500*1000*1000;m…

Muduo日志模块详解

Muduo日志模块解析 图片取自muduo网络库源码解析(1):多线程异步日志库(上)_李兆龙的技术博客_51CTO博客也是很好的日志讲解博客,这篇讲解流程基本上和它差不多,并且写的比我条理清楚很多 AppendFile::append() 这个函数是日志写入文件的最终函数,并且AppendFile这个类里面也是…

Muduo 定时器

TimeQueue定时器 图片转载自:muduo网络库源码解析(4):TimerQueue定时机制_李兆龙的技术博客_51CTO博客 添加新的定时器 TimerId TimerQueue::addTimer(TimerCallback cb, //用户自定义回调Timestamp when, //定时器的超时时刻double interval) //重复触发间隔,小于0则不重…

《muduo网络库》学习笔记——muduo学习总结

muduo是基于非阻塞的IO和事件驱动的网络库&#xff08;Reactor模式&#xff09;&#xff0c;其核心是一个事件循环EventLoop&#xff0c;用于响应计时器和IO事件。muduo采用基于对象&#xff08;object-based&#xff09;而非面向对象&#xff08;object-oriented&#xff09;的…

Ubuntu安装muduo库

1. 首先安装boost库&#xff1b; sudo apt-get update sudo apt-get install libboost-all-dev 2. 下载muduo库&#xff0c; https://github.com/chenshuo/muduo 3. 解压后进入解压目录&#xff0c;vim CMakeLists.txt&#xff0c;注释掉略过unit_test测试用例代码的编译&#…

linux muduo 编译安装,muduo记录

1.muduo编译安装 编译muduo遇见的报错可以在github上的issue上面查找。一般都能顺利解决,我遇到的就是没有安装boost-dev. centos7系统 执行: sudo yum install boost-dev 2.截取流程图 图片截取自《Linux多线程服务端编程&#xff1a;使用muduo C网络库》 3.源码摘录 摘录一个…

muduo源码分析之TcpServer模块

这次我们开始muduo源代码的实际编写&#xff0c;首先我们知道muduo是LT模式&#xff0c;Reactor模式&#xff0c;下图为Reactor模式的流程图[来源1] 然后我们来看下muduo的整体架构[来源1] 首先muduo有一个主反应堆mainReactor以及几个子反应堆subReactor&#xff0c;其中子反应…

muduo网络库学习(1)

muduo网络库学习&#xff08;1&#xff09; 文章目录 muduo网络库学习&#xff08;1&#xff09;前言一、muduo是什么&#xff1f;二、代码结构1.base库2.net库3.附属库 二、网络库结构总结 前言 本章节主要介绍muduo网络库的整体架构&#xff01;一、muduo是什么&#xff1f;…

muduo

muduo 概述 muduo是基于Reactor模式的网络库&#xff0c;用于响应计时器和IO事件。 muduo采用基于对象而非面向对象的设计风格&#xff0c;其事件回调采用functionbind&#xff0c;用户在使用muduo的时候不需要继承其中的class 架构 Multiple Reactor Reactor模式&#xff1a…

muduo日志库原理以及源码分析

muduo日志库特点 日志批量写入批量唤醒写线程写日志用notifywait_timeout 方式触发日志的写入锁的粒度&#xff0c;双缓冲&#xff0c;双队列buffer默认 4M 缓冲区&#xff0c; buffers 是 buffer 队列&#xff0c; push 、 pop 时使用 move 语义 减少内存拷贝 muduo的这些特点…

muduo网络库与服务模型介绍

目录 一、muduo网络库简介 1、特点 2、代码结构 &#xff08;1&#xff09;公共接口 &#xff08;2&#xff09;内部实现 二、muduo线程模型 1、单线程Reactor 2、Reactor线程池 3、one loop per thread 4、one loop per thread 线程池 muduo是陈硕个人使用C开发的一…

muduo 架构解析

muduo是一个基于Reactor模式的C网络库。它采用非阻塞I/O模型&#xff0c;基于事件驱动和回调。我们不仅可以通过muduo来学习linux服务端多线程编程&#xff0c;还可以通过它来学习C11。     Reactor是网络编程的一般范式。我们这里从reactor模式为出发点&#xff0c;根据R…

muduo库介绍

muduo库是一个多线程服务器开发库 muduo 作者陈硕&#xff0c;现在在美国加州硅谷某互联网大公司工作&#xff0c;从事大规模分布式的可靠系统工程。这个库是作者多年工作的总结&#xff0c;可以说大家学通了这个库&#xff0c;找一份Linux服务器开发的工作是没问题的&#xf…

C++ muduo网络库知识分享01 - Linux平台下muduo网络库源码编译安装

Muduo is a multithreaded C network library based on the reactor pattern. muduo库的介绍就是&#xff1a;一个基于reactor反应堆模型的多线程C网络库。 muduo网络库是C语言开发的一个非常优秀的网络库&#xff0c;作者陈硕&#xff0c;muduo网络库在多线程环境下性能非常高…

遗传算法示例

遗传的概念&#xff1a; 遗传算法是模拟达尔文生物进化论的自然选择和遗传学机理的生物进化过程的计算模型&#xff0c;是一种通过模拟自然进化过程搜索最优解的方法。 遗传算法的特点&#xff1a; 对于搜索算法的共同特征有 首先组成一组候选解。依据某些使用性条件测算这些…

10分钟搞懂遗传算法

大自然有种神奇的力量&#xff0c;它能够将优良的基因保留下来&#xff0c;从而进化出更加强大、更加适合生存的基因。遗传算法便基于达尔文的进化论&#xff0c;模拟了自然选择&#xff0c;物竞天择、适者生存&#xff0c;通过N代的遗传、变异、交叉、复制&#xff0c;进化出问…

遗传算法简单实例

遗传算法的手工模拟计算示例 为更好地理解遗传算法的运算过程&#xff0c;下面用手工计算来简单地模拟遗传算法的各 个主要执行步骤。 例&#xff1a;求下述二元函数的最大值&#xff1a; (1) 个体编码 遗传算法的运算对象是表示个体的符号串&#xff0…

遗传算法(基础知识)

遗传算法&#xff08;基础知识&#xff09; 遗传算法简称GA&#xff08;Genetic Algorithms&#xff09;模拟自然界生物遗传学&#xff08;孟德尔&#xff09;和生物进化论&#xff08;达尔文&#xff09;通过人工方式所构造的一类 并行随机搜索最优化方法&#xff0c;是对生物…

遗传算法概念、步骤、应用解析(案例直白--黄含驰)

遗传算法 ①  在几十亿年的演化过程中&#xff0c;自然界中的生物体已经 形成了一种优化自身结构的内在机制&#xff0c;它们能够不 断地从环境中学习&#xff0c;以适应不断变化的环境  对于大多数生物体&#xff0c;这个过程是通过自然选择和有性生殖来完成的。自然选择…