# 接收与发送队列
在开发过程中,当数据流变大时,消息总是处理不过来。
# 会出现 3 个问题:
1、接收数据太多
2、接收数据连包
3、阻塞式响应
# 咱们一个一个来解决这个问题。
# 1、接收队列
接收数据多,来不及处理下一条就又来了。
解决这个的办法就是接收时放到数组里,然后一条一条的去解析。
这里移植野火的数据队列
食用步骤:
- rx_queue_init ();// 初始化一下
- push_data_to_queue (uint8_t *src_dat,uint16_t src_len);// 这个函数存入数据队列,接收时用
- pull_data_from_queue ();// 这个从队列中取出,处理时用
data_queue.h
#ifndef __ESP_DATA_QUEUE_H_ | |
#define __ESP_DATA_QUEUE_H_ | |
#include "gd32f30x.h" | |
#include <string.h> | |
#include <stdio.h> | |
// 缓冲队列的个数需要为 2 的幂 | |
#define QUEUE_NODE_NUM (8) // 缓冲队列的大小(有多少个缓冲区) | |
#define QUEUE_NODE_DATA_LEN (1024) // 单个接收缓冲区大小 | |
// 队列的主体数据类型接口 | |
#define QUEUE_DATA_TYPE ESP_USART_FRAME | |
// 队列的调试输出接口 | |
#define DATA_QUEUE_LOG QUEUE_DEBUG | |
#define DATA_QUEUE_LOG_ARRAY QUEUE_DEBUG_ARRAY | |
// 数据主体 | |
typedef struct | |
{ | |
uint8_t *head; // 缓冲区头指针 | |
uint16_t len; // 接收到的数据长度 | |
}ESP_USART_FRAME; | |
// 队列结构 | |
typedef struct { | |
int size; /* 缓冲区大小 */ | |
int read; /* 读指针 */ | |
int write; /* 写指针 */ | |
int read_using; /* 正在读取的缓冲区指针 */ | |
int write_using; /* 正在写入的缓冲区指针 */ | |
QUEUE_DATA_TYPE *elems[QUEUE_NODE_NUM]; /* 缓冲区地址 */ | |
} QueueBuffer; | |
extern QueueBuffer rx_queue; | |
/* 信息输出 */ | |
#define QUEUE_DEBUG_ON 1 | |
#define QUEUE_DEBUG_ARRAY_ON 1 | |
#define QUEUE_INFO(fmt,arg...) printf("<<-QUEUE-INFO->> "fmt"\n",##arg) | |
#define QUEUE_ERROR(fmt,arg...) printf("<<-QUEUE-ERROR->> "fmt"\n",##arg) | |
#define QUEUE_DEBUG(fmt,arg...) do{\ | |
if(QUEUE_DEBUG_ON)\ | |
printf("<<-QUEUE-DEBUG->> [%d]"fmt"\n",__LINE__, ##arg);\ | |
}while(0) | |
#define QUEUE_DEBUG_ARRAY(array, num) do{\ | |
int32_t i;\ | |
uint8_t* a = array;\ | |
if(QUEUE_DEBUG_ARRAY_ON)\ | |
{\ | |
printf("\n<<-QUEUE-DEBUG-ARRAY->>\n");\ | |
for (i = 0; i < (num); i++)\ | |
{\ | |
printf("%02x ", (a)[i]);\ | |
if ((i + 1 ) %10 == 0)\ | |
{\ | |
printf("\n");\ | |
}\ | |
}\ | |
printf("\n");\ | |
}\ | |
}while(0) | |
// 输出队列的状态信息 | |
#define cbPrint(cb) DATA_QUEUE_LOG("size=0x%x, read=%d, write=%d\n", cb.size, cb.read, cb.write);\ | |
DATA_QUEUE_LOG("size=0x%x, read_using=%d, write_using=%d\n", cb.size, cb.read_using, cb.write_using); | |
QUEUE_DATA_TYPE* cbWrite(QueueBuffer *cb); | |
QUEUE_DATA_TYPE* cbRead(QueueBuffer *cb); | |
void cbReadFinish(QueueBuffer *cb); | |
void cbWriteFinish(QueueBuffer *cb); | |
//void cbPrint(QueueBuffer *cb); | |
QUEUE_DATA_TYPE* cbWriteUsing(QueueBuffer *cb) ; | |
int cbIsFull(QueueBuffer *cb) ; | |
int cbIsEmpty(QueueBuffer *cb) ; | |
void rx_queue_init(void); | |
void pull_data_from_queue(void); | |
void push_data_to_queue(uint8_t *src_dat,uint16_t src_len); | |
#endif |
data_queue.c
/** | |
****************************************************************************** | |
* @file rx_data_queue.c | |
* @author fire | |
* @version V1.0 | |
* @date 2015-01-xx | |
* @brief 环形缓冲区,适用于接收外部数据时用作缓冲 | |
****************************************************************************** | |
* @attention | |
* | |
* 实验平台:野火 IOT STM32 开发板 | |
* 论坛 :http://www.firebbs.cn | |
* 淘宝 :https://fire-stm32.taobao.com | |
* | |
****************************************************************************** | |
*/ | |
#include "data_queue.h" | |
// 实例化节点数据类型 | |
QUEUE_DATA_TYPE node_data[QUEUE_NODE_NUM]; | |
// 实例化队列类型 | |
QueueBuffer rx_queue; | |
// 队列缓冲区的内存池 | |
__align(4) uint8_t node_buff[QUEUE_NODE_NUM][QUEUE_NODE_DATA_LEN] ; | |
/* 环形缓冲队列 */ | |
/** | |
* @brief 初始化缓冲队列 | |
* @param cb: 缓冲队列结构体 | |
* @param size: 缓冲队列的元素个数 | |
* @note 初始化时还需要给 cb->elems 指针赋值 | |
*/ | |
void cbInit(QueueBuffer *cb, int size) | |
{ | |
cb->size = size; /* maximum number of elements */ | |
cb->read = 0; /* index of oldest element */ | |
cb->write = 0; /* index at which to write new element */ | |
// cb->elems = (uint8_t *) calloc (cb->size, sizeof (uint8_t)); //elems 要额外初始化 | |
} | |
/** | |
* @brief 判断缓冲队列是 (1) 否 (0) 已满 | |
* @param cb: 缓冲队列结构体 | |
*/ | |
int cbIsFull(QueueBuffer *cb) | |
{ | |
return cb->write == (cb->read ^ cb->size); /* This inverts the most significant bit of read before comparison */ | |
} | |
/** | |
* @brief 判断缓冲队列是 (1) 否 (0) 全空 | |
* @param cb: 缓冲队列结构体 | |
*/ | |
int cbIsEmpty(QueueBuffer *cb) | |
{ | |
return cb->write == cb->read; | |
} | |
/** | |
* @brief 对缓冲队列的指针加 1 | |
* @param cb: 缓冲队列结构体 | |
* @param p:要加 1 的指针 | |
* @return 返回加 1 的结果 | |
*/ | |
int cbIncr(QueueBuffer *cb, int p) | |
{ | |
return (p + 1) & (2 * cb->size - 1); /* read and write pointers incrementation is done modulo 2*size */ | |
} | |
/** | |
* @brief 获取可写入的缓冲区指针 | |
* @param cb: 缓冲队列结构体 | |
* @return 可进行写入的缓冲区指针 | |
* @note 得到指针后可进入写入操作,但写指针不会立即加 1, | |
写完数据时,应调用 cbWriteFinish 对写指针加 1 | |
*/ | |
QUEUE_DATA_TYPE *cbWrite(QueueBuffer *cb) | |
{ | |
if (cbIsFull(cb)) /* full, overwrite moves read pointer */ | |
{ | |
return 0; | |
} | |
else | |
{ | |
// 当 wriet 和 write_using 相等时,表示上一个缓冲区已写入完毕,需要对写指针加 1 | |
if(cb->write == cb->write_using) | |
{ | |
cb->write_using = cbIncr(cb, cb->write); // 未满,则增加 1 | |
} | |
} | |
return cb->elems[cb->write_using & (cb->size - 1)]; | |
} | |
/** | |
* @brief 数据写入完毕,更新写指针到缓冲结构体 | |
* @param cb: 缓冲队列结构体 | |
*/ | |
void cbWriteFinish(QueueBuffer *cb) | |
{ | |
cb->write = cb->write_using; | |
} | |
/** | |
* @brief 获取可读取的缓冲区指针 | |
* @param cb: 缓冲队列结构体 | |
* @return 可进行读取的缓冲区指针 | |
* @note 得到指针后可进入读取操作,但读指针不会立即加 1, | |
读取完数据时,应调用 cbReadFinish 对读指针加 1 | |
*/ | |
QUEUE_DATA_TYPE *cbRead(QueueBuffer *cb) | |
{ | |
if(cbIsEmpty(cb)) | |
return NULL; | |
// 当 read 和 read_using 相等时,表示上一个缓冲区已读取完毕 (即已调用 cbReadFinish), | |
// 需要对写指针加 1 | |
if(cb->read == cb->read_using) | |
cb->read_using = cbIncr(cb, cb->read); | |
return cb->elems[cb->read_using & (cb->size - 1)]; | |
} | |
/** | |
* @brief 数据读取完毕,更新读指针到缓冲结构体 | |
* @param cb: 缓冲队列结构体 | |
*/ | |
void cbReadFinish(QueueBuffer *cb) | |
{ | |
// 重置当前读完的数据节点的长度 | |
cb->elems[cb->read_using & (cb->size - 1)]->len = 0; | |
cb->read = cb->read_using; | |
} | |
// 队列的指针指向的缓冲区全部销毁 | |
void camera_queue_free(void) | |
{ | |
uint32_t i = 0; | |
for(i = 0; i < QUEUE_NODE_NUM; i ++) | |
{ | |
if(node_data[i].head != NULL) | |
{ | |
// 若是动态申请的空间才要 free | |
//free(node_data[i].head); | |
node_data[i].head = NULL; | |
} | |
} | |
return; | |
} | |
/** | |
* @brief 缓冲队列初始化,分配内存,使用缓冲队列时, | |
* @param 无 | |
* @retval 无 | |
*/ | |
void rx_queue_init(void) | |
{ | |
uint32_t i = 0; | |
memset(node_data, 0, sizeof(node_data)); | |
/* 初始化缓冲队列 */ | |
cbInit(&rx_queue, QUEUE_NODE_NUM); | |
for(i = 0; i < QUEUE_NODE_NUM; i ++) | |
{ | |
node_data[i].head = node_buff[i]; | |
/* 初始化队列缓冲指针,指向实际的内存 */ | |
rx_queue.elems[i] = &node_data[i]; | |
//DATA_QUEUE_LOG("node_data[i].head=0x%x,\r\nrx_queue.elems[i] =0x%x", (uint32_t)node_data[i].head,(uint32_t)rx_queue.elems[i]->head); | |
memset(node_data[i].head, 0, QUEUE_NODE_DATA_LEN); | |
} | |
cbPrint(rx_queue); | |
} | |
/** | |
* @brief 往队列中写入数据的样例 | |
*/ | |
void push_data_to_queue(uint8_t *src_dat, uint16_t src_len) | |
{ | |
QUEUE_DATA_TYPE *data_p; | |
uint16_t i; | |
for(i = 0; i < src_len; i++) | |
{ | |
/* 获取写缓冲区指针,准备写入新数据 */ | |
data_p = cbWrite(&rx_queue); | |
if (data_p != 0) // 若缓冲队列未满,开始传输 | |
{ | |
// 往缓冲区写入数据,如使用串口接收、dma 写入等方式 | |
*(data_p->head + i) = src_dat[i]; | |
data_p->len++; | |
//printf("\r\ndata_p->len =%d",data_p->len); | |
} | |
else return; | |
//cbPrint(rx_queue); | |
} | |
/* 写入缓冲区完毕 */ | |
cbWriteFinish(&rx_queue); | |
//cbPrint(rx_queue); | |
} | |
extern uint8_t datasource; | |
extern void vprotocolplatformProcess(uint8_t *DataBuffer, uint32_t DataLen); | |
extern ControllerFun_union ControllerFun; | |
extern ControllerID_union ControllerID; | |
extern BufferReceive_union bufferCmd; | |
extern bool vprotocolplatformCheckID(void); | |
extern void vprotocolplatformParsing(void); | |
extern bool vprotocolplatformCheckIsFunofCheckTime(void); | |
extern bool BufferRecieveFlag; | |
/** | |
* @brief 从队列中取数据的样例 | |
*/ | |
void pull_data_from_queue(void) | |
{ | |
QUEUE_DATA_TYPE *rx_data; | |
/* 从缓冲区读取数据,进行处理,*/ | |
rx_data = cbRead(&rx_queue); | |
if(rx_data != NULL)// 缓冲队列非空 | |
{ | |
// 处理接收数据 ------------------------------ 在这处理数据 ------------------------------------------------ | |
// 数据头指针 rx_data->head,数据长度 rx_data->len | |
// 使用完数据必须调用 cbReadFinish 更新读指针 -------------------------------------------------------------- | |
cbReadFinish(&rx_queue); | |
} | |
} |
到这里,大家可以试一下,是不是非常好用。感谢野火官方。大家可以去看看野火的教程,真的非常细!良心推荐。
# 2、数据连包
两条数据连接一起了
// 这个看大家的数据格式了。 | |
// 根据自己的数据格式自行解析吧! |
# 3、发送队列
既然可以接收,那么也可以发送啦😊
食用步骤:
1、vsendqueueInit ();// 初始化一下
2、vsendqueueSendQueue ();// 队列发送,这个放在主循环中
3、bsendqueueInsert (uint8_t *data,uint32_t len);// 插入数据,等待发送
根据野火的例程,随手写了一下。逻辑图如下:
sendqueue.h
#ifndef __SENDQUEUE_H | |
#define __SENDQUEUE_H | |
#include "gd32f30x.h" | |
#include <stdbool.h> | |
// 队列的数量 | |
#define SEND_QUEUE_DATA_NUMBER (16) //8 | |
// 单个队列的长度 | |
#define SEND_QUEUE_DATA_SIZE (256) //1024 | |
// 用于存储数据的信息 | |
typedef struct{ | |
uint8_t *head; | |
uint16_t len; | |
}SEND_QUEUE_DATA_STRUCT; | |
// 用于调整数据发送的间隔 | |
typedef struct{ | |
uint32_t time; | |
uint32_t send_interval; | |
}SEND_QUEUE_TIME_STRUCT; | |
// 存储信息 | |
typedef struct{ | |
uint32_t send_c;// 发送计数 | |
uint32_t storage_c;// 存储计数 | |
uint8_t *send_p;// 发送指针 数据是连续存储的 | |
uint8_t *storage_p;// 存储指针 | |
// 数据存放的数组 | |
uint8_t qbuffer[SEND_QUEUE_DATA_NUMBER][SEND_QUEUE_DATA_SIZE]; | |
// 时间 | |
SEND_QUEUE_TIME_STRUCT TIME; | |
// 队列的头指针和数据长度 | |
SEND_QUEUE_DATA_STRUCT DATA[SEND_QUEUE_DATA_NUMBER]; | |
}SEND_QUEUE_CONTROLLER_STRUCT; | |
// 初始化 | |
void vsendqueueInit(void); | |
// 队列发送,这个放在主循环中 | |
void vsendqueueSendQueue(void); | |
// 插入数据 | |
bool bsendqueueInsert(uint8_t *data,uint32_t len); | |
#endif //__SENDQUEUE_H |
sendqueue.c
#include "../User/FUN/sendqueue/sendqueue.h" | |
#include "../User/BSP/TIMER1/timer.h" | |
#include "controllerconfig.h" | |
#include "stdio.h" | |
#include "stdlib.h" | |
#include <string.h> | |
// 如果开启了这个功能,再编译,如果都没有,代码是灰色的,这个是为了模块化 | |
#if ((USE_SEND_BUFFER_QUEUE_RS485==1))||((USE_SEND_BUFFER_QUEUE_LORA==1)) | |
//timer.c | |
extern Timer_struct FlechazoTimer; | |
//1、a manage struct | |
SEND_QUEUE_CONTROLLER_STRUCT S_QUEUE_CTL; | |
bool bsendqueueSend(void); | |
void vsendqueueInit(void) | |
{ | |
// 时间计数值 | |
S_QUEUE_CTL.TIME.time = FlechazoTimer.Control.GetCurrentTime_ms(); | |
// 每次进入发送的时间间隔 | |
S_QUEUE_CTL.TIME.send_interval = 200; | |
S_QUEUE_CTL.send_c = 0; | |
S_QUEUE_CTL.storage_c = 0; | |
S_QUEUE_CTL.send_p = S_QUEUE_CTL.qbuffer[0]; | |
S_QUEUE_CTL.storage_p = S_QUEUE_CTL.qbuffer[0]; | |
} | |
//2、a send queue buffer mode---lora or rs485 ? | |
void vsendqueueSendQueue(void) | |
{ | |
// 判断是否超时 | |
if(FlechazoTimer.Control.IsTimeOut_ms(&S_QUEUE_CTL.TIME.time, S_QUEUE_CTL.TIME.send_interval)) | |
{ | |
//1、have data 存储和发送不相等就是有数据 | |
if(S_QUEUE_CTL.storage_c != S_QUEUE_CTL.send_c) | |
{ | |
bsendqueueSend(); | |
} | |
S_QUEUE_CTL.TIME.time = FlechazoTimer.Control.GetCurrentTime_ms(); | |
} | |
} | |
//3、insert queue | |
bool bsendqueueInsert(uint8_t *data, uint32_t len) | |
{ | |
//pos storage | |
if((++S_QUEUE_CTL.storage_c) > (SEND_QUEUE_DATA_NUMBER)) | |
{ | |
//return false; | |
S_QUEUE_CTL.storage_c = 1; | |
S_QUEUE_CTL.storage_p = S_QUEUE_CTL.qbuffer[0]; | |
} | |
//insert | |
memcpy(S_QUEUE_CTL.storage_p, data, len); | |
//storage info | |
S_QUEUE_CTL.DATA[S_QUEUE_CTL.storage_c - 1].head = S_QUEUE_CTL.storage_p; | |
S_QUEUE_CTL.DATA[S_QUEUE_CTL.storage_c - 1].len = len; | |
S_QUEUE_CTL.storage_p += len; | |
return true; | |
} | |
//4、send queue | |
bool bsendqueueSend(void) | |
{ | |
//2、check send status | |
//485 | |
if(SET != usart_flag_get(rUSARTx, USART_FLAG_BSY)) | |
{ | |
//3、send | |
if((++S_QUEUE_CTL.send_c) > SEND_QUEUE_DATA_NUMBER) | |
{ | |
S_QUEUE_CTL.send_c = 1; | |
} | |
//485 | |
vrs485Send(S_QUEUE_CTL.DATA[S_QUEUE_CTL.send_c-1].head, S_QUEUE_CTL.DATA[S_QUEUE_CTL.send_c-1].len); | |
} | |
return true; | |
} | |
#endif //((USE_SEND_BUFFER_QUEUE_RS485==1))||((USE_SEND_BUFFER_QUEUE_LORA==1)) |
那么到这,你就可以试一下啦!
连续快速发 5 条,看看他的回复。
终于可以愉快的处理大数据了!
当然你也可以再加上 DMA,没有频繁的中断响应,反应速度更快啦!
QT键值对处理协议解析
https://zhuanlan.zhihu.com/p/643181004