Files
queue/broker_test.go
what 1322280daf fix: 修复无订阅者时消息静默丢失问题,完善测试
- 新增 pending 缓冲区,publish 时若无订阅者则暂存消息
- subscribe 时自动将缓冲消息投入 channel,解决服务重启后恢复任务丢失的问题
- 去除 broadcast 5ms 超时导致的消息丢失
- chan bool 改为 chan struct{},RWMutex 改为 Mutex
- 新增 broker_test.go,12 个单元测试覆盖核心场景(含 -race)
- 为 client_test.go 中的无限循环 demo 添加 t.Skip()
2026-06-02 19:21:47 +08:00

239 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package queue
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
// recv 从 channel 读取一条消息,超时则返回 nil。
func recv(ch <-chan any, timeout time.Duration) any {
select {
case v := <-ch:
return v
case <-time.After(timeout):
return nil
}
}
// TestPublishSubscribe 基本收发
func TestPublishSubscribe(t *testing.T) {
b := NewBroker()
defer b.close()
ch, err := b.subscribe("job-a")
if err != nil {
t.Fatalf("subscribe: %v", err)
}
if err := b.publish("job-a", "hello"); err != nil {
t.Fatalf("publish: %v", err)
}
got := recv(ch, time.Second)
if got != "hello" {
t.Fatalf("want %q, got %v", "hello", got)
}
}
// TestPendingBuffer 先发布再订阅,消息不能丢失(队列核心保证)
func TestPendingBuffer(t *testing.T) {
b := NewBroker()
defer b.close()
// 先 publish此时无订阅者
for i := 0; i < 3; i++ {
if err := b.publish("job-b", i); err != nil {
t.Fatalf("publish %d: %v", i, err)
}
}
// 再 subscribe应收到全部缓冲消息
ch, _ := b.subscribe("job-b")
for want := 0; want < 3; want++ {
got := recv(ch, time.Second)
if got != want {
t.Fatalf("pending msg[%d]: want %d, got %v", want, want, got)
}
}
}
// TestPendingThenNormal 缓冲消息先于后续消息到达,顺序正确
func TestPendingThenNormal(t *testing.T) {
b := NewBroker()
defer b.close()
b.publish("job-c", "buffered")
ch, _ := b.subscribe("job-c")
b.publish("job-c", "live")
msgs := []any{recv(ch, time.Second), recv(ch, time.Second)}
if msgs[0] != "buffered" || msgs[1] != "live" {
t.Fatalf("order wrong: %v", msgs)
}
}
// TestMultipleSubscribers 同一 topic 多个订阅者都能收到消息
func TestMultipleSubscribers(t *testing.T) {
b := NewBroker()
defer b.close()
ch1, _ := b.subscribe("broadcast")
ch2, _ := b.subscribe("broadcast")
b.publish("broadcast", "msg")
if recv(ch1, time.Second) != "msg" {
t.Fatal("ch1 did not receive message")
}
if recv(ch2, time.Second) != "msg" {
t.Fatal("ch2 did not receive message")
}
}
// TestUnsubscribe 取消订阅后不再收到消息
func TestUnsubscribe(t *testing.T) {
b := NewBroker()
defer b.close()
ch, _ := b.subscribe("job-d")
b.unsubscribe("job-d", ch)
b.publish("job-d", "should not arrive")
if got := recv(ch, 100*time.Millisecond); got != nil {
t.Fatalf("after unsubscribe, still got %v", got)
}
}
// TestClosedPublish 关闭后 publish 返回错误
func TestClosedPublish(t *testing.T) {
b := NewBroker()
b.close()
if err := b.publish("x", "msg"); err == nil {
t.Fatal("expected error after close, got nil")
}
}
// TestClosedSubscribe 关闭后 subscribe 返回错误
func TestClosedSubscribe(t *testing.T) {
b := NewBroker()
b.close()
if _, err := b.subscribe("x"); err == nil {
t.Fatal("expected error after close, got nil")
}
}
// TestConcurrentPublish 并发发布不丢消息,无 data race
func TestConcurrentPublish(t *testing.T) {
b := NewBroker()
defer b.close()
b.setConditions(100)
const n = 50
ch, _ := b.subscribe("concurrent")
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
b.publish("concurrent", i)
}(i)
}
wg.Wait()
var count int32
done := make(chan struct{})
go func() {
for recv(ch, 200*time.Millisecond) != nil {
atomic.AddInt32(&count, 1)
}
close(done)
}()
<-done
if int(count) != n {
t.Fatalf("want %d messages, got %d", n, count)
}
}
// TestSetConditions 容量设置生效channel 满时不丢已缓冲的消息)
func TestSetConditions(t *testing.T) {
b := NewBroker()
defer b.close()
b.setConditions(5)
ch, _ := b.subscribe("cap-test")
for i := 0; i < 5; i++ {
if err := b.publish("cap-test", i); err != nil {
t.Fatalf("publish %d: %v", i, err)
}
}
for want := 0; want < 5; want++ {
got := recv(ch, time.Second)
if got != want {
t.Fatalf("msg[%d]: want %d, got %v", want, want, got)
}
}
}
// TestClientWrapper Client 封装与 Broker 行为一致
func TestClientWrapper(t *testing.T) {
c := NewClient()
defer c.Close()
c.SetConditions(10)
ch, err := c.Subscribe("wrap")
if err != nil {
t.Fatalf("Subscribe: %v", err)
}
c.Publish("wrap", "ok")
got := recv(ch, time.Second)
if got != "ok" {
t.Fatalf("want %q got %v", "ok", got)
}
}
// TestGetPayload GetPayload 在 channel 关闭时返回 nil 而非阻塞
func TestGetPayload(t *testing.T) {
c := NewClient()
ch, _ := c.Subscribe("gp")
c.Publish("gp", "payload")
got := c.GetPayload(ch)
if got != "payload" {
t.Fatalf("want %q got %v", "payload", got)
}
}
// TestPendingBufferMultipleTopics 多个 topic 的缓冲互不干扰
func TestPendingBufferMultipleTopics(t *testing.T) {
b := NewBroker()
defer b.close()
for i := 0; i < 5; i++ {
topic := fmt.Sprintf("topic-%d", i)
b.publish(topic, i*10)
}
for i := 0; i < 5; i++ {
topic := fmt.Sprintf("topic-%d", i)
ch, _ := b.subscribe(topic)
got := recv(ch, time.Second)
if got != i*10 {
t.Fatalf("topic-%d: want %d, got %v", i, i*10, got)
}
}
}