zky_mandate/internal/app/mqueue/mqueue_test.go
2023-08-11 18:00:35 +08:00

169 lines
3.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @Company: 云南奇讯科技有限公司
* @Author: yxf
* @Description:
* @Date: 2023/7/7 11:26
*/
package mqueue
import (
"bytes"
"fmt"
"github.com/tiger1103/gfast/v3/internal/app/mqueue/consts"
"github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/diskqueue"
_ "github.com/tiger1103/gfast/v3/internal/app/mqueue/logic/mqueue"
"github.com/tiger1103/gfast/v3/internal/app/mqueue/model"
"github.com/tiger1103/gfast/v3/internal/app/mqueue/service"
"sync"
"testing"
"time"
)
// Shared fixtures for the message-queue tests.
var (
	// mBody is the payload sent by producer and expected by consumer.
	mBody = []byte("gfast-mqueue 测试消息队列内容")
	// wg is a package-level wait group available to the tests in this file.
	wg sync.WaitGroup
)

const (
	// TOPIC is the default topic used by the producer/consumer tests.
	TOPIC = "producer_test6"
	// SENDCOUNT is the number of messages producer sends per call.
	SENDCOUNT = 10
	// TIMEOUT is the consumer wait time in seconds. Waiting about 10 seconds
	// is recommended so the disk flush or the rocketmq consumer-offset
	// update can happen.
	TIMEOUT = 10
)
// producer sends SENDCOUNT messages carrying mBody to topic through the queue
// returned by service.MQueue, prints how many were sent and how long it took,
// and then sleeps so the queue has time to flush.
//
// Parameters:
//   - topic:   destination topic of every message.
//   - delay:   when greater than zero, each message is sent with
//     consts.SendMsgDelay and this delay level. Per the original note, channel
//     is usable with rocketmq and delay levels 1-18 correspond to:
//     1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h.
//   - timeout: number of seconds to sleep after sending; the value is
//     multiplied by time.Second, so callers pass a bare count such as 5.
//   - t:       test handle used to report failures via t.Error.
//
// The function returns early (without sleeping) when the queue is unavailable
// or when a send fails.
func producer(topic string, delay consts.MsgDelayLevel, timeout time.Duration, t *testing.T) {
	mq := service.MQueue()
	if mq == nil {
		t.Error("get mQueue failed")
		return
	}

	fmt.Println("start send msg")
	t1 := time.Now()
	pCount := 0
	for i := 0; i < SENDCOUNT; i++ {
		msg := &model.MQSendMsg{
			Topic: topic,
			Body:  mBody,
		}
		if delay > 0 {
			msg.Delay = delay
			msg.SendMethod = consts.SendMsgDelay
		}
		if err := mq.SendMsg(msg); err != nil {
			t.Error(err)
			return
		}
		pCount++
	}
	fmt.Printf("发送数据 %d 条,耗时:%f\n", pCount, time.Since(t1).Seconds())

	// With diskqueue, wait for at least one flush so the main thread does not
	// exit before the data has been written.
	time.Sleep(timeout * time.Second)
}
// consumer subscribes to topic/channel on the queue returned by
// service.MQueue, counts every message delivered to the subscription
// callback, and after waiting prints the number of messages consumed.
//
// Parameters:
//   - topic, channel: passed straight to Subscribe.
//   - timeout: number of seconds to wait after subscribing; the value is
//     multiplied by time.Second, so callers pass a bare count such as 10.
//   - t:       test handle used to report failures via t.Error.
//
// A message whose body differs from mBody is reported on stdout but still
// counted. The function returns early when the queue is unavailable or the
// subscription fails.
func consumer(topic, channel string, timeout time.Duration, t *testing.T) {
	fmt.Printf("消费者 %s %s 启动\n", topic, channel)

	mq := service.MQueue()
	if mq == nil {
		t.Error("get mQueue failed")
		// Without this return the Subscribe call below would dereference nil.
		return
	}

	// cCount is updated from the subscription callback and read after the
	// wait below, so every access goes through mu.
	var (
		mu     sync.Mutex
		cCount int
	)

	// Subscribe to messages.
	err := mq.Subscribe(topic, channel, func(m *model.MQMessage) error {
		mu.Lock()
		defer mu.Unlock()
		if !bytes.Equal(mBody, m.Body) {
			fmt.Printf("消费者1第 %d 条数据错误\n", cCount)
		}
		cCount++
		return nil
	})
	if err != nil {
		t.Error("消息订阅失败:" + err.Error())
		return
	}

	// Wait for at least one flush or consumer-offset sync so the main thread
	// does not exit before it happens.
	time.Sleep(timeout * time.Second)

	mu.Lock()
	received := cCount
	mu.Unlock()
	fmt.Printf("%s %s 消费数据 %d 条\n", topic, channel, received)
}
// TestProducer sends one batch of non-delayed messages to TOPIC and then
// waits 5 seconds.
func TestProducer(t *testing.T) {
	const (
		noDelay     = 0
		waitSeconds = 5
	)
	producer(TOPIC, noDelay, waitSeconds, t)
}
// TestProducerDelay runs a delayed-message producer (consts.MsgDelay5s) and a
// consumer concurrently on the same topic and waits for both to finish.
func TestProducerDelay(t *testing.T) {
	fmt.Println("开始发送延迟消息")

	const delayTopic = "produce_delay_test1"
	tasks := []func(){
		func() { producer(delayTopic, consts.MsgDelay5s, 5, t) },
		func() { consumer(delayTopic, "channel", 20, t) },
	}

	var pending sync.WaitGroup
	pending.Add(len(tasks))
	for _, task := range tasks {
		// Pass the task as an argument so each goroutine gets its own copy.
		go func(run func()) {
			run()
			pending.Done()
		}(task)
	}
	pending.Wait()
}
// TestConsumer runs a single consumer on TOPIC / "channel" and lets it
// receive messages for 120 seconds.
func TestConsumer(t *testing.T) {
	const (
		channelName = "channel"
		waitSeconds = 120
	)
	consumer(TOPIC, channelName, waitSeconds, t)
}
// 测试多个消费者
func TestMultiConsumer(t *testing.T) {
wg.Add(3)
go func() {
consumer(TOPIC, "channel", TIMEOUT, t)
wg.Done()
}()
go func() {
consumer(TOPIC, "channel1", TIMEOUT, t)
wg.Done()
}()
go func() {
consumer(TOPIC, "channel2", TIMEOUT, t)
wg.Done()
}()
wg.Wait()
}
// 同时测试生产者和消费者
func TestProducerAndConsumer(t *testing.T) {
wg.Add(4)
go func() {
producer(TOPIC, 0, 5, t)
wg.Done()
}()
go func() {
consumer(TOPIC, "channel", TIMEOUT, t)
wg.Done()
}()
go func() {
consumer(TOPIC, "channel2", TIMEOUT, t)
wg.Done()
}()
go func() {
consumer(TOPIC, "channel3", TIMEOUT, t)
wg.Done()
}()
wg.Wait()
}
// TestClearDiskQueueFiles invokes diskqueue.Clear. Per the original note,
// this is meant to delete all local diskQueue files.
func TestClearDiskQueueFiles(t *testing.T) {
	clearFiles := diskqueue.Clear
	clearFiles()
}