项目中的数据队列基于轮询和sleep,实时性及CPU性能差,需要进行优化,尝试使用concurrentqueue进行优化。网上有一些资料介绍,可供参考。
使用后的个人理解:一个线程安全的queue,并且concurrentqueue的线程安全并不是一味的加锁,它有特殊的技巧,总的来说线程安全且高效,但是不保证跨生产者的全局有序性(同一生产者入队的元素之间仍保持先进先出顺序)。是一个很nice的MQ。分析需求,使用阻塞队列,一旦有数据便被读取了,所以使用concurrentqueue可行。
使用:
方法 | 功能 | |
---|---|---|
ConcurrentQueue(size_t initialSizeEstimate) | 构造函数,它可选地接受队列将包含的元素数量的估计值 | |
enqueue(T&& item) | 将一个项目排队,必要时分配额外的空间 | |
try_enqueue(T&& item) | 将一个项目入队,但前提是已经分配了足够的内存,返回值为bool类型 | |
try_dequeue(T& item) | 将一个项目出队,如果找到一个项目,则返回true;如果队列为空,则返回false | |
size_t size_approx() | 返回元素个数(近似值) |
方法 | 作用 | |
---|---|---|
bool try_dequeue_from_producer(producer_token_t const& producer, U& item) | 从特定的生产者队列出列 | |
size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max) | 批量出列 | |
bool is_lock_free() | 检测底层原子类型是否为无锁实现 |
concurrentqueue的API
封装于:“concurrentqueue.h”
# Allocates more memory if necessary
enqueue(item) : bool
enqueue(prod_token, item) : bool
enqueue_bulk(item_first, count) : bool
enqueue_bulk(prod_token, item_first, count) : bool
# Fails if not enough memory to enqueue
try_enqueue(item) : bool
try_enqueue(prod_token, item) : bool
try_enqueue_bulk(item_first, count) : bool
try_enqueue_bulk(prod_token, item_first, count) : bool
# Attempts to dequeue from the queue (never allocates)
try_dequeue(item&) : bool
try_dequeue(cons_token, item&) : bool
try_dequeue_bulk(item_first, max) : size_t
try_dequeue_bulk(cons_token, item_first, max) : size_t
# If you happen to know which producer you want to dequeue from
try_dequeue_from_producer(prod_token, item&) : bool
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t
# A not-necessarily-accurate count of the total number of elements
size_approx() : size_t
简单示例
#include<iostream>
#include"concurrentqueue.h"

// Minimal usage demo for moodycamel::ConcurrentQueue:
// enqueue three ints, dequeue one, then report the approximate size.
int main()
{
    int x = 0;
    moodycamel::ConcurrentQueue<int> q;

    q.enqueue(2);
    q.enqueue(4);
    q.enqueue(6);

    // try_dequeue() returns false when the queue is empty;
    // here it pops one item into x.
    q.try_dequeue(x);

    // size_approx() is only an estimate under concurrency;
    // it is exact in this single-threaded demo.
    std::cout << q.size_approx() << " " << x << std::endl;

    // BUG FIX: is_lock_free is a member function and must be called; the
    // original `if (q.is_lock_free)` tested the function name instead of
    // invoking it. Also fixed the "lok" typo in the output message.
    if (q.is_lock_free())
        std::cout << "is lock free" << std::endl;
    return 0;
}
- 需要注意的是concurrentqueue不保证跨生产者的数据有序性
#include<iostream>
#include<string>
#include<thread>
#include<mutex>
#include"concurrentqueue.h"class MintorAbstractBuffer
{
public:MintorAbstractBuffer(const uint32_t buffersize):_Buffersize(buffersize){}bool push(std::string s){int32_t size = _buffer.size_approx();if (_Buffersize > size){if (_buffer.enqueue(s)){++_Size;return true;}}return false;}bool RecursivePush(std::string s,int32_t num){//std::lock_guard<std::mutex> lock(_mut);for (int32_t i = 0; i < num; ++i){int32_t size = _buffer.size_approx();if (size >= _Buffersize){std::cout << "Push Error" << " ";return false;}_buffer.enqueue(s);++_Size;}return true;}void pint(){int32_t size = _buffer.size_approx();for (int32_t i = 0; i <= size; ++i){std::string curs;if (!_buffer.try_dequeue(curs))std::cout << "Pint Error" << std::endl;std::cout << curs<< i <<" ";if (!_buffer.enqueue(curs))std::cout << "Pint Error" << std::endl;}std::cout << std::endl;std::cout <<_buffer.size_approx()<< std::endl;std::cout <<_Size<< std::endl;}
private:std::mutex _mut;int32_t _Buffersize;moodycamel::ConcurrentQueue<std::string> _buffer;uint32_t _Size=0;
};int main()
{std::string s1 = "a", s2 = "b", s3 = "c";MintorAbstractBuffer buffer(100000);std::thread t1(&MintorAbstractBuffer::RecursivePush, &buffer, s1, 333);std::thread t2(&MintorAbstractBuffer::RecursivePush, &buffer, s2, 333);std::thread t3(&MintorAbstractBuffer::RecursivePush, &buffer, s3, 334);t1.join();t2.join();t3.join();buffer.pint();return 0;
}
加锁之后使得++_Size不会被打断:
_Size是统计push次数的计数器,在++_Size(读-改-写)过程中可能被其他线程打断,导致本次递增丢失,大概有1%的丢失率。
想要数据有序还是要使用std::queue,或者使用ConcurrentQueue的阻塞版本,见文末
#include<iostream>
#include<string>
#include<thread>
#include<mutex>
#include<queue>

// Ordered variant of the demo buffer: std::queue protected by a mutex.
// Unlike the ConcurrentQueue version, FIFO order across producers is kept and
// the push counter _Size stays consistent because all pushes are serialized.
class MintorAbstractBuffer
{
public:
    // buffersize: capacity limit for the queue.
    MintorAbstractBuffer(const uint32_t buffersize) : _Buffersize(buffersize) {}

    // Push one item if below capacity.
    // BUG FIX: the original did not take the lock here, racing with
    // RecursivePush on both _buffer and _Size.
    bool push(std::string s)
    {
        std::lock_guard<std::mutex> lock(_mut);
        int32_t size = _buffer.size();
        if (_Buffersize > size)
        {
            _buffer.push(s);
            ++_Size;
            return true;
        }
        return false;
    }

    // Push `num` copies of `s` under a single lock; false on overflow.
    bool RecursivePush(std::string s, int32_t num)
    {
        std::lock_guard<std::mutex> lock(_mut);
        for (int32_t i = 0; i < num; ++i)
        {
            int32_t size = _buffer.size();
            // BUG FIX: was `size > _Buffersize`, which let the queue grow one
            // element past the capacity; `>=` matches the intended limit (and
            // the ConcurrentQueue version of this class).
            if (size >= _Buffersize)
            {
                std::cout << "Push Error" << " ";
                return false;
            }
            _buffer.push(s);
            ++_Size;
        }
        return true;
    }

    // Print every element once by rotating the queue, then dump the push
    // counter and the queue size for comparison.
    void pint()
    {
        int32_t size = _buffer.size();
        // BUG FIX: the original loop condition was `i <= size`, one extra
        // rotation that printed the front element twice.
        for (int32_t i = 0; i < size; ++i)
        {
            std::cout << i;
            std::cout << _buffer.front() << " ";
            std::string curs;
            curs = _buffer.front();
            _buffer.pop();
            _buffer.push(curs);
        }
        std::cout << std::endl;
        std::cout << "push cont :" << _Size << std::endl;
        std::cout << "_buffer.size() :" << _buffer.size() << std::endl;
    }

private:
    std::mutex _mut;                  // serializes all queue access
    int32_t _Buffersize;              // capacity limit
    std::queue<std::string> _buffer;  // FIFO storage, ordered
    uint32_t _Size = 0;               // total successful pushes
};

// Three producers push 3333/3333/3334 items under the lock; pint() then shows
// that _Size and _buffer.size() agree (both 10000).
int main()
{
    std::string s1 = "a", s2 = "b", s3 = "c";
    MintorAbstractBuffer buffer(100000);
    int32_t x = 3333;
    std::thread t1(&MintorAbstractBuffer::RecursivePush, &buffer, s1, x);
    std::thread t2(&MintorAbstractBuffer::RecursivePush, &buffer, s2, x);
    std::thread t3(&MintorAbstractBuffer::RecursivePush, &buffer, s3, x + 1);
    t1.join();
    t2.join();
    t3.join();
    buffer.pint();
    return 0;
}
阻塞版本(blockingconcurrentqueue)的API
封装于:“blockingconcurrentqueue.h”,在工程中使用一个线程不断使用阻塞版本API调用数据可以提升数据的处理速度(缓冲区基本为空)。当然需要知道阻塞与非阻塞队列的区别,下面的API可以通过调整参数来控制多长时间从阻塞的队列中重新获取数据。github上有详细的介绍,出队操作的API变更如下。
wait_dequeue(U& item) : void
wait_dequeue_timed(U& item, std::int64_t timeout_usecs) : bool //时间参数 std::chrono::milliseconds(5)
wait_dequeue_bulk(It itemFirst, size_t max) : size_t
wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs) : size_t