mirror of
https://github.com/samber/lo.git
synced 2026-04-22 23:47:11 +08:00
New helper collection: channel (#95)
* feat(channels): add ToChannel + Generator + Batch + BatchWithTimeout (WIP) * feat: return duration of Batch**** helpers * doc: improve BatchWithTimeout doc * fix(BatchWithTimeout): replace time.After by time.NewTimer in order to prevent memory leak * doc: improve BatchWithTimeout doc
This commit is contained in:
@@ -2,6 +2,15 @@
|
||||
|
||||
@samber: I sometimes forget to update this file. Ping me on [Twitter](https://twitter.com/samuelberthe) or open an issue in case of error. We need to keep a clear changelog for easier lib upgrade.
|
||||
|
||||
## 1.31.0 (2022-10-06)
|
||||
|
||||
Adding:
|
||||
|
||||
- lo.SliceToChannel
|
||||
- lo.Generator
|
||||
- lo.Batch
|
||||
- lo.BatchWithTimeout
|
||||
|
||||
## 1.30.1 (2022-10-06)
|
||||
|
||||
Fix:
|
||||
|
||||
@@ -135,6 +135,10 @@ Supported helpers for tuples:
|
||||
Supported helpers for channels:
|
||||
|
||||
- [ChannelDispatcher](#channeldispatcher)
|
||||
- [SliceToChannel](#slicetochannel)
|
||||
- [Generator](#generator)
|
||||
- [Batch](#batch)
|
||||
- [BatchWithTimeout](#batchwithtimeout)
|
||||
|
||||
Supported intersection helpers:
|
||||
|
||||
@@ -1262,6 +1266,134 @@ children := lo.ChannelDispatcher(ch, 5, 10, customStrategy)
|
||||
...
|
||||
```
|
||||
|
||||
### SliceToChannel
|
||||
|
||||
Returns a read-only channels of collection elements. Channel is closed after last element. Channel capacity can be customized.
|
||||
|
||||
```go
|
||||
list := []int{1, 2, 3, 4, 5}
|
||||
|
||||
for v := range lo.SliceToChannel(2, list) {
|
||||
println(v)
|
||||
}
|
||||
// prints 1, then 2, then 3, then 4, then 5
|
||||
```
|
||||
|
||||
### Generator
|
||||
|
||||
Implements the generator design pattern. Channel is closed after last element. Channel capacity can be customized.
|
||||
|
||||
```go
|
||||
generator := func(yield func(int)) {
|
||||
yield(1)
|
||||
yield(2)
|
||||
yield(3)
|
||||
}
|
||||
|
||||
for v := range lo.Generator(2, generator) {
|
||||
println(v)
|
||||
}
|
||||
// prints 1, then 2, then 3
|
||||
```
|
||||
|
||||
### Batch
|
||||
|
||||
Creates a slice of n elements from a channel. Returns the slice, the slice length, the read time and the channel status (opened/closed).
|
||||
|
||||
```go
|
||||
ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5})
|
||||
|
||||
items1, length1, duration1, ok1 := lo.Batch(ch, 3)
|
||||
// []int{1, 2, 3}, 3, 0s, true
|
||||
items2, length2, duration2, ok2 := lo.Batch(ch, 3)
|
||||
// []int{4, 5}, 2, 0s, false
|
||||
```
|
||||
|
||||
Example: RabbitMQ consumer 👇
|
||||
|
||||
```go
|
||||
ch := readFromQueue()
|
||||
|
||||
for {
|
||||
// read 1k items
|
||||
items, length, _, ok := lo.Batch(ch, 1000)
|
||||
|
||||
// do batching stuff
|
||||
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### BatchWithTimeout
|
||||
|
||||
Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed).
|
||||
|
||||
```go
|
||||
generator := func(yield func(int)) {
|
||||
for i := 0; i < 5; i++ {
|
||||
yield(i)
|
||||
time.Sleep(35*time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
ch := lo.Generator(0, generator)
|
||||
|
||||
items1, length1, duration1, ok1 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
|
||||
// []int{1, 2}, 2, 100ms, true
|
||||
items2, length2, duration2, ok2 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
|
||||
// []int{3, 4, 5}, 3, 75ms, true
|
||||
items3, length3, duration2, ok3 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
|
||||
// []int{}, 0, 10ms, false
|
||||
```
|
||||
|
||||
Example: RabbitMQ consumer 👇
|
||||
|
||||
```go
|
||||
ch := readFromQueue()
|
||||
|
||||
for {
|
||||
// read 1k items
|
||||
// wait up to 1 second
|
||||
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)
|
||||
|
||||
// do batching stuff
|
||||
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Example: Multithreaded RabbitMQ consumer 👇
|
||||
|
||||
```go
|
||||
ch := readFromQueue()
|
||||
|
||||
// 5 workers
|
||||
// prefetch 1k messages per worker
|
||||
children := lo.ChannelDispatcher(ch, 5, 1000, DispatchingStrategyFirst[int])
|
||||
|
||||
consumer := func(c <-chan int) {
|
||||
for {
|
||||
// read 1k items
|
||||
// wait up to 1 second
|
||||
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)
|
||||
|
||||
// do batching stuff
|
||||
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i := range children {
|
||||
go consumer(children[i])
|
||||
}
|
||||
```
|
||||
|
||||
### Contains
|
||||
|
||||
Returns true if an element is present in a collection.
|
||||
|
||||
+77
@@ -149,3 +149,80 @@ func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) in
|
||||
return len(channels[item]) > len(channels[max]) && channelIsNotFull(channels[item])
|
||||
})
|
||||
}
|
||||
|
||||
// SliceToChannel returns a read-only channels of collection elements.
|
||||
func SliceToChannel[T any](bufferSize int, collection []T) <-chan T {
|
||||
ch := make(chan T, bufferSize)
|
||||
|
||||
go func() {
|
||||
for _, item := range collection {
|
||||
ch <- item
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// Generator implements the generator design pattern.
|
||||
func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T {
|
||||
ch := make(chan T, bufferSize)
|
||||
|
||||
go func() {
|
||||
// WARNING: infinite loop
|
||||
generator(func(t T) {
|
||||
ch <- t
|
||||
})
|
||||
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// Batch creates a slice of n elements from a channel. Returns the slice and the slice length.
|
||||
// @TODO: we should probaby provide an helper that reuse the same buffer.
|
||||
func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
|
||||
buffer := make([]T, 0, size)
|
||||
index := 0
|
||||
now := time.Now()
|
||||
|
||||
for ; index < size; index++ {
|
||||
item, ok := <-ch
|
||||
if !ok {
|
||||
return buffer, index, time.Since(now), false
|
||||
}
|
||||
|
||||
buffer = append(buffer, item)
|
||||
}
|
||||
|
||||
return buffer, index, time.Since(now), true
|
||||
}
|
||||
|
||||
// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
|
||||
// @TODO: we should probaby provide an helper that reuse the same buffer.
|
||||
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
|
||||
expire := time.NewTimer(timeout)
|
||||
defer expire.Stop()
|
||||
|
||||
buffer := make([]T, 0, size)
|
||||
index := 0
|
||||
now := time.Now()
|
||||
|
||||
for ; index < size; index++ {
|
||||
select {
|
||||
case item, ok := <-ch:
|
||||
if !ok {
|
||||
return buffer, index, time.Since(now), false
|
||||
}
|
||||
|
||||
buffer = append(buffer, item)
|
||||
|
||||
case <-expire.C:
|
||||
return buffer, index, time.Since(now), true
|
||||
}
|
||||
}
|
||||
|
||||
return buffer, index, time.Since(now), true
|
||||
}
|
||||
|
||||
+104
@@ -187,3 +187,107 @@ func TestDispatchingStrategyMost(t *testing.T) {
|
||||
children[1] <- 1
|
||||
is.Equal(0, DispatchingStrategyMost(42, 0, rochildren))
|
||||
}
|
||||
|
||||
func TestSliceToChannel(t *testing.T) {
|
||||
t.Parallel()
|
||||
testWithTimeout(t, 10*time.Millisecond)
|
||||
is := assert.New(t)
|
||||
|
||||
ch := SliceToChannel[int](2, []int{1, 2, 3})
|
||||
|
||||
r1, ok1 := <-ch
|
||||
r2, ok2 := <-ch
|
||||
r3, ok3 := <-ch
|
||||
is.True(ok1)
|
||||
is.Equal(1, r1)
|
||||
is.True(ok2)
|
||||
is.Equal(2, r2)
|
||||
is.True(ok3)
|
||||
is.Equal(3, r3)
|
||||
|
||||
_, ok4 := <-ch
|
||||
is.False(ok4)
|
||||
}
|
||||
|
||||
func TestGenerate(t *testing.T) {
|
||||
t.Parallel()
|
||||
testWithTimeout(t, 10*time.Millisecond)
|
||||
is := assert.New(t)
|
||||
|
||||
generator := func(yield func(int)) {
|
||||
yield(0)
|
||||
yield(1)
|
||||
yield(2)
|
||||
yield(3)
|
||||
}
|
||||
|
||||
i := 0
|
||||
|
||||
for v := range Generator(2, generator) {
|
||||
is.Equal(i, v)
|
||||
i++
|
||||
}
|
||||
|
||||
is.Equal(i, 4)
|
||||
}
|
||||
|
||||
func TestBatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
testWithTimeout(t, 10*time.Millisecond)
|
||||
is := assert.New(t)
|
||||
|
||||
ch := SliceToChannel(2, []int{1, 2, 3})
|
||||
|
||||
items1, length1, _, ok1 := Batch(ch, 2)
|
||||
items2, length2, _, ok2 := Batch(ch, 2)
|
||||
items3, length3, _, ok3 := Batch(ch, 2)
|
||||
|
||||
is.Equal([]int{1, 2}, items1)
|
||||
is.Equal(2, length1)
|
||||
is.True(ok1)
|
||||
is.Equal([]int{3}, items2)
|
||||
is.Equal(1, length2)
|
||||
is.False(ok2)
|
||||
is.Equal([]int{}, items3)
|
||||
is.Equal(0, length3)
|
||||
is.False(ok3)
|
||||
}
|
||||
|
||||
func TestBatchWithTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
testWithTimeout(t, 200*time.Millisecond)
|
||||
is := assert.New(t)
|
||||
|
||||
generator := func(yield func(int)) {
|
||||
for i := 0; i < 5; i++ {
|
||||
yield(i)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
ch := Generator(0, generator)
|
||||
|
||||
items1, length1, _, ok1 := BatchWithTimeout(ch, 20, 15*time.Millisecond)
|
||||
is.Equal([]int{0, 1}, items1)
|
||||
is.Equal(2, length1)
|
||||
is.True(ok1)
|
||||
|
||||
items2, length2, _, ok2 := BatchWithTimeout(ch, 20, 2*time.Millisecond)
|
||||
is.Equal([]int{}, items2)
|
||||
is.Equal(0, length2)
|
||||
is.True(ok2)
|
||||
|
||||
items3, length3, _, ok3 := BatchWithTimeout(ch, 1, 30*time.Millisecond)
|
||||
is.Equal([]int{2}, items3)
|
||||
is.Equal(1, length3)
|
||||
is.True(ok3)
|
||||
|
||||
items4, length4, _, ok4 := BatchWithTimeout(ch, 2, 25*time.Millisecond)
|
||||
is.Equal([]int{3, 4}, items4)
|
||||
is.Equal(2, length4)
|
||||
is.True(ok4)
|
||||
|
||||
items5, length5, _, ok5 := BatchWithTimeout(ch, 3, 25*time.Millisecond)
|
||||
is.Equal([]int{}, items5)
|
||||
is.Equal(0, length5)
|
||||
is.False(ok5)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user