Files
paralus/pkg/event/queue_test.go
nirav-rafay c66bdc25cd restructure rcloud-base as a single base controller (#37)
* restructure rcloud-base as a single base controller
* updated master.rest
* moved sentry from internal to pkg as it is used by relay
* removing unused rpc and its dependencies
* Fix usermgmt tests
* Don't redefine variables in rest file
Co-authored-by: Abin Simon <abin.simon@rafay.co>
2022-03-03 17:59:06 +05:30

105 lines
1.7 KiB
Go

package event
import (
"sync"
"sync/atomic"
"testing"
"time"
)
// TestUniqueScopedResourceQueue is a smoke test for NewUniqueQueue. Three
// producers each push a Resource with a fixed ID into the queue's input while
// numCon consumers drain its output. After five seconds the stop channel is
// closed, all goroutines are joined, and the per-ID produced/consumed counts
// are logged. The counts are reported only; nothing is asserted on them.
func TestUniqueScopedResourceQueue(t *testing.T) {
	stop := make(chan struct{})
	numCon := 5
	bufSize := 100

	in, out := NewUniqueQueue(stop, numCon, bufSize)

	// Each genN is written by exactly one producer goroutine and read only
	// after wg.Wait(), so plain ints suffice. The conN counters are shared
	// by all consumers and therefore updated atomically.
	var gen1, gen2, gen3 int
	var con1, con2, con3 int32

	var wg sync.WaitGroup
	wg.Add(numCon + 3)

	// produce sends a Resource carrying id about every 2ms and counts each
	// successful send in *sent. The send and the stop signal share a single
	// select so that a producer blocked on a full or undrained input channel
	// still wakes up when stop is closed; otherwise wg.Wait() could hang.
	produce := func(id string, sent *int) {
		defer wg.Done()
		for {
			select {
			case <-stop:
				return
			case in <- Resource{ID: id}:
				*sent++
				time.Sleep(time.Millisecond * 2)
			}
		}
	}

	// consume drains out until stop is closed, tallying items by ID.
	consume := func() {
		defer wg.Done()
		for {
			select {
			case <-stop:
				return
			case item, ok := <-out:
				if !ok {
					// out was closed: there is nothing more to receive, and
					// continuing would only yield zero values.
					return
				}
				switch item.ID {
				case "rx28oml":
					atomic.AddInt32(&con1, 1)
				case "4qkolkn":
					atomic.AddInt32(&con2, 1)
				case "7w2lnkp":
					atomic.AddInt32(&con3, 1)
				default:
					t.Log("unexpeced id", item.ID)
				}
				time.Sleep(time.Millisecond * 1)
			}
		}
	}

	go produce("rx28oml", &gen1)
	go produce("4qkolkn", &gen2)
	go produce("7w2lnkp", &gen3)

	for i := 0; i < numCon; i++ {
		go consume()
	}

	// Let producers and consumers run for a while, then shut everything down.
	time.Sleep(time.Second * 5)
	close(stop)
	wg.Wait()

	t.Log("gen1", gen1, "con1", con1, "gen2", gen2, "con2", con2, "gen3", gen3, "con3", con3)
}