Files
lo/channel_test.go
T
Samuel Berthe 380e1a0ae2 fix(tests): fix flaky time-based tests (#699)
* fix(tests): fix flaky time-based tests

* test: replace time package with a mock
Using a dedicated dependency would have been awesome, but i try to keep this repo with minimal dependencies
2025-10-05 00:34:07 +02:00

438 lines
9.7 KiB
Go

package lo
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestChannelDispatcher(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
ch := make(chan int, 10)
ch <- 0
ch <- 1
ch <- 2
ch <- 3
is.Len(ch, 4)
children := ChannelDispatcher(ch, 5, 10, DispatchingStrategyRoundRobin[int])
time.Sleep(10 * time.Millisecond)
// check channels allocation
is.Len(children, 5)
is.Equal(10, cap(children[0]))
is.Equal(10, cap(children[1]))
is.Equal(10, cap(children[2]))
is.Equal(10, cap(children[3]))
is.Equal(10, cap(children[4]))
is.Len(children[0], 1)
is.Len(children[1], 1)
is.Len(children[2], 1)
is.Len(children[3], 1)
is.Empty(children[4])
// check channels content
is.Empty(ch)
msg0, ok0 := <-children[0]
is.True(ok0)
is.Zero(msg0)
msg1, ok1 := <-children[1]
is.True(ok1)
is.Equal(1, msg1)
msg2, ok2 := <-children[2]
is.True(ok2)
is.Equal(2, msg2)
msg3, ok3 := <-children[3]
is.True(ok3)
is.Equal(3, msg3)
// msg4, ok4 := <-children[4]
// is.False(ok4)
// is.Zero(msg4)
// is.Nil(children[4])
// check it is closed
close(ch)
time.Sleep(10 * time.Millisecond)
is.Panics(func() {
ch <- 42
})
msg0, ok0 = <-children[0]
is.False(ok0)
is.Zero(msg0)
msg1, ok1 = <-children[1]
is.False(ok1)
is.Zero(msg1)
msg2, ok2 = <-children[2]
is.False(ok2)
is.Zero(msg2)
msg3, ok3 = <-children[3]
is.False(ok3)
is.Zero(msg3)
msg4, ok4 := <-children[4]
is.False(ok4)
is.Zero(msg4)
// unbuffered channels
children = ChannelDispatcher(ch, 5, 0, DispatchingStrategyRoundRobin[int])
is.Zero(cap(children[0]))
}
func TestDispatchingStrategyRoundRobin(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
children := createChannels[int](3, 2)
rochildren := channelsToReadOnly(children)
defer closeChannels(children)
is.Zero(DispatchingStrategyRoundRobin(42, 0, rochildren))
is.Equal(1, DispatchingStrategyRoundRobin(42, 1, rochildren))
is.Equal(2, DispatchingStrategyRoundRobin(42, 2, rochildren))
is.Zero(DispatchingStrategyRoundRobin(42, 3, rochildren))
}
func TestDispatchingStrategyRandom(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
children := createChannels[int](2, 2)
rochildren := channelsToReadOnly(children)
defer closeChannels(children)
for i := 0; i < 2; i++ {
children[1] <- i
}
is.Zero(DispatchingStrategyRandom(42, 0, rochildren))
}
func TestDispatchingStrategyWeightedRandom(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
children := createChannels[int](2, 2)
rochildren := channelsToReadOnly(children)
defer closeChannels(children)
dispatcher := DispatchingStrategyWeightedRandom[int]([]int{0, 42})
is.Equal(1, dispatcher(42, 0, rochildren))
children[0] <- 0
is.Equal(1, dispatcher(42, 0, rochildren))
children[1] <- 1
is.Equal(1, dispatcher(42, 0, rochildren))
}
func TestDispatchingStrategyFirst(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
children := createChannels[int](2, 2)
rochildren := channelsToReadOnly(children)
defer closeChannels(children)
is.Zero(DispatchingStrategyFirst(42, 0, rochildren))
children[0] <- 0
is.Zero(DispatchingStrategyFirst(42, 0, rochildren))
children[0] <- 1
is.Equal(1, DispatchingStrategyFirst(42, 0, rochildren))
}
func TestDispatchingStrategyLeast(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
children := createChannels[int](2, 2)
rochildren := channelsToReadOnly(children)
defer closeChannels(children)
is.Zero(DispatchingStrategyLeast(42, 0, rochildren))
children[0] <- 0
is.Equal(1, DispatchingStrategyLeast(42, 0, rochildren))
children[1] <- 0
is.Zero(DispatchingStrategyLeast(42, 0, rochildren))
children[0] <- 1
is.Equal(1, DispatchingStrategyLeast(42, 0, rochildren))
children[1] <- 1
is.Zero(DispatchingStrategyLeast(42, 0, rochildren))
}
func TestDispatchingStrategyMost(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
children := createChannels[int](2, 2)
rochildren := channelsToReadOnly(children)
defer closeChannels(children)
is.Zero(DispatchingStrategyMost(42, 0, rochildren))
children[0] <- 0
is.Zero(DispatchingStrategyMost(42, 0, rochildren))
children[1] <- 0
is.Zero(DispatchingStrategyMost(42, 0, rochildren))
children[0] <- 1
is.Zero(DispatchingStrategyMost(42, 0, rochildren))
children[1] <- 1
is.Zero(DispatchingStrategyMost(42, 0, rochildren))
}
func TestSliceToChannel(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
ch := SliceToChannel(2, []int{1, 2, 3})
r1, ok1 := <-ch
r2, ok2 := <-ch
r3, ok3 := <-ch
is.True(ok1)
is.Equal(1, r1)
is.True(ok2)
is.Equal(2, r2)
is.True(ok3)
is.Equal(3, r3)
_, ok4 := <-ch
is.False(ok4)
}
func TestChannelToSlice(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
ch := SliceToChannel(2, []int{1, 2, 3})
items := ChannelToSlice(ch)
is.Equal([]int{1, 2, 3}, items)
}
func TestGenerate(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
generator := func(yield func(int)) {
yield(0)
yield(1)
yield(2)
yield(3)
}
i := 0
for v := range Generator(2, generator) {
is.Equal(i, v)
i++
}
is.Equal(4, i)
}
func TestBuffer(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
ch := SliceToChannel(2, []int{1, 2, 3})
items1, length1, _, ok1 := Buffer(ch, 2)
items2, length2, _, ok2 := Buffer(ch, 2)
items3, length3, _, ok3 := Buffer(ch, 2)
is.Equal([]int{1, 2}, items1)
is.Equal(2, length1)
is.True(ok1)
is.Equal([]int{3}, items2)
is.Equal(1, length2)
is.False(ok2)
is.Empty(items3)
is.Zero(length3)
is.False(ok3)
}
func TestBufferWithContext(t *testing.T) { //nolint:paralleltest
// t.Parallel()
testWithTimeout(t, 200*time.Millisecond)
is := assert.New(t)
ch1 := make(chan int, 10)
ctx, cancel := context.WithCancel(context.Background())
go func() {
ch1 <- 0
ch1 <- 1
ch1 <- 2
time.Sleep(5 * time.Millisecond)
cancel()
ch1 <- 3
ch1 <- 4
ch1 <- 5
close(ch1)
}()
items1, length1, _, ok1 := BufferWithContext(ctx, ch1, 20)
is.Equal([]int{0, 1, 2}, items1)
is.Equal(3, length1)
is.True(ok1)
ch2 := make(chan int, 10)
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
defer close(ch2)
for i := 0; i < 10; i++ {
ch2 <- i
}
items2, length2, _, ok2 := BufferWithContext(ctx, ch2, 5)
is.Equal([]int{0, 1, 2, 3, 4}, items2)
is.Equal(5, length2)
is.True(ok2)
}
func TestBufferWithTimeout(t *testing.T) { //nolint:paralleltest
// t.Parallel()
testWithTimeout(t, 2000*time.Millisecond)
is := assert.New(t)
generator := func(n ...int) func(yield func(int)) {
return func(yield func(int)) {
for i := 0; i < len(n); i++ {
yield(n[i])
time.Sleep(100 * time.Millisecond)
}
}
}
ch := Generator(0, generator(0, 1, 2, 3, 4))
items1, length1, duration1, ok1 := BufferWithTimeout(ch, 20, 150*time.Millisecond)
is.Equal([]int{0, 1}, items1)
is.Equal(2, length1)
is.InDelta(150*time.Millisecond, duration1, float64(20*time.Millisecond))
is.True(ok1)
items2, length2, duration2, ok2 := BufferWithTimeout(ch, 20, 10*time.Millisecond)
is.Empty(items2)
is.Zero(length2)
is.InDelta(10*time.Millisecond, duration2, float64(10*time.Millisecond))
is.True(ok2)
items3, length3, duration3, ok3 := BufferWithTimeout(ch, 1, 300*time.Millisecond)
is.Equal([]int{2}, items3)
is.Equal(1, length3)
is.InDelta(50*time.Millisecond, duration3, float64(20*time.Millisecond))
is.True(ok3)
items4, length4, duration4, ok4 := BufferWithTimeout(ch, 2, 250*time.Millisecond)
is.Equal([]int{3, 4}, items4)
is.Equal(2, length4)
is.InDelta(200*time.Millisecond, duration4, float64(50*time.Millisecond))
is.True(ok4)
items5, length5, duration5, ok5 := BufferWithTimeout(ch, 3, 250*time.Millisecond)
is.Empty(items5)
is.Zero(length5)
is.InDelta(100*time.Millisecond, duration5, float64(50*time.Millisecond))
is.False(ok5)
items6, length6, duration6, ok6 := BufferWithTimeout(ch, 3, 250*time.Millisecond)
is.Empty(items6)
is.Zero(length6)
is.InDelta(1*time.Millisecond, duration6, float64(10*time.Millisecond))
is.False(ok6)
}
func TestFanIn(t *testing.T) { //nolint:paralleltest
// t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
upstreams := createChannels[int](3, 10)
roupstreams := channelsToReadOnly(upstreams)
for i := range roupstreams {
go func(i int) {
upstreams[i] <- 1
upstreams[i] <- 1
close(upstreams[i])
}(i)
}
out := FanIn(10, roupstreams...)
time.Sleep(10 * time.Millisecond)
// check input channels
is.Empty(roupstreams[0])
is.Empty(roupstreams[1])
is.Empty(roupstreams[2])
// check channels allocation
is.Len(out, 6)
is.Equal(10, cap(out))
// check channels content
for i := 0; i < 6; i++ {
msg0, ok0 := <-out
is.True(ok0)
is.Equal(1, msg0)
}
// check it is closed
time.Sleep(10 * time.Millisecond)
msg0, ok0 := <-out
is.False(ok0)
is.Zero(msg0)
}
func TestFanOut(t *testing.T) { //nolint:paralleltest
// t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
upstream := SliceToChannel(10, []int{0, 1, 2, 3, 4, 5})
rodownstreams := FanOut(3, 10, upstream)
time.Sleep(10 * time.Millisecond)
// check output channels
is.Len(rodownstreams, 3)
// check channels allocation
for i := range rodownstreams {
is.Len(rodownstreams[i], 6)
is.Equal(10, cap(rodownstreams[i]))
is.Equal([]int{0, 1, 2, 3, 4, 5}, ChannelToSlice(rodownstreams[i]))
}
// check it is closed
time.Sleep(10 * time.Millisecond)
// check channels allocation
for i := range rodownstreams {
msg, ok := <-rodownstreams[i]
is.False(ok)
is.Zero(msg)
}
}