C++发布订阅模式
发布订阅模式主要包含三个部分:消息发布、消息订阅者、消息处理中心。与观察者模式相比多出了消息处理中心模块,这样在结构上可以解耦订阅者与发布者,功能上更加的丰富。
观察者模式
结构设计
- 有一个消息list,主线程向这个list尾部追加消息,同时另一个子线程从消息list头部不断取出第一个消息
- 查找消息订阅map,订阅者与消息设计为n:n关系,一个消息可被多个订阅者订阅,因此需依次执行订阅了这个消息的函数
- 如:Stocks Trade消息需被Suber1和Suber4两个订阅者处理
简单来说就是:EventDeta*就是一个数据,并为之取了一个别名 * Trade,称之为消息。而很多类函数(普通函数也一样的)的正常执行需要这个EventData,因此这个类将其某个函数和EventData通过订阅列表绑定,当这个EventData也即是消息成为消息列表的第一个元素时,就执行和这个EventData绑定的所有类函数。
代码
[知乎]](https://zhuanlan.zhihu.com/p/484171260)
消息发布
EventData.h:
/* 定义有哪些消息、消息对应的数据当想要发自己的消息,可以在这里进行添加
*/
// 消息句柄用于标识消息
char * event_data1 = "Stocks Trade";
// 消息主体,可做为订阅者的入参,以对此消息进行处理
struct EventData1
{int e = 0;
};char * event_data2 = "Bonds Trade";
struct EventData2
{char * i = "receiver";
};char * event_data3 = "Funds Trade";
struct EventData3
{EventData1 data1;EventData2 data2;
};
消息订阅者
EventSuber.h: 订阅者消息处理函数一般接收两个参数,一个是自己的参数,一个是传过来的消息
这里可以不申明类,直接写三个消息处理函数也可以
#include"EventData.h"
#include<iostream>
#include<string>using namespace std; // string是标准卡函数/*订阅消息处理模块 3个 */
class Subscriber1
{
public:// 利用静态成员函数,保留上一次消息处理结果// 不能访问非静态成员变量与非静态成员函数 => 传这个类的指针 void* _this// 强转_this指针,调用此类的成员函数、成员变量 _this=&(Subscriber1实例)// 给这个订阅器传一个消息static void HandleReceiveEvent (void * _this, void * data){Subscriber1 * th = (Subscriber1 *)_this;EventData1* d = (EventData1 *)data; d->e += th->_data;printf("%s, data:%d \n",__FUNCTION__,d->e);}int _data = 10;
};class Subscriber2
{
public:static void HandleReceiveEvent (void * _this, void * data){Subscriber2 * th = (Subscriber2 *)_this;EventData2* d = (EventData2 *)data;string str(d->i);str += th->_data;printf("%s, data:%s \n",__FUNCTION__,str.c_str());}char * _data = " Subscriber 2";
};class Subscriber3
{
public:static void HandleReceiveEvent (void * _this, void * data){Subscriber3 * th = (Subscriber3 *)_this;EventData3* d = (EventData3 *)data;d->data1.e += th->_data1;string str(d->data2.i);str += th->_data2;printf("%s, data:%d , %s \n",__FUNCTION__,d->data1.e,str.c_str());}int _data1 = 30;char * _data2 = " Subscriber 3";
};
消息处理中心
EventDeal.h: 开启一个子线程,不断去消息list取消息,并将该消息将由所有订阅者进行处理
#include<pthread.h>
#include<map>
#include<list>
#include "EventSuber.h"/*消息处理中心*/
class EventMsgCentre
{
public:typedef void(*HandleEvent)(void * , void *);/* 订阅消息节点 */// 消息处理函数设置两个入参:第一个_this指针是订阅器者,第二个是消息EventData// HandleReceiveEvent(suber,msg)形式struct EventSubscriberNode{void * _this; // 只是普通的指针变量HandleEvent func; // 函数指针,func无返回类型,且双入参都为void*};/* 发布消息节点 */struct EventPublishNode{char * event;void * data;};public:// 初始化消息发布、订阅锁EventMsgCentre(){pthread_mutex_init(&_mutexSubscriber, nullptr);pthread_mutex_init(&_mutexPublish, nullptr);}// 释放锁空间~EventMsgCentre(){pthread_mutex_destroy(&_mutexPublish);pthread_mutex_destroy(&_mutexSubscriber);}// 订阅消息 =》 订阅map追加元素void SubscriberEvent(char * event, EventSubscriberNode node){pthread_mutex_lock(&_mutexSubscriber);sMap[event].push_back(node); pthread_mutex_unlock(&_mutexSubscriber);}// 删除消息订阅void releaseSubscriberEvent(char * event, EventSubscriberNode node){pthread_mutex_lock(&_mutexSubscriber);auto it = sMap.find(event); // 订阅map中找到这个消息的订阅列表if(it != sMap.end()){for(auto ite = it->second.begin(); ite != it->second.end(); ){// 删除这个消息对应订阅列表的某个订阅if(ite->_this == node._this && ite->func == node.func){ite = it->second.erase(ite);}else{++ite;}}}pthread_mutex_unlock(&_mutexSubscriber);}// 发布消息,往消息列表添加一个消息节点void PublishEvent(char * event, void *data){EventPublishNode node;node.event = event;node.data = data;pthread_mutex_lock(&_mutexPublish);plist.push_back(node);pthread_mutex_unlock(&_mutexPublish);}/* 消息处理中心 */static void *EventProcess(void * _this){EventMsgCentre * th = (EventMsgCentre *)_this; while (1){EventPublishNode cur {nullptr, nullptr}; // 申明消息pthread_mutex_lock(&th->_mutexPublish); // 消息发布锁// 取出第一个消息if (th->plist.empty()){pthread_mutex_unlock(&th->_mutexPublish);continue;}cur = th->plist.front();th->plist.pop_front();pthread_mutex_unlock(&th->_mutexPublish);// 消息订阅锁// 找到这个消息对应的订阅 pthread_mutex_lock(&th->_mutexSubscriber);auto it = th->sMap.find(cur.event);if(it != th->sMap.end()){for(auto ite = it->second.begin(); ite != it->second.end(); ++ite){ite->func(ite->_this, cur.data); // 执行所有订阅者的消息动作}}pthread_mutex_unlock(&th->_mutexSubscriber);}}// 创建线程void theardProc(){/* 1.向调用者传递子线程的线程号 2.线程属性设置,一个结构体,包括线程优先级,线程栈大小3.指定子线程允许的函数,需要一个函数指针4.子线程运行的函数参数值*/int ret = pthread_create(&pt_id, nullptr, EventProcess, this);}public:pthread_mutex_t _mutexSubscriber; // 消息订阅互斥锁pthread_mutex_t _mutexPublish; // 消息发布互斥锁pthread_t pt_id; // 线程idlist<EventPublishNode> plist; // 消息发布列表map<char *, list<EventSubscriberNode>> sMap; // 消息订阅map,一个订阅者可订阅多个消息
};
main线程
添加三个消息,并分别添加如顶图所示的订阅者(订阅者4换成订阅者1)
#include<iostream>
#include<map>
#include<list>
#include <windows.h>
#include <process.h>
#include"EventDeal.h"/* 测试。 */
int main()
{EventMsgCentre eveMsg; /* 初始化 */eveMsg.theardProc(); /* 启线程 */ /* 订阅消息 */Subscriber1 ber;ber._data = 100; EventMsgCentre::EventSubscriberNode node1;// 节点的_this就是一个订阅器node1._this = &ber; // 节点的函数指针就是订阅器的HandleReceiveEventnode1.func = Subscriber1::HandleReceiveEvent; eveMsg.SubscriberEvent(event_data1, node1); // 为消息1("Stocks Trade")添加一个订阅器1// 由于消息2和消息3不能直接转换成消息1,所以给消息1添加了一个重复的订阅者1eveMsg.SubscriberEvent(event_data1, node1); Subscriber2 ber2;ber2._data = " Subscriber2_200";EventMsgCentre::EventSubscriberNode node2;node2._this = &ber2;node2.func = Subscriber2::HandleReceiveEvent;eveMsg.SubscriberEvent(event_data2, node2);Subscriber3 ber3;ber3._data1 = 300;ber3._data2 = " Subscriber3_300";EventMsgCentre::EventSubscriberNode node3;node3._this = &ber3;node3.func = Subscriber3::HandleReceiveEvent;eveMsg.SubscriberEvent(event_data3, node3);/* 发布消息 */EventData1 d1{1};eveMsg.PublishEvent(event_data1, &d1);EventData2 d2 {"event_data2"};eveMsg.PublishEvent(event_data2, &d2);EventData3 d3 {3,"event_data3"};eveMsg.PublishEvent(event_data3, &d3);// Sleep(1000); // 休眠一秒,主线程退出,子线程消息处理函数随之退出// eveMsg.~EventMsgCentre();while(1){} // 等待EventProcess执行完成return 0;
}




















