diff --git a/bootstrap.go b/bootstrap.go index b9fd036..d5bb6f3 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -13,8 +13,8 @@ import ( "amuz.es/src/infra/cpu_ctrl/daemon" "amuz.es/src/infra/cpu_ctrl/producer" "amuz.es/src/infra/cpu_ctrl/consumer" - "errors" "go.uber.org/zap" + "errors" ) func finalCloser() { @@ -30,8 +30,8 @@ func initLogger() func() { formatter := zapcore.NewConsoleEncoder(zlog.LogCommonFormat) level := zap.InfoLevel - if *verbose{ - level=zap.DebugLevel + if *verbose { + level = zap.DebugLevel } // 전역 로거 초기화 var err error @@ -103,6 +103,26 @@ func initContext(handler *handler.Handler) (func(), func()) { // 메인 웹서버 초기화 func initProcessor(handler *handler.Handler) func() { + FanoutOsMetricInfo := func(sender <-chan producer.OSMetricInfo, receivers ...chan<- producer.OSMetricInfo) { + defer func() { + for _, receiver := range receivers { + close(receiver) + } + if err := recover(); err != nil { + handler.NotifyError(err.(error)) + } + }() + for metric := range sender { + for _, receiver := range receivers { + select { + case receiver <- metric: + default: + logger.Warn("Some OSMetricInfo consumer blocked!") + } + } + } + } + FanoutSpeed := func(sender <-chan producer.FanspeedInfo, receivers ...chan<- producer.FanspeedInfo) { defer func() { for _, receiver := range receivers { @@ -149,22 +169,28 @@ func initProcessor(handler *handler.Handler) func() { panic(errors.New("cpu not found!")) } + osMetricInfoChan := producer.NewOsMetric( + handler, + *SampleInterval, + ) tempetureInfoChan, fanspeedChan := producer.AggregateProcessorChannel( handler, *SampleInterval, processorCount, *P, *I, *D, *SetPoint, ) - + simpleLogger := consumer.NewSampleOSLogger(*SampleInterval, handler) fanController := consumer.NewFanControl(processorCount, *SampleInterval, handler) metricLogger := consumer.NewInfluxMetric((*InfluxHost).String(), processorCount, handler) - + handler.IncreaseWait() + go simpleLogger.StartControl() handler.IncreaseWait() go fanController.StartControl() handler.IncreaseWait() go metricLogger.StartLogging() + go FanoutOsMetricInfo(osMetricInfoChan, simpleLogger.Consumer()) go FanoutTempeture(tempetureInfoChan, metricLogger.TempetureConsumer()) go FanoutSpeed(fanspeedChan, fanController.Consumer(), metricLogger.FanSpeedConsumer()) diff --git a/consumer/testLogging.go b/consumer/testLogging.go new file mode 100644 index 0000000..135dc6c --- /dev/null +++ b/consumer/testLogging.go @@ -0,0 +1,68 @@ +package consumer + +import ( + "time" + zlog "amuz.es/src/infra/goutils/logger/zap" + "amuz.es/src/infra/goutils/handler" + "amuz.es/src/infra/cpu_ctrl/producer" + "go.uber.org/zap" + "sync/atomic" + "unsafe" +) + +type sampleOSLogger struct { + handler *handler.Handler + osMetricConsumer chan producer.OSMetricInfo + sampleDuration time.Duration + logger *zap.SugaredLogger +} + +type SampleOSLogger interface { + Consumer() chan<- producer.OSMetricInfo + StartControl() +} + +func NewSampleOSLogger(sampleDuration time.Duration, handler *handler.Handler) SampleOSLogger { + return &sampleOSLogger{ + handler: handler, + osMetricConsumer: make(chan producer.OSMetricInfo, 1), + sampleDuration: sampleDuration, + logger: zlog.New(nil, "sample_os_log"), + } +} + +func (c *sampleOSLogger) Consumer() chan<- producer.OSMetricInfo { return c.osMetricConsumer } + +func (c *sampleOSLogger) StartControl() { + defer c.handler.DecreaseWait() + defer func() { + if err := recover(); err != nil { + c.handler.NotifyError(err.(error)) + } + }() + defer c.logger.Info("os metric sampler stopped") + c.logger.Info("os metric sampler started") + + var ( + ticker = time.Tick(c.sampleDuration) + newOSMetricPtr unsafe.Pointer + ) + go func() { + for sampledOsMetric := range c.osMetricConsumer { + atomic.StorePointer(&newOSMetricPtr, unsafe.Pointer(&sampledOsMetric)) + } + }() + + for { + select { + case <-ticker: + swapped := (*producer.OSMetricInfo)(atomic.SwapPointer(&newOSMetricPtr, nil)) + if swapped == nil { + continue + } + c.logger.Info("CHANGED !! ", (*producer.OSMetricInfo)(swapped)) + case <-c.handler.Done(): + return + } + } +} diff --git a/producer/os.go b/producer/os.go new file mode 100644 index 0000000..9d0ca9f --- /dev/null +++ b/producer/os.go @@ -0,0 +1,114 @@ +package producer + +import ( + "time" + "sync" + zlog "amuz.es/src/infra/goutils/logger/zap" + "amuz.es/src/infra/goutils/handler" + "runtime" + "go.uber.org/zap" + "go.uber.org/multierr" + "context" +) + +type osMetric struct { + handler *handler.Handler + sampleDuration time.Duration + metricChanged chan<- OSMetricInfo + info OSMetricInfo + logger *zap.SugaredLogger +} + +type osMetricApplier func(*OSMetricInfo, chan<- error, *sync.WaitGroup) +type OsMetric interface { + StartMonitoring() +} + +func NewOsMetric( + mainHandler *handler.Handler, + sampleDuration time.Duration, +) (<-chan OSMetricInfo) { + + mainHandler.IncreaseWait() + aggHandler := handler.NewHandler(context.Background()) + + var ( + oin, oout = NewOSMetricQueue() + ) + + go func() { + <-mainHandler.Done() + aggHandler.GracefulWait() + close(oin) + mainHandler.DecreaseWait() + }() + + reader := &osMetric{ + handler: aggHandler, + sampleDuration: sampleDuration, + metricChanged: oin, + logger: zlog.New(nil, "os"), + } + aggHandler.IncreaseWait() + go reader.StartMonitoring() + return oout +} + +func (p *osMetric) StartMonitoring() { + defer func() { + if err := recover(); err != nil { + p.handler.NotifyError(err.(error)) + } + p.handler.DecreaseWait() + }() + + ticker := time.Tick(p.sampleDuration) + defer p.logger.Info("os metric monitor stopped") + p.logger.Infof("os metric monitor started with %s", p.sampleDuration) + + metricRecorders := p.availableMetrics() + + for { + select { + case now := <-ticker: + /* + SELECT mean("tempeture") as tempeture FROM "processor_tempeture" WHERE $timeFilter GROUP BY "processor", time(5s) fill(previous) + */ + + if err := p.applyMetrics(&p.info, metricRecorders...); err != nil { + p.logger.Errorf("reading os metric error occoured!: ", err) + continue + } + + p.info.At = now + + if p.metricChanged != nil { + select { + case p.metricChanged <- p.info: + default: + runtime.Gosched() + } + } + case <-p.handler.Done(): + return + } + } +} +func (p *osMetric) applyMetrics(info *OSMetricInfo, appliers ...osMetricApplier) (err error) { + var ( + osMetricReadWaiter sync.WaitGroup + errorQueue = make(chan error, len(appliers)) + ) + + for _, applier := range appliers { + osMetricReadWaiter.Add(1) + go applier(info, errorQueue, &osMetricReadWaiter) + } + osMetricReadWaiter.Wait() + close(errorQueue) + for newErr := range errorQueue { + err = multierr.Append(err, newErr) + } + + return +} diff --git a/producer/os_darwin.go b/producer/os_darwin.go new file mode 100644 index 0000000..12a56ce --- /dev/null +++ b/producer/os_darwin.go @@ -0,0 +1,204 @@ +package producer + +import ( + "github.com/mackerelio/go-osstat/memory" + "github.com/mackerelio/go-osstat/loadavg" + "github.com/mackerelio/go-osstat/network" + "github.com/mackerelio/go-osstat/uptime" + "github.com/mackerelio/go-osstat/cpu" + "github.com/hako/durafmt" + + "amuz.es/src/infra/goutils/misc" + "time" + "go.uber.org/multierr" + "sync" +) + +type NetIOInfo struct { + Name string + RxBytes, + TxBytes uint64 +} +type MemoryInfo struct { + Total, + Active, + Cached, + Free, + Inactive, + SwapFree, + SwapTotal, + SwapUsed, + Used uint64 +} +type LoadInfo struct { + Avg1, + Avg5, + Avg15 float64 +} +type CPUInfo struct { + Idle, + Nice, + System, + User float64 +} +type OSMetricInfo struct { + Memory MemoryInfo + Load LoadInfo + NetIO map[string]NetIOInfo + + Uptime time.Duration + CPU CPUInfo + + At time.Time +} + +func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + memoryInfo, err := memory.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readMemoryStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total)) + p.logger.Debugf("memory active: ", misc.FileSizeIEC(memoryInfo.Active)) + p.logger.Debugf("memory cached: ", misc.FileSizeIEC(memoryInfo.Cached)) + p.logger.Debugf("memory free: ", misc.FileSizeIEC(memoryInfo.Free)) + p.logger.Debugf("memory inactive: ", misc.FileSizeIEC(memoryInfo.Inactive)) + p.logger.Debugf("memory swapFree: ", misc.FileSizeIEC(memoryInfo.SwapFree)) + p.logger.Debugf("memory swapTotal: ", misc.FileSizeIEC(memoryInfo.SwapTotal)) + p.logger.Debugf("memory swapUsed: ", misc.FileSizeIEC(memoryInfo.SwapUsed)) + p.logger.Debugf("memory used: ", misc.FileSizeIEC(memoryInfo.Used)) + + info.Memory.Total = memoryInfo.Total + info.Memory.Active = memoryInfo.Active + info.Memory.Cached = memoryInfo.Cached + info.Memory.Free = memoryInfo.Free + info.Memory.Inactive = memoryInfo.Inactive + info.Memory.SwapFree = memoryInfo.SwapFree + info.Memory.SwapTotal = memoryInfo.SwapTotal + info.Memory.SwapUsed = memoryInfo.SwapUsed + info.Memory.Used = memoryInfo.Used + return +} + +func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + load, err := loadavg.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readLoadStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1) + p.logger.Debugf("load Loadavg5: %f \n", load.Loadavg5) + p.logger.Debugf("load Loadavg15: %f \n", load.Loadavg15) + + info.Load.Avg1 = load.Loadavg1 + info.Load.Avg5 = load.Loadavg5 + info.Load.Avg15 = load.Loadavg15 + return +} + +func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + netios, err := network.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readNetworkStat: ", err) + errChan <- err + } + }() + + netIoMap := make(map[string]NetIOInfo) + for _, netio := range netios { + p.logger.Debugf("netio name: ", netio.Name) + p.logger.Debugf("netio rxBytes: ", misc.FileSizeIEC(netio.RxBytes)) + p.logger.Debugf("netio txBytes: ", misc.FileSizeIEC(netio.TxBytes)) + netIoMap[netio.Name] = NetIOInfo{ + Name: netio.Name, + RxBytes: netio.RxBytes, + TxBytes: netio.TxBytes, + } + } + info.NetIO = netIoMap + return +} + +func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + ut, err := uptime.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readUptimeStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String()) + + info.Uptime = ut + return +} + +func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + ct, err := cpu.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readCpuStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("readCpuStat: idle=%d\n", ct.Idle) + p.logger.Debugf("readCpuStat: nice=%d\n", ct.Nice) + p.logger.Debugf("readCpuStat: system=%d\n", ct.System) + p.logger.Debugf("readCpuStat: total=%d\n", ct.Total) + p.logger.Debugf("readCpuStat: user=%d\n", ct.User) + p.logger.Debugf("readCpuStat: idle=%f%%\n", float64(ct.Idle*100)/float64(ct.Total)) + p.logger.Debugf("readCpuStat: nice=%f%%\n", float64(ct.Nice*100)/float64(ct.Total)) + p.logger.Debugf("readCpuStat: system=%f%%\n", float64(ct.System*100)/float64(ct.Total)) + p.logger.Debugf("readCpuStat: user=%f%%\n", float64(ct.User*100)/float64(ct.Total)) + + info.CPU.Idle = float64(ct.Idle*100) / float64(ct.Total) + info.CPU.Nice = float64(ct.Nice*100) / float64(ct.Total) + info.CPU.System = float64(ct.System*100) / float64(ct.Total) + info.CPU.User = float64(ct.User*100) / float64(ct.Total) + return +} + +func (p *osMetric) availableMetrics() (appliers []osMetricApplier) { + appliers = append(appliers, + p.readLoadStat, + p.readMemoryStat, + p.readCpuStat, + p.readNetworkStat, + p.readUptimeStat, + ) + return +} diff --git a/producer/os_linux.go b/producer/os_linux.go new file mode 100644 index 0000000..6fcca51 --- /dev/null +++ b/producer/os_linux.go @@ -0,0 +1,264 @@ +package main + +import ( + "github.com/mackerelio/go-osstat/memory" + "github.com/mackerelio/go-osstat/loadavg" + "github.com/mackerelio/go-osstat/network" + "github.com/mackerelio/go-osstat/uptime" + "github.com/mackerelio/go-osstat/cpu" + "github.com/mackerelio/go-osstat/disk" + "github.com/hako/durafmt" + + "amuz.es/src/infra/goutils/misc" + "time" + "go.uber.org/multierr" +) + +type NetIOInfo struct { + Name string + RxBytes, + TxBytes uint64 +} + +type DiskIOInfo struct { + Name string + ReadsCompletedBytes, + WritesCompletedBytes uint64 +} +type MemoryInfo struct { + Total, + Used, + Buffers, + Cached, + Free, + Active, + Inactive, + SwapTotal, + SwapUsed, + SwapCached, + SwapFree uint64 +} +type LoadInfo struct { + Avg1, + Avg5, + Avg15 float64 +} + +type CPUInfo struct { + User, + Nice, + System, + Idle, + Iowait, + Irq, + Softirq, + Steal float64 +} +type OSMetricInfo struct { + Memory MemoryInfo + Load LoadInfo + NetIO map[string]NetIOInfo + + Uptime time.Duration + CPU CPUInfo + DiskIO DiskIOInfo + At time.Time +} + +func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + memoryInfo, err := memory.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readMemoryStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total)) + p.logger.Debugf("memory used: ", misc.FileSizeIEC(memoryInfo.Used)) + p.logger.Debugf("memory buffers: ", misc.FileSizeIEC(memoryInfo.Buffers)) + p.logger.Debugf("memory cached: ", misc.FileSizeIEC(memoryInfo.Cached)) + p.logger.Debugf("memory free: ", misc.FileSizeIEC(memoryInfo.Free)) + p.logger.Debugf("memory active: ", misc.FileSizeIEC(memoryInfo.Active)) + p.logger.Debugf("memory inactive: ", misc.FileSizeIEC(memoryInfo.Inactive)) + p.logger.Debugf("memory swapTotal: ", misc.FileSizeIEC(memoryInfo.SwapTotal)) + p.logger.Debugf("memory swapUsed: ", misc.FileSizeIEC(memoryInfo.SwapUsed)) + p.logger.Debugf("memory swapCached: ", misc.FileSizeIEC(memoryInfo.SwapCached)) + p.logger.Debugf("memory swapFree: ", misc.FileSizeIEC(memoryInfo.SwapFree)) + + info.Memory.Total = memoryInfo.Total + info.Memory.Used = memoryInfo.Used + info.Memory.Buffers = memoryInfo.Buffers + info.Memory.Cached = memoryInfo.Cached + info.Memory.Free = memoryInfo.Free + info.Memory.Active = memoryInfo.Active + info.Memory.Inactive = memoryInfo.Inactive + info.Memory.SwapTotal = memoryInfo.SwapTotal + info.Memory.SwapUsed = memoryInfo.SwapUsed + info.Memory.SwapCached = memoryInfo.SwapCached + info.Memory.SwapFree = memoryInfo.SwapFree +} + +func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + load, err := loadavg.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readLoadStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1) + p.logger.Debugf("load Loadavg5: %f \n", load.Loadavg5) + p.logger.Debugf("load Loadavg15: %f \n", load.Loadavg15) + + info.Load.Avg1 = load.Loadavg1 + info.Load.Avg5 = load.Loadavg5 + info.Load.Avg15 = load.Loadavg15 + return +} + +func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + netios, err := network.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readNetworkStat: ", err) + errChan <- err + } + }() + + netIoMap := make(map[string]NetIOInfo) + for _, netio := range netios { + p.logger.Debugf("netio name: ", netio.Name) + p.logger.Debugf("netio rxBytes: ", misc.FileSizeIEC(netio.RxBytes)) + p.logger.Debugf("netio txBytes: ", misc.FileSizeIEC(netio.TxBytes)) + netIoMap[netio.Name] = NetIOInfo{ + Name: netio.Name, + RxBytes: netio.RxBytes, + TxBytes: netio.TxBytes, + } + } + info.NetIO = netIoMap + return +} + +func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + ut, err := uptime.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readUptimeStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String()) + + info.Uptime = ut + return +} + +func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + ct, err := cpu.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readCpuStat: ", err) + errChan <- err + } + }() + + p.logger.Debugf("cpu user: %d\n", ct.User) + p.logger.Debugf("cpu nice: %d\n", ct.Nice) + p.logger.Debugf("cpu system: %d\n", ct.System) + p.logger.Debugf("cpu idle: %d\n", ct.Idle) + p.logger.Debugf("cpu iowait: %d\n", ct.Iowait) + p.logger.Debugf("cpu irq: %d\n", ct.Irq) + p.logger.Debugf("cpu softirq: %d\n", ct.Softirq) + p.logger.Debugf("cpu steal: %d\n", ct.Steal) + p.logger.Debugf("cpu total: %d\n", ct.Total) + + p.logger.Debugf("cpu user: %f%%\n", float64(ct.User*100)/float64(ct.Total)) + p.logger.Debugf("cpu nice: %f%%\n", float64(ct.Nice*100)/float64(ct.Total)) + p.logger.Debugf("cpu system: %f%%\n", float64(ct.System*100)/float64(ct.Total)) + p.logger.Debugf("cpu idle: %f%%\n", float64(ct.Idle*100)/float64(ct.Total)) + p.logger.Debugf("cpu iowait: %f%%\n", float64(ct.Iowait*100)/float64(ct.Total)) + p.logger.Debugf("cpu irq: %f%%\n", float64(ct.Irq*100)/float64(ct.Total)) + p.logger.Debugf("cpu softirq: %f%%\n", float64(ct.Softirq*100)/float64(ct.Total)) + p.logger.Debugf("cpu steal: %f%%\n", float64(ct.Steal*100)/float64(ct.Total)) + + info.CPU.User = float64(ct.User*100) / float64(ct.Total) + info.CPU.Nice = float64(ct.Nice*100) / float64(ct.Total) + info.CPU.System = float64(ct.System*100) / float64(ct.Total) + info.CPU.Idle = float64(ct.Idle*100) / float64(ct.Total) + info.CPU.Iowait = float64(ct.Iowait*100) / float64(ct.Total) + info.CPU.Irq = float64(ct.Irq*100) / float64(ct.Total) + info.CPU.Softirq = float64(ct.Softirq*100) / float64(ct.Total) + info.CPU.Steal = float64(ct.Steal*100) / float64(ct.Total) + + return +} +func (p *osMetric) readDiskStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { + diskinfos, err := disk.Get() + + defer func() { + defer waiter.Done() + if panicErr := recover(); panicErr != nil { + err = multierr.Append(err, panicErr.(error)) + } + if err != nil { + p.logger.Error("unable to retrieve readDiskStat: ", err) + errChan <- err + } + }() + + diskIoMap := make(map[string]DiskIOInfo) + for _, dsk := range diskinfos { + p.logger.Debugf("disk name: ", dsk.Name) + p.logger.Debugf("disk read: ", misc.FileSizeIEC(dsk.ReadsCompleted)) + p.logger.Debugf("disk written: ", misc.FileSizeIEC(dsk.WritesCompleted)) + diskIoMap[dsk.Name] = DiskIOInfo{ + Name: netio.Name, + ReadsCompletedBytes: dsk.ReadsCompleted, + WritesCompletedBytes: dsk.WritesCompleted, + } + } + info.DiskIO = diskIoMap + + return +} + +func (p *osMetric) availableMetrics() (appliers []osMetricApplier) { + appliers = append(appliers, + p.readLoadStat, + p.readMemoryStat, + p.readCpuStat, + p.readNetworkStat, + p.readDiskStat, + p.readUptimeStat, + ) + return +} diff --git a/producer/os_metric_queue.go b/producer/os_metric_queue.go new file mode 100644 index 0000000..0efe306 --- /dev/null +++ b/producer/os_metric_queue.go @@ -0,0 +1,36 @@ +package producer + +import ( + "container/list" +) + +// 블럭되지 않는 큐체널 +func NewOSMetricQueue() (chan<- OSMetricInfo, <-chan OSMetricInfo) { + send := make(chan OSMetricInfo, 1) + receive := make(chan OSMetricInfo, 1) + go manageOSMetricQueue(send, receive) + return send, receive +} + +func manageOSMetricQueue(send <-chan OSMetricInfo, receive chan<- OSMetricInfo) { + queue := list.New() + defer close(receive) + for { + if front := queue.Front(); front == nil { + if value, ok := <-send; ok { + queue.PushBack(value) + } else { + break + } + } else { + select { + case receive <- front.Value.(OSMetricInfo): + queue.Remove(front) + case value, ok := <-send: + if ok { + queue.PushBack(value) + } + } + } + } +}