# 接收与发送队列

在开发过程中,当数据流变大时,消息总是处理不过来。

# 会出现 3 个问题:

1、接收数据太多

2、接收数据连包

3、阻塞式响应

# 咱们一个一个来解决这个问题。

# 1、接收队列

接收数据多,来不及处理下一条就又来了。

解决这个的办法就是接收时放到数组里,然后一条一条的去解析。

这里移植野火的数据队列

食用步骤:

  • rx_queue_init ();// 初始化一下
  • push_data_to_queue (uint8_t *src_dat,uint16_t src_len);// 这个函数存入数据队列,接收时用
  • pull_data_from_queue ();// 这个从队列中取出,处理时用

data_queue.h

#ifndef __ESP_DATA_QUEUE_H_
#define __ESP_DATA_QUEUE_H_
#include "gd32f30x.h"
#include <string.h>
#include <stdio.h>
// 缓冲队列的个数需要为 2 的幂
#define QUEUE_NODE_NUM        (8)            // 缓冲队列的大小(有多少个缓冲区)
#define QUEUE_NODE_DATA_LEN   (1024)       // 单个接收缓冲区大小
// 队列的主体数据类型接口
#define QUEUE_DATA_TYPE  				ESP_USART_FRAME
// 队列的调试输出接口
#define DATA_QUEUE_LOG  				QUEUE_DEBUG
#define DATA_QUEUE_LOG_ARRAY 		QUEUE_DEBUG_ARRAY
// 数据主体
typedef struct 
{
	uint8_t  *head; 	// 缓冲区头指针
	uint16_t len; // 接收到的数据长度
}ESP_USART_FRAME;
// 队列结构
typedef struct {
	int size;  /* 缓冲区大小 */
	int read; /* 读指针 */
	int write;   /* 写指针 */
	int read_using;	/* 正在读取的缓冲区指针 */
	int write_using;		/* 正在写入的缓冲区指针 */
	QUEUE_DATA_TYPE *elems[QUEUE_NODE_NUM];  /* 缓冲区地址 */
} QueueBuffer;
extern QueueBuffer rx_queue;
/* 信息输出 */
#define QUEUE_DEBUG_ON          1
#define QUEUE_DEBUG_ARRAY_ON		1
#define QUEUE_INFO(fmt,arg...)           printf("<<-QUEUE-INFO->> "fmt"\n",##arg)
#define QUEUE_ERROR(fmt,arg...)          printf("<<-QUEUE-ERROR->> "fmt"\n",##arg)
#define QUEUE_DEBUG(fmt,arg...)          do{\
                                          if(QUEUE_DEBUG_ON)\
                                          printf("<<-QUEUE-DEBUG->> [%d]"fmt"\n",__LINE__, ##arg);\
                                          }while(0)
#define QUEUE_DEBUG_ARRAY(array, num)    do{\
										   int32_t i;\
                                           uint8_t* a = array;\
                                           if(QUEUE_DEBUG_ARRAY_ON)\
                                           {\
                                             	printf("\n<<-QUEUE-DEBUG-ARRAY->>\n");\
                                             	for (i = 0; i < (num); i++)\
                                                {\
                                                    printf("%02x   ", (a)[i]);\
                                                    if ((i + 1 ) %10 == 0)\
                                                    {\
                                                        printf("\n");\
                                                    }\
                                                }\
                                            	printf("\n");\
                                            }\
                                           }while(0)	
// 输出队列的状态信息
#define cbPrint(cb)		    DATA_QUEUE_LOG("size=0x%x, read=%d, write=%d\n", cb.size, cb.read, cb.write);\
	  DATA_QUEUE_LOG("size=0x%x, read_using=%d, write_using=%d\n", cb.size, cb.read_using, cb.write_using);
QUEUE_DATA_TYPE* cbWrite(QueueBuffer *cb);
QUEUE_DATA_TYPE* cbRead(QueueBuffer *cb);
void cbReadFinish(QueueBuffer *cb);
void cbWriteFinish(QueueBuffer *cb);
//void cbPrint(QueueBuffer *cb);
QUEUE_DATA_TYPE* cbWriteUsing(QueueBuffer *cb) ;
int cbIsFull(QueueBuffer *cb) ; 
int cbIsEmpty(QueueBuffer *cb) ;
void rx_queue_init(void);
void pull_data_from_queue(void);
void push_data_to_queue(uint8_t *src_dat,uint16_t src_len);
#endif

data_queue.c

/**
  ******************************************************************************
  * @file    rx_data_queue.c
  * @author  fire
  * @version V1.0
  * @date    2015-01-xx
  * @brief   环形缓冲区,适用于接收外部数据时用作缓冲
  ******************************************************************************
  * @attention
  *
  * 实验平台:野火 IOT STM32 开发板
  * 论坛    :http://www.firebbs.cn
  * 淘宝    :https://fire-stm32.taobao.com
  *
  ******************************************************************************
  */
#include "data_queue.h"
// 实例化节点数据类型
QUEUE_DATA_TYPE  node_data[QUEUE_NODE_NUM];
// 实例化队列类型
QueueBuffer rx_queue;
// 队列缓冲区的内存池
__align(4) uint8_t node_buff[QUEUE_NODE_NUM][QUEUE_NODE_DATA_LEN] ;
/* 环形缓冲队列 */
/**
  * @brief  初始化缓冲队列
  * @param  cb: 缓冲队列结构体
  * @param  size: 缓冲队列的元素个数
  * @note 	初始化时还需要给 cb->elems 指针赋值
  */
void cbInit(QueueBuffer *cb, int size)
{
    cb->size  = size;	/* maximum number of elements           */
    cb->read = 0; 		/* index of oldest element              */
    cb->write   = 0; 	 	/* index at which to write new element  */
    //    cb->elems = (uint8_t *) calloc (cb->size, sizeof (uint8_t));  //elems 要额外初始化
}
/**
  * @brief  判断缓冲队列是 (1) 否 (0) 已满
  * @param  cb: 缓冲队列结构体
  */
int cbIsFull(QueueBuffer *cb)
{
    return cb->write == (cb->read ^ cb->size); /* This inverts the most significant bit of read before comparison */
}
/**
  * @brief  判断缓冲队列是 (1) 否 (0) 全空
  * @param  cb: 缓冲队列结构体
  */
int cbIsEmpty(QueueBuffer *cb)
{
    return cb->write == cb->read;
}
/**
  * @brief  对缓冲队列的指针加 1
  * @param  cb: 缓冲队列结构体
  * @param  p:要加 1 的指针
  * @return  返回加 1 的结果
  */
int cbIncr(QueueBuffer *cb, int p)
{
    return (p + 1) & (2 * cb->size - 1); /* read and write pointers incrementation is done modulo 2*size */
}
/**
  * @brief  获取可写入的缓冲区指针
  * @param  cb: 缓冲队列结构体
  * @return  可进行写入的缓冲区指针
  * @note  得到指针后可进入写入操作,但写指针不会立即加 1,
           写完数据时,应调用 cbWriteFinish 对写指针加 1
  */
QUEUE_DATA_TYPE *cbWrite(QueueBuffer *cb)
{
    if (cbIsFull(cb)) /* full, overwrite moves read pointer */
    {
        return 0;
    }
    else
    {
        // 当 wriet 和 write_using 相等时,表示上一个缓冲区已写入完毕,需要对写指针加 1
        if(cb->write == cb->write_using)
        {
            cb->write_using = cbIncr(cb, cb->write); // 未满,则增加 1
        }
    }
    return  cb->elems[cb->write_using & (cb->size - 1)];
}
/**
  * @brief 数据写入完毕,更新写指针到缓冲结构体
  * @param  cb: 缓冲队列结构体
  */
void cbWriteFinish(QueueBuffer *cb)
{
    cb->write = cb->write_using;
}
/**
  * @brief  获取可读取的缓冲区指针
  * @param  cb: 缓冲队列结构体
  * @return  可进行读取的缓冲区指针
  * @note  得到指针后可进入读取操作,但读指针不会立即加 1,
					 读取完数据时,应调用 cbReadFinish 对读指针加 1
  */
QUEUE_DATA_TYPE *cbRead(QueueBuffer *cb)
{
    if(cbIsEmpty(cb))
        return NULL;
    // 当 read 和 read_using 相等时,表示上一个缓冲区已读取完毕 (即已调用 cbReadFinish),
    // 需要对写指针加 1
    if(cb->read == cb->read_using)
        cb->read_using = cbIncr(cb, cb->read);
    return cb->elems[cb->read_using & (cb->size - 1)];
}
/**
  * @brief 数据读取完毕,更新读指针到缓冲结构体
  * @param  cb: 缓冲队列结构体
  */
void cbReadFinish(QueueBuffer *cb)
{
    // 重置当前读完的数据节点的长度
    cb->elems[cb->read_using & (cb->size - 1)]->len = 0;
    cb->read = cb->read_using;
}
// 队列的指针指向的缓冲区全部销毁
void camera_queue_free(void)
{
    uint32_t i = 0;
    for(i = 0; i < QUEUE_NODE_NUM; i ++)
    {
        if(node_data[i].head != NULL)
        {
            // 若是动态申请的空间才要 free
            //free(node_data[i].head);
            node_data[i].head = NULL;
        }
    }
    return;
}
/**
  * @brief  缓冲队列初始化,分配内存,使用缓冲队列时,
  * @param  无
  * @retval 无
  */
void rx_queue_init(void)
{
    uint32_t i = 0;
    memset(node_data, 0, sizeof(node_data));
    /* 初始化缓冲队列 */
    cbInit(&rx_queue, QUEUE_NODE_NUM);
    for(i = 0; i < QUEUE_NODE_NUM; i ++)
    {
        node_data[i].head = node_buff[i];
        /* 初始化队列缓冲指针,指向实际的内存 */
        rx_queue.elems[i] = &node_data[i];
        //DATA_QUEUE_LOG("node_data[i].head=0x%x,\r\nrx_queue.elems[i] =0x%x", (uint32_t)node_data[i].head,(uint32_t)rx_queue.elems[i]->head);
        memset(node_data[i].head, 0, QUEUE_NODE_DATA_LEN);
    }
    cbPrint(rx_queue);
}
/**
  * @brief  往队列中写入数据的样例
  */
void push_data_to_queue(uint8_t *src_dat, uint16_t src_len)
{
    QUEUE_DATA_TYPE *data_p;
    uint16_t i;
    for(i = 0; i < src_len; i++)
    {
        /* 获取写缓冲区指针,准备写入新数据 */
        data_p = cbWrite(&rx_queue);
        if (data_p != 0)	// 若缓冲队列未满,开始传输
        {
            // 往缓冲区写入数据,如使用串口接收、dma 写入等方式
            *(data_p->head + i) = src_dat[i];
            data_p->len++;
            //printf("\r\ndata_p->len =%d",data_p->len);
        }
        else return;
        //cbPrint(rx_queue);
    }
    /* 写入缓冲区完毕 */
    cbWriteFinish(&rx_queue);
    //cbPrint(rx_queue);
}
extern uint8_t datasource;
extern void vprotocolplatformProcess(uint8_t *DataBuffer, uint32_t DataLen);
extern ControllerFun_union ControllerFun;
extern ControllerID_union ControllerID;
extern BufferReceive_union bufferCmd;
extern bool vprotocolplatformCheckID(void);
extern void vprotocolplatformParsing(void);
extern bool vprotocolplatformCheckIsFunofCheckTime(void);
extern bool BufferRecieveFlag;
/**
  * @brief  从队列中取数据的样例
  */
void pull_data_from_queue(void)
{
    QUEUE_DATA_TYPE *rx_data;
    /* 从缓冲区读取数据,进行处理,*/
    rx_data = cbRead(&rx_queue);
    if(rx_data != NULL)// 缓冲队列非空
    {
        // 处理接收数据 ------------------------------ 在这处理数据 ------------------------------------------------
        // 数据头指针 rx_data->head,数据长度 rx_data->len
        
        // 使用完数据必须调用 cbReadFinish 更新读指针 --------------------------------------------------------------
        cbReadFinish(&rx_queue);
    }
}

到这里,大家可以试一下,是不是非常好用。感谢野火官方。大家可以去看看野火的教程,真的非常细!良心推荐。

# 2、数据连包

两条数据连接一起了

// 这个看大家的数据格式了。
// 根据自己的数据格式自行解析吧!

# 3、发送队列

既然可以接收,那么也可以发送啦😊

食用步骤:

1、vsendqueueInit ();// 初始化一下

2、vsendqueueSendQueue ();// 队列发送,这个放在主循环中

3、bsendqueueInsert (uint8_t *data,uint32_t len);// 插入数据,等待发送

根据野火的例程,随手写了一下。逻辑图如下:

image-20230801143036999

sendqueue.h

#ifndef __SENDQUEUE_H
#define __SENDQUEUE_H
#include "gd32f30x.h"
#include <stdbool.h>
// 队列的数量
#define SEND_QUEUE_DATA_NUMBER (16)  //8
// 单个队列的长度
#define SEND_QUEUE_DATA_SIZE   (256) //1024
// 用于存储数据的信息
typedef struct{
	uint8_t  *head;
	uint16_t len;
}SEND_QUEUE_DATA_STRUCT;
// 用于调整数据发送的间隔
typedef struct{
	uint32_t time;
	uint32_t send_interval;
}SEND_QUEUE_TIME_STRUCT;
// 存储信息
typedef struct{
	uint32_t send_c;// 发送计数
	uint32_t storage_c;// 存储计数
	uint8_t *send_p;// 发送指针    数据是连续存储的
	uint8_t *storage_p;// 存储指针
	// 数据存放的数组
	uint8_t qbuffer[SEND_QUEUE_DATA_NUMBER][SEND_QUEUE_DATA_SIZE];
	// 时间
	SEND_QUEUE_TIME_STRUCT TIME;
	// 队列的头指针和数据长度
	SEND_QUEUE_DATA_STRUCT DATA[SEND_QUEUE_DATA_NUMBER];
}SEND_QUEUE_CONTROLLER_STRUCT;
// 初始化
void vsendqueueInit(void);
// 队列发送,这个放在主循环中
void vsendqueueSendQueue(void);
// 插入数据
bool bsendqueueInsert(uint8_t *data,uint32_t len);
#endif   //__SENDQUEUE_H

sendqueue.c

#include "../User/FUN/sendqueue/sendqueue.h"
#include "../User/BSP/TIMER1/timer.h"
#include "controllerconfig.h"
#include "stdio.h"
#include "stdlib.h"
#include <string.h>
// 如果开启了这个功能,再编译,如果都没有,代码是灰色的,这个是为了模块化
#if ((USE_SEND_BUFFER_QUEUE_RS485==1))||((USE_SEND_BUFFER_QUEUE_LORA==1))
//timer.c
extern Timer_struct FlechazoTimer;
//1、a manage struct
SEND_QUEUE_CONTROLLER_STRUCT S_QUEUE_CTL;
bool bsendqueueSend(void);
void vsendqueueInit(void)
{
		// 时间计数值
    S_QUEUE_CTL.TIME.time = FlechazoTimer.Control.GetCurrentTime_ms();
		// 每次进入发送的时间间隔
    S_QUEUE_CTL.TIME.send_interval = 200;
    S_QUEUE_CTL.send_c = 0;
    S_QUEUE_CTL.storage_c = 0;
    S_QUEUE_CTL.send_p = S_QUEUE_CTL.qbuffer[0];
    S_QUEUE_CTL.storage_p = S_QUEUE_CTL.qbuffer[0];
}
//2、a send queue buffer    mode---lora or rs485 ?
void vsendqueueSendQueue(void)
{
		// 判断是否超时
    if(FlechazoTimer.Control.IsTimeOut_ms(&S_QUEUE_CTL.TIME.time, S_QUEUE_CTL.TIME.send_interval))
    {
				//1、have data 存储和发送不相等就是有数据
        if(S_QUEUE_CTL.storage_c != S_QUEUE_CTL.send_c)
        {
            bsendqueueSend();
        }
        S_QUEUE_CTL.TIME.time = FlechazoTimer.Control.GetCurrentTime_ms();
    }
}
//3、insert queue
bool bsendqueueInsert(uint8_t *data, uint32_t len)
{
    //pos storage
    if((++S_QUEUE_CTL.storage_c) > (SEND_QUEUE_DATA_NUMBER))
    {
				//return false;
        S_QUEUE_CTL.storage_c = 1;
        S_QUEUE_CTL.storage_p = S_QUEUE_CTL.qbuffer[0];
    }
    //insert
    memcpy(S_QUEUE_CTL.storage_p, data, len);
    //storage info
    S_QUEUE_CTL.DATA[S_QUEUE_CTL.storage_c - 1].head = S_QUEUE_CTL.storage_p;
    S_QUEUE_CTL.DATA[S_QUEUE_CTL.storage_c - 1].len = len;
    S_QUEUE_CTL.storage_p += len;
    return true;
}
//4、send queue
bool bsendqueueSend(void)
{
    //2、check send status
		//485
		if(SET != usart_flag_get(rUSARTx, USART_FLAG_BSY))
		{
				//3、send
				if((++S_QUEUE_CTL.send_c) > SEND_QUEUE_DATA_NUMBER)
				{
						S_QUEUE_CTL.send_c = 1;
				}
				//485
				vrs485Send(S_QUEUE_CTL.DATA[S_QUEUE_CTL.send_c-1].head, S_QUEUE_CTL.DATA[S_QUEUE_CTL.send_c-1].len);
		}
		return true;
}
#endif //((USE_SEND_BUFFER_QUEUE_RS485==1))||((USE_SEND_BUFFER_QUEUE_LORA==1))

那么到这,你就可以试一下啦!

连续快速发 5 条,看看他的回复。

image-20230802085116169

终于可以愉快的处理大数据了!

当然你也可以再加上 DMA,没有频繁的中断响应,反应速度更快啦!

更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

flechazo 微信支付

微信支付

flechazo 支付宝

支付宝

flechazo 贝宝

贝宝