1
0
Fork 0

os metric 정보 표시

This commit is contained in:
Sangbum Kim 2018-07-09 04:54:24 +09:00
parent 5de0a1a5d9
commit eed7308e59
6 changed files with 717 additions and 5 deletions

View File

@ -13,8 +13,8 @@ import (
"amuz.es/src/infra/cpu_ctrl/daemon"
"amuz.es/src/infra/cpu_ctrl/producer"
"amuz.es/src/infra/cpu_ctrl/consumer"
"errors"
"go.uber.org/zap"
"errors"
)
func finalCloser() {
@ -30,8 +30,8 @@ func initLogger() func() {
formatter := zapcore.NewConsoleEncoder(zlog.LogCommonFormat)
level := zap.InfoLevel
if *verbose{
level=zap.DebugLevel
if *verbose {
level = zap.DebugLevel
}
// 전역 로거 초기화
var err error
@ -103,6 +103,26 @@ func initContext(handler *handler.Handler) (func(), func()) {
// 메인 웹서버 초기화
func initProcessor(handler *handler.Handler) func() {
FanoutOsMetricInfo := func(sender <-chan producer.OSMetricInfo, receivers ...chan<- producer.OSMetricInfo) {
defer func() {
for _, receiver := range receivers {
close(receiver)
}
if err := recover(); err != nil {
handler.NotifyError(err.(error))
}
}()
for metric := range sender {
for _, receiver := range receivers {
select {
case receiver <- metric:
default:
logger.Warn("Some OSMetricInfo consumer blocked!")
}
}
}
}
FanoutSpeed := func(sender <-chan producer.FanspeedInfo, receivers ...chan<- producer.FanspeedInfo) {
defer func() {
for _, receiver := range receivers {
@ -149,22 +169,28 @@ func initProcessor(handler *handler.Handler) func() {
panic(errors.New("cpu not found!"))
}
osMetricInfoChan := producer.NewOsMetric(
handler,
*SampleInterval,
)
tempetureInfoChan, fanspeedChan := producer.AggregateProcessorChannel(
handler,
*SampleInterval, processorCount,
*P, *I, *D,
*SetPoint,
)
simpleLogger := consumer.NewSampleOSLogger(*SampleInterval, handler)
fanController := consumer.NewFanControl(processorCount, *SampleInterval, handler)
metricLogger := consumer.NewInfluxMetric((*InfluxHost).String(), processorCount, handler)
handler.IncreaseWait()
go simpleLogger.StartControl()
handler.IncreaseWait()
go fanController.StartControl()
handler.IncreaseWait()
go metricLogger.StartLogging()
go FanoutOsMetricInfo(osMetricInfoChan, simpleLogger.Consumer())
go FanoutTempeture(tempetureInfoChan, metricLogger.TempetureConsumer())
go FanoutSpeed(fanspeedChan, fanController.Consumer(), metricLogger.FanSpeedConsumer())

68
consumer/testLogging.go Normal file
View File

@ -0,0 +1,68 @@
package consumer
import (
"time"
zlog "amuz.es/src/infra/goutils/logger/zap"
"amuz.es/src/infra/goutils/handler"
"amuz.es/src/infra/cpu_ctrl/producer"
"go.uber.org/zap"
"sync/atomic"
"unsafe"
)
type sampleOSLogger struct {
handler *handler.Handler
osMetricConsumer chan producer.OSMetricInfo
sampleDuration time.Duration
logger *zap.SugaredLogger
}
type SampleOSLogger interface {
Consumer() chan<- producer.OSMetricInfo
StartControl()
}
func NewSampleOSLogger(sampleDuration time.Duration, handler *handler.Handler) SampleOSLogger {
return &sampleOSLogger{
handler: handler,
osMetricConsumer: make(chan producer.OSMetricInfo, 1),
sampleDuration: sampleDuration,
logger: zlog.New(nil, "sample_os_log"),
}
}
func (c *sampleOSLogger) Consumer() chan<- producer.OSMetricInfo { return c.osMetricConsumer }
func (c *sampleOSLogger) StartControl() {
defer c.handler.DecreaseWait()
defer func() {
if err := recover(); err != nil {
c.handler.NotifyError(err.(error))
}
}()
defer c.logger.Info("os metric sampler stopped")
c.logger.Info("os metric sampler started")
var (
ticker = time.Tick(c.sampleDuration)
newOSMetricPtr unsafe.Pointer
)
go func() {
for sampledOsMetric := range c.osMetricConsumer {
atomic.StorePointer(&newOSMetricPtr, unsafe.Pointer(&sampledOsMetric))
}
}()
for {
select {
case <-ticker:
swapped := (*producer.OSMetricInfo)(atomic.SwapPointer(&newOSMetricPtr, nil))
if swapped == nil {
continue
}
c.logger.Info("CHANGED !! ", (*producer.OSMetricInfo)(swapped))
case <-c.handler.Done():
return
}
}
}

114
producer/os.go Normal file
View File

@ -0,0 +1,114 @@
package producer
import (
"time"
"sync"
zlog "amuz.es/src/infra/goutils/logger/zap"
"amuz.es/src/infra/goutils/handler"
"runtime"
"go.uber.org/zap"
"go.uber.org/multierr"
"context"
)
type osMetric struct {
handler *handler.Handler
sampleDuration time.Duration
metricChanged chan<- OSMetricInfo
info OSMetricInfo
logger *zap.SugaredLogger
}
type osMetricApplier func(*OSMetricInfo, chan<- error, *sync.WaitGroup)
type OsMetric interface {
StartMonitoring()
}
func NewOsMetric(
mainHandler *handler.Handler,
sampleDuration time.Duration,
) (<-chan OSMetricInfo) {
mainHandler.IncreaseWait()
aggHandler := handler.NewHandler(context.Background())
var (
oin, oout = NewOSMetricQueue()
)
go func() {
<-mainHandler.Done()
aggHandler.GracefulWait()
close(oin)
mainHandler.DecreaseWait()
}()
reader := &osMetric{
handler: aggHandler,
sampleDuration: sampleDuration,
metricChanged: oin,
logger: zlog.New(nil, "os"),
}
aggHandler.IncreaseWait()
go reader.StartMonitoring()
return oout
}
func (p *osMetric) StartMonitoring() {
defer func() {
if err := recover(); err != nil {
p.handler.NotifyError(err.(error))
}
p.handler.DecreaseWait()
}()
ticker := time.Tick(p.sampleDuration)
defer p.logger.Info("os metric monitor stopped")
p.logger.Infof("os metric monitor started with %s", p.sampleDuration)
metricRecorders := p.availableMetrics()
for {
select {
case now := <-ticker:
/*
SELECT mean("tempeture") as tempeture FROM "processor_tempeture" WHERE $timeFilter GROUP BY "processor", time(5s) fill(previous)
*/
if err := p.applyMetrics(&p.info, metricRecorders...); err != nil {
p.logger.Errorf("reading os metric error occoured!: ", err)
continue
}
p.info.At = now
if p.metricChanged != nil {
select {
case p.metricChanged <- p.info:
default:
runtime.Gosched()
}
}
case <-p.handler.Done():
return
}
}
}
func (p *osMetric) applyMetrics(info *OSMetricInfo, appliers ...osMetricApplier) (err error) {
var (
osMetricReadWaiter sync.WaitGroup
errorQueue = make(chan error, len(appliers))
)
for _, applier := range appliers {
osMetricReadWaiter.Add(1)
go applier(info, errorQueue, &osMetricReadWaiter)
}
osMetricReadWaiter.Wait()
close(errorQueue)
for newErr := range errorQueue {
err = multierr.Append(err, newErr)
}
return
}

204
producer/os_darwin.go Normal file
View File

@ -0,0 +1,204 @@
package producer
import (
"github.com/mackerelio/go-osstat/memory"
"github.com/mackerelio/go-osstat/loadavg"
"github.com/mackerelio/go-osstat/network"
"github.com/mackerelio/go-osstat/uptime"
"github.com/mackerelio/go-osstat/cpu"
"github.com/hako/durafmt"
"amuz.es/src/infra/goutils/misc"
"time"
"go.uber.org/multierr"
"sync"
)
type NetIOInfo struct {
Name string
RxBytes,
TxBytes uint64
}
type MemoryInfo struct {
Total,
Active,
Cached,
Free,
Inactive,
SwapFree,
SwapTotal,
SwapUsed,
Used uint64
}
type LoadInfo struct {
Avg1,
Avg5,
Avg15 float64
}
type CPUInfo struct {
Idle,
Nice,
System,
User float64
}
type OSMetricInfo struct {
Memory MemoryInfo
Load LoadInfo
NetIO map[string]NetIOInfo
Uptime time.Duration
CPU CPUInfo
At time.Time
}
func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
memoryInfo, err := memory.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readMemoryStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total))
p.logger.Debugf("memory active: ", misc.FileSizeIEC(memoryInfo.Active))
p.logger.Debugf("memory cached: ", misc.FileSizeIEC(memoryInfo.Cached))
p.logger.Debugf("memory free: ", misc.FileSizeIEC(memoryInfo.Free))
p.logger.Debugf("memory inactive: ", misc.FileSizeIEC(memoryInfo.Inactive))
p.logger.Debugf("memory swapFree: ", misc.FileSizeIEC(memoryInfo.SwapFree))
p.logger.Debugf("memory swapTotal: ", misc.FileSizeIEC(memoryInfo.SwapTotal))
p.logger.Debugf("memory swapUsed: ", misc.FileSizeIEC(memoryInfo.SwapUsed))
p.logger.Debugf("memory used: ", misc.FileSizeIEC(memoryInfo.Used))
info.Memory.Total = memoryInfo.Total
info.Memory.Active = memoryInfo.Active
info.Memory.Cached = memoryInfo.Cached
info.Memory.Free = memoryInfo.Free
info.Memory.Inactive = memoryInfo.Inactive
info.Memory.SwapFree = memoryInfo.SwapFree
info.Memory.SwapTotal = memoryInfo.SwapTotal
info.Memory.SwapUsed = memoryInfo.SwapUsed
info.Memory.Used = memoryInfo.Used
return
}
func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
load, err := loadavg.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readLoadStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1)
p.logger.Debugf("load Loadavg5: %f \n", load.Loadavg5)
p.logger.Debugf("load Loadavg15: %f \n", load.Loadavg15)
info.Load.Avg1 = load.Loadavg1
info.Load.Avg5 = load.Loadavg5
info.Load.Avg15 = load.Loadavg15
return
}
func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
netios, err := network.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readNetworkStat: ", err)
errChan <- err
}
}()
netIoMap := make(map[string]NetIOInfo)
for _, netio := range netios {
p.logger.Debugf("netio name: ", netio.Name)
p.logger.Debugf("netio rxBytes: ", misc.FileSizeIEC(netio.RxBytes))
p.logger.Debugf("netio txBytes: ", misc.FileSizeIEC(netio.TxBytes))
netIoMap[netio.Name] = NetIOInfo{
Name: netio.Name,
RxBytes: netio.RxBytes,
TxBytes: netio.TxBytes,
}
}
info.NetIO = netIoMap
return
}
func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
ut, err := uptime.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readUptimeStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String())
info.Uptime = ut
return
}
func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
ct, err := cpu.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readCpuStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("readCpuStat: idle=%d\n", ct.Idle)
p.logger.Debugf("readCpuStat: nice=%d\n", ct.Nice)
p.logger.Debugf("readCpuStat: system=%d\n", ct.System)
p.logger.Debugf("readCpuStat: total=%d\n", ct.Total)
p.logger.Debugf("readCpuStat: user=%d\n", ct.User)
p.logger.Debugf("readCpuStat: idle=%f%%\n", float64(ct.Idle*100)/float64(ct.Total))
p.logger.Debugf("readCpuStat: nice=%f%%\n", float64(ct.Nice*100)/float64(ct.Total))
p.logger.Debugf("readCpuStat: system=%f%%\n", float64(ct.System*100)/float64(ct.Total))
p.logger.Debugf("readCpuStat: user=%f%%\n", float64(ct.User*100)/float64(ct.Total))
info.CPU.Idle = float64(ct.Idle*100) / float64(ct.Total)
info.CPU.Nice = float64(ct.Nice*100) / float64(ct.Total)
info.CPU.System = float64(ct.System*100) / float64(ct.Total)
info.CPU.User = float64(ct.User*100) / float64(ct.Total)
return
}
func (p *osMetric) availableMetrics() (appliers []osMetricApplier) {
appliers = append(appliers,
p.readLoadStat,
p.readMemoryStat,
p.readCpuStat,
p.readNetworkStat,
p.readUptimeStat,
)
return
}

264
producer/os_linux.go Normal file
View File

@ -0,0 +1,264 @@
package main
import (
"github.com/mackerelio/go-osstat/memory"
"github.com/mackerelio/go-osstat/loadavg"
"github.com/mackerelio/go-osstat/network"
"github.com/mackerelio/go-osstat/uptime"
"github.com/mackerelio/go-osstat/cpu"
"github.com/mackerelio/go-osstat/disk"
"github.com/hako/durafmt"
"amuz.es/src/infra/goutils/misc"
"time"
"go.uber.org/multierr"
)
type NetIOInfo struct {
Name string
RxBytes,
TxBytes uint64
}
type DiskIOInfo struct {
Name string
ReadsCompletedBytes,
WritesCompletedBytes uint64
}
type MemoryInfo struct {
Total,
Used,
Buffers,
Cached,
Free,
Active,
Inactive,
SwapTotal,
SwapUsed,
SwapCached,
SwapFree uint64
}
type LoadInfo struct {
Avg1,
Avg5,
Avg15 float64
}
type CPUInfo struct {
User,
Nice,
System,
Idle,
Iowait,
Irq,
Softirq,
Steal float64
}
type OSMetricInfo struct {
Memory MemoryInfo
Load LoadInfo
NetIO map[string]NetIOInfo
Uptime time.Duration
CPU CPUInfo
DiskIO DiskIOInfo
At time.Time
}
func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
memoryInfo, err := memory.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readMemoryStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total))
p.logger.Debugf("memory used: ", misc.FileSizeIEC(memoryInfo.Used))
p.logger.Debugf("memory buffers: ", misc.FileSizeIEC(memoryInfo.Buffers))
p.logger.Debugf("memory cached: ", misc.FileSizeIEC(memoryInfo.Cached))
p.logger.Debugf("memory free: ", misc.FileSizeIEC(memoryInfo.Free))
p.logger.Debugf("memory active: ", misc.FileSizeIEC(memoryInfo.Active))
p.logger.Debugf("memory inactive: ", misc.FileSizeIEC(memoryInfo.Inactive))
p.logger.Debugf("memory swapTotal: ", misc.FileSizeIEC(memoryInfo.SwapTotal))
p.logger.Debugf("memory swapUsed: ", misc.FileSizeIEC(memoryInfo.SwapUsed))
p.logger.Debugf("memory swapCached: ", misc.FileSizeIEC(memoryInfo.SwapCached))
p.logger.Debugf("memory swapFree: ", misc.FileSizeIEC(memoryInfo.SwapFree))
info.Memory.Total = memoryInfo.Total
info.Memory.Used = memoryInfo.Used
info.Memory.Buffers = memoryInfo.Buffers
info.Memory.Cached = memoryInfo.Cached
info.Memory.Free = memoryInfo.Free
info.Memory.Active = memoryInfo.Active
info.Memory.Inactive = memoryInfo.Inactive
info.Memory.SwapTotal = memoryInfo.SwapTotal
info.Memory.SwapUsed = memoryInfo.SwapUsed
info.Memory.SwapCached = memoryInfo.SwapCached
info.Memory.SwapFree = memoryInfo.SwapFree
}
func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
load, err := loadavg.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readLoadStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1)
p.logger.Debugf("load Loadavg5: %f \n", load.Loadavg5)
p.logger.Debugf("load Loadavg15: %f \n", load.Loadavg15)
info.Load.Avg1 = load.Loadavg1
info.Load.Avg5 = load.Loadavg5
info.Load.Avg15 = load.Loadavg15
return
}
func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
netios, err := network.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readNetworkStat: ", err)
errChan <- err
}
}()
netIoMap := make(map[string]NetIOInfo)
for _, netio := range netios {
p.logger.Debugf("netio name: ", netio.Name)
p.logger.Debugf("netio rxBytes: ", misc.FileSizeIEC(netio.RxBytes))
p.logger.Debugf("netio txBytes: ", misc.FileSizeIEC(netio.TxBytes))
netIoMap[netio.Name] = NetIOInfo{
Name: netio.Name,
RxBytes: netio.RxBytes,
TxBytes: netio.TxBytes,
}
}
info.NetIO = netIoMap
return
}
func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
ut, err := uptime.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readUptimeStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String())
info.Uptime = ut
return
}
func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
ct, err := cpu.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readCpuStat: ", err)
errChan <- err
}
}()
p.logger.Debugf("cpu user: %d\n", ct.User)
p.logger.Debugf("cpu nice: %d\n", ct.Nice)
p.logger.Debugf("cpu system: %d\n", ct.System)
p.logger.Debugf("cpu idle: %d\n", ct.Idle)
p.logger.Debugf("cpu iowait: %d\n", ct.Iowait)
p.logger.Debugf("cpu irq: %d\n", ct.Irq)
p.logger.Debugf("cpu softirq: %d\n", ct.Softirq)
p.logger.Debugf("cpu steal: %d\n", ct.Steal)
p.logger.Debugf("cpu total: %d\n", ct.Total)
p.logger.Debugf("cpu user: %f%%\n", float64(ct.User*100)/float64(ct.Total))
p.logger.Debugf("cpu nice: %f%%\n", float64(ct.Nice*100)/float64(ct.Total))
p.logger.Debugf("cpu system: %f%%\n", float64(ct.System*100)/float64(ct.Total))
p.logger.Debugf("cpu idle: %f%%\n", float64(ct.Idle*100)/float64(ct.Total))
p.logger.Debugf("cpu iowait: %f%%\n", float64(ct.Iowait*100)/float64(ct.Total))
p.logger.Debugf("cpu irq: %f%%\n", float64(ct.Irq*100)/float64(ct.Total))
p.logger.Debugf("cpu softirq: %f%%\n", float64(ct.Softirq*100)/float64(ct.Total))
p.logger.Debugf("cpu steal: %f%%\n", float64(ct.Steal*100)/float64(ct.Total))
info.CPU.User = float64(ct.User*100) / float64(ct.Total)
info.CPU.Nice = float64(ct.Nice*100) / float64(ct.Total)
info.CPU.System = float64(ct.System*100) / float64(ct.Total)
info.CPU.Idle = float64(ct.Idle*100) / float64(ct.Total)
info.CPU.Iowait = float64(ct.Iowait*100) / float64(ct.Total)
info.CPU.Irq = float64(ct.Irq*100) / float64(ct.Total)
info.CPU.Softirq = float64(ct.Softirq*100) / float64(ct.Total)
info.CPU.Steal = float64(ct.Steal*100) / float64(ct.Total)
return
}
func (p *osMetric) readDiskStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
diskinfos, err := disk.Get()
defer func() {
defer waiter.Done()
if panicErr := recover(); panicErr != nil {
err = multierr.Append(err, panicErr.(error))
}
if err != nil {
p.logger.Error("unable to retrieve readDiskStat: ", err)
errChan <- err
}
}()
diskIoMap := make(map[string]DiskIOInfo)
for _, dsk := range diskinfos {
p.logger.Debugf("disk name: ", dsk.Name)
p.logger.Debugf("disk read: ", misc.FileSizeIEC(dsk.ReadsCompleted))
p.logger.Debugf("disk written: ", misc.FileSizeIEC(dsk.WritesCompleted))
diskIoMap[dsk.Name] = DiskIOInfo{
Name: netio.Name,
ReadsCompletedBytes: dsk.ReadsCompleted,
WritesCompletedBytes: dsk.WritesCompleted,
}
}
info.DiskIO = diskIoMap
return
}
func (p *osMetric) availableMetrics() (appliers []osMetricApplier) {
appliers = append(appliers,
p.readLoadStat,
p.readMemoryStat,
p.readCpuStat,
p.readNetworkStat,
p.readDiskStat,
p.readUptimeStat,
)
return
}

View File

@ -0,0 +1,36 @@
package producer
import (
"container/list"
)
// 블럭되지 않는 큐체널
func NewOSMetricQueue() (chan<- OSMetricInfo, <-chan OSMetricInfo) {
send := make(chan OSMetricInfo, 1)
receive := make(chan OSMetricInfo, 1)
go manageOSMetricQueue(send, receive)
return send, receive
}
func manageOSMetricQueue(send <-chan OSMetricInfo, receive chan<- OSMetricInfo) {
queue := list.New()
defer close(receive)
for {
if front := queue.Front(); front == nil {
if value, ok := <-send; ok {
queue.PushBack(value)
} else {
break
}
} else {
select {
case receive <- front.Value.(OSMetricInfo):
queue.Remove(front)
case value, ok := <-send:
if ok {
queue.PushBack(value)
}
}
}
}
}