一、队列的介绍
std::queue是 C++ 标准库提供的先进先出(FIFO, First In First Out)容器适配器,是队列数据结构的标准实现。它的核心规则是:只能在队尾添加元素、只能在队首移除元素,不支持随机访问中间元素,非常适合任务排队、数据缓冲、生产者 - 消费者解耦等场景,本次推流项目中的码流缓存队列,本质就是基于这个数据模型实现的。
在实际开发中队列发挥着巨大的作用,比方说多线程数据传输、缓存数据的存储、中间件的设计等等。
从上面这张图我们可以看到,队尾入队了三个元素分别是1,2,3。1号数据最早入队、2号数据第二入队、3号数据最后入队。出队的时候,1号最早出队(pop1)、2号排在1号数据后面(pop2)、3号最后出队(pop3)。所以使用队列的时候,我们可以保证数据的顺序不会出现乱序的错误。
二、队列的用处
队列常用于在多线程数据传输、数据解耦、缓存数据等方面。由于在大型的项目开发中,往往有许多线程同时运作。此时,许多线程之间需要进行数据的传递,所以此时我们就需要通过队列作为一条桥梁把数据从一个线程送到另外一个线程里面(如下图就是队列在两个线程之间的通信)。线程一把数据按照顺序把数据包存储到Queue上、线程二、三也按照顺序从队列拿到数据。
除了线程之间通信之外,队列还常用于数据量缓存方面。比方说,在音视频解码的时候,音视频数据会大量传入解码端。假设此时没有一个缓冲的时间,解码端可能会因为处理速度的问题,导致解码视频的时候会出现花屏、卡顿等问题。所以,此时我们就需要用队列进行缓冲,使其传输速度降下来,那解码端的解码压力就会大大降下来,此时解码出来的画面质量就会高很多,具体的流程如下图。
三、C++ STL队列的用法:
C++库已经提供了一套队列的api方便开发者进行开发,这样我们就不用重新再新造轮子去实现队列。下面我们就来看看我们用stl queue去实现队列:
3.1. queue的初始化:
#include <queue>
std::queue<object> object_queue;
初始化stl的queue,需要做两步。第一步要包含<queue>头文件,#include<queue>;第二步声明queue,std::queue<object> object_queue。这里的<object>里面的object是任意类型的数据,也包括结构体的数据。
3.2.queue的操作api:
front():返回 queue 中第一个元素的引用。如果 queue 是常量,就返回一个常引用;如果 queue 为空,返回值是未定义的。
back():返回 queue 中最后一个元素的引用。如果 queue 是常量,就返回一个常引用;如果 queue 为空,返回值是未定义的。
push(const T& obj):在 queue 的尾部添加一个元素的副本。这是通过调用底层容器的成员函数 push_back() 来完成的。
push(T&& obj):以移动的方式在 queue 的尾部添加元素。这是通过调用底层容器的具有右值引用参数的成员函数 push_back() 来完成的。
pop():删除 queue 中的第一个元素。
size():返回 queue 中元素的个数。
empty():如果 queue 中没有元素的话,返回 true。
emplace():用传给 emplace() 的参数调用 T 的构造函数,在 queue 的尾部生成对象。
swap(queue<T> &other_q):将当前 queue 中的元素和参数 queue 中的元素交换。它们需要包含相同类型的元素。也可以调用全局函数模板 swap() 来完成同样的操作。
3.3.queue的demo
#include <stdio.h> #include <queue> using namespace std; int main() { queue<int> q; // 循环6次,i从0到5 for (int i = 0; i < 6; i++) { // q.push(i):把数字i从队尾放入队列(入队操作) q.push(i); } // 循环结束后,队列里的元素(队首→队尾):0、1、2、3、4、5 // 连续执行4次出队操作:每次删除队首的元素 q.pop(); // 删除队首的 0,队列变成:1、2、3、4、5 q.pop(); // 删除队首的 1,队列变成:2、3、4、5 q.pop(); // 删除队首的 2,队列变成:3、4、5 q.pop(); // 删除队首的 3,队列变成:4、5 // q.front():读取当前队首的元素(不会删除元素) // 此时队首是 4,所以打印输出 4 printf("%d\n", q.front()); return 0; }上面这个是一个简单的stl queue操作,先入队6个元素(0-5)。然后再连续出队pop,这里总共出队了4次,此时元素0 1 2 3全部出队并删除,所以打印front的元素是4。
本项目中的VIDEO_QUEUE、LOW_VIDEO_QUEUE就是典型的生产者 - 消费者队列模型,完全贴合队列的设计初衷:
- 生产者:硬件编码线程,VENC 每编码完成一帧 H.264 数据,就封装成数据包
push到队列尾部; - 消费者:推流线程,循环从队列头部
front()取帧,处理完成后pop()移除,再进行时间戳转换、FLV 封装、RTMP 推送。 - 核心作用:解耦编码速度和推流速度,平滑网络波动带来的流量差,防止网络短暂卡顿导致硬件编码丢帧,是嵌入式流媒体项目的标准架构设计。
四、多线程队列使用
4.1 为什么需要多线程队列?
1. 普通单线程队列的致命问题:并发不安全
我们之前写的普通队列,只能单线程操作。如果两个线程同时操作队列(比如一个线程入队、一个线程同时出队,或者两个线程同时入队),会出现竞态问题:
- 数据被覆盖、丢失
- 队首 / 队尾指针错乱
- 内存越界,直接触发段错误崩溃
举个最简单的例子:两个线程同时入队,都读取了同一个队尾位置,同时写入数据,最后只有一个数据生效,另一个直接丢失,队尾指针还多加了一次,队列里出现无效空元素。
2. 核心应用:生产者 - 消费者模型
多线程队列最核心的作用,就是实现生产者 - 消费者解耦,这和你的推流项目架构完全对应:
- 生产者:硬件编码线程,VENC 每编码完一帧 H.264 数据,就把帧放进队列(入队)
- 消费者:推流线程,循环从队列里取帧,做时间戳转换、FLV 封装、RTMP 推送(出队)
- 队列:中间的缓冲带,平衡两边的速度差。比如网络短暂卡顿、推流变慢,队列可以缓存几帧,不会直接阻塞硬件编码、导致丢帧。
上图是同一个典型的多线程入队,出队的过程。这里需要创建两个线程,一个是入队线程、一个是出队线程。入队线程主要是通过push的api向Queue的队尾插入数据,插入数据的同时通过pthread_cond_broadcast通知出队线程取出数据。此时出队线程正在等待入队线程的唤醒(pthread_cond_wait),若收到唤醒通知则让队列数据出队。
4.2 多线程队列的三大核心组件
多线程队列 = 普通数据队列 + 互斥锁 + 条件变量
1. 队列本体
就是普通的先进先出队列,一般用环形数组或者链表实现,用来存实际的业务数据(你的项目里就是视频帧结构体video_data_packet_t)。 嵌入式场景优先用环形数组,不需要频繁申请释放内存,性能更稳定。
2. 互斥锁pthread_mutex_t
- 作用:给队列操作加「独占锁」,保证同一时间只能有一个线程操作队列(不管是入队还是出队)。
- 简单理解:操作队列前先「抢锁」,抢到了才能操作;操作完立刻解锁,让其他线程抢。从根源上解决并发冲突,保证数据安全。
3. 条件变量pthread_cond_t
- 作用:解决「队空的时候消费者没事干、队满的时候生产者没地方放」的问题,让线程阻塞休眠,等条件满足了再唤醒,不用死循环轮询浪费 CPU。
- 一般配套两个条件变量:
not_empty(队非空):队列里放入新数据了,唤醒等待的消费者线程取数据not_full(队非满):队列里取走数据有空位了,唤醒等待的生产者线程放数据
4.3 Linux多线程的基本API
4.3.1 pthread_mutex_lock:
int pthread_mutex_lock(pthread_mutex_t *mutex);
第一个传入参数:pthread_mutex_t结构体指针
功能:这个是互斥锁加锁功能,就是每次线程调用的时候都会把锁加上,使其保证访问数据的原子性,直到解锁为止。
4.3.2 pthread_mutex_unlock:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
第一个传入参数:pthread_mutex_t结构体指针
功能:这个是互斥锁解锁功能,就是每次线程访问完资源的时候都会把锁解锁。
4.3.3 pthread_cond_broadcast:
int pthread_cond_broadcast(pthread_cond_t *cond)
传入参数:pthread_cond_t的结构体指针
功能:唤醒所有正在pthread_cond_wait(线程等待)的线程
4.3.4pthread_cond_wait:
int pthread_cond_wait (pthread_cond_t *__restrict __cond , pthread_mutex_t *__restrict __mutex)
第一个参数:pthread_cond_t的结构体指针
第二个参数:pthread_mutex_t结构体指针
功能:线程等待并挂起,若被唤醒了,则直接跳出挂起状态。
五、推流项目中视频队列的实现:
#include "ffmpeg_audio_queue.h" //AUDIO队列的构造器,包含mutex的初始化和条件变量初始化 AUDIO_QUEUE::AUDIO_QUEUE() { pthread_mutex_init(&audioMutex, NULL);//mutex的初始化 pthread_cond_init(&audioCond, NULL);//条件变量初始化 } //AUDIO队列的析构函数,锁的销毁和条件变量的销毁 AUDIO_QUEUE ::~AUDIO_QUEUE() { pthread_mutex_destroy(&audioMutex); pthread_cond_destroy(&audioCond); } //AUDIO_QUEUE的插入音频队列操作 int AUDIO_QUEUE::putAudioPacketQueue(audio_data_packet *audio_packet) { pthread_mutex_lock(&audioMutex);//上音频锁 audio_packet_queue.push(audio_packet);//向音频队列插入audio_data_packet包 pthread_cond_broadcast(&audioCond);//唤醒视音频队列 pthread_mutex_unlock(&audioMutex);//解音频锁 return 0; } //AUDIO_QUEUE取出音频包 audio_data_packet *AUDIO_QUEUE::getAudioPacketQueue() { pthread_mutex_lock(&audioMutex);//上音频锁 while (audio_packet_queue.size() == 0) { pthread_cond_wait(&audioCond, &audioMutex);//当音频队列没有数据的时候,等待被唤醒 } audio_data_packet *item = audio_packet_queue.front();//把音频数据包移到最前面 audio_packet_queue.pop();//pop取出音频数据并删除 pthread_mutex_unlock(&audioMutex);//解音频锁 return item; } //AUDIO_QUEUE音频队列长度 int AUDIO_QUEUE::getAudioPacketQueueSize() { unsigned int count = 0; pthread_mutex_lock(&audioMutex);//上音频锁 count = audio_packet_queue.size();//获取音频队列长度 pthread_mutex_unlock(&audioMutex);//解音频锁 return count; }这段代码是视频队列实现的过程,VIDEO_QUEUE是一个类。这个类里面,封装了添加视频队列(putVideoPacketQueue)、获取视频队列数据(getVideoPacketQueue)、获取视频队列长度(getVideoQueueSize)。
5.1 VIDEO_QUEUE构造器:队列初始化
// VIDEO队列的构造器,包含mutex的初始化和条件变量初始化 VIDEO_QUEUE::VIDEO_QUEUE() { pthread_mutex_init(&videoMutex, NULL); // 初始化互斥锁 pthread_cond_init(&videoCond, NULL); // 初始化条件变量 }这里创建一个VIDEO_QUEUE的C++的构造器,C++构造器主要初始化了线程的量。包括:线程锁的初始化(pthread_mutex_init)、线程条件变量的初始化(pthread_cond_init)。
5.2 析构函数:队列资源释放
// VIDEO队列的析构函数,锁的销毁和条件变量的销毁 VIDEO_QUEUE::~VIDEO_QUEUE() { pthread_mutex_destroy(&videoMutex); // 销毁互斥锁 pthread_cond_destroy(&videoCond); // 销毁条件变量 }销毁函数,队列对象生命周期结束时自动执行,释放锁和条件变量的系统资源,避免资源泄漏。
5.3putVideoPacketQueue入队函数:生产者(编码线程)放数据
//// VIDEO_QUEUE的插入视频队列操作 int VIDEO_QUEUE::putVideoPacketQueue(video_data_packet_t *video_packet) { pthread_mutex_lock(&videoMutex); // 加锁,独占队列访问权 video_packet_queue.push(video_packet); // 把视频帧包指针从队尾加入队列 pthread_cond_broadcast(&videoCond); // 广播唤醒所有等待数据的消费者线程 pthread_mutex_unlock(&videoMutex); // 解锁,释放队列访问权 return 0; }putVideoPacketQueue主要是video_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后进行入队操作video_packet_queue.push(video_packet),入队完成之后再通知出队线程取出队列数据pthread_cond_broadcast,最后解锁pthread_mutex_unlock。
5.4getVideoPacketQueue出队函数:消费者(推流线程)取数据
// VIDEO_QUEUE取出视频包 video_data_packet_t *VIDEO_QUEUE::getVideoPacketQueue() { pthread_mutex_lock(&videoMutex); // 加锁 while (video_packet_queue.size() == 0) // 队列为空时,循环等待 { pthread_cond_wait(&videoCond, &videoMutex); // 队列为空就阻塞休眠:自动释放锁,被唤醒后自动重新加锁 } video_data_packet_t *item = video_packet_queue.front(); // 读取队首的帧包指针 video_packet_queue.pop(); // 删除队首元素(只删队列里的指针,数据本身还在) pthread_mutex_unlock(&videoMutex); // 解锁 return item; // 返回帧包指针给调用方 }getVideoPacketQueue主要是video_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后判断视频队列是否有数据(video_packet_queue.size()==0)。若没有数据,则用pthread_cond_wait去等待线程被唤醒。若队列有数据则唤醒的此线程,则直接从队列取出数据。这里取出数据分两步:第一步,先把队列移动到最前面video_packet_queue.front()。第二步,video_packet_queue.pop出队并删除数据。
5.5getVideoQueueSize获取队列长度函数
// VIDEO_QUEUE视频队列长度 int VIDEO_QUEUE::getVideoQueueSize() { unsigned int count = 0; pthread_mutex_lock(&videoMutex); // 加锁 count = video_packet_queue.size(); // 安全读取队列当前元素个数 pthread_mutex_unlock(&videoMutex); // 解锁 return count; }getVideoQueueSize主要是获取当前队列的长度,获取长度的步骤跟上面也差不多。
首先,pthread_mutex_lock加锁,然后通过count = video_packet_queue.size(),获取队列的数量。然后pthread_mutex_unlock解锁。
在本项目中
- 生产者(编码线程):RV1126 VENC 编码出一帧 H.264 → 申请
video_data_packet_t装载数据 → 调用putVideoPacketQueue放入队列 - 消费者(推流线程):调用
getVideoPacketQueue阻塞取帧 → 数据拷贝到 AVPacket → 释放帧结构体 → 计算 PTS 时间戳 → FFmpeg 封装 FLV → RTMP 推送 - 队列在中间做缓冲,平衡编码和推流的速度差,网络轻微抖动不会直接导致硬件编码丢帧。