diff --git a/bootstrap.go b/bootstrap.go index 7a6b551..80478da 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -14,6 +14,7 @@ import ( "amuz.es/src/infra/cpu_ctrl/producer" "amuz.es/src/infra/cpu_ctrl/consumer" "go.uber.org/zap" + "errors" ) func finalCloser() { @@ -122,77 +123,78 @@ func initProcessor(handler *handler.Handler) func() { } } - //FanoutSpeed := func(sender <-chan producer.FanspeedInfo, receivers ...chan<- producer.FanspeedInfo) { - // defer func() { - // for _, receiver := range receivers { - // close(receiver) - // } - // if err := recover(); err != nil { - // handler.NotifyError(err.(error)) - // } - // }() - // for speed := range sender { - // for _, receiver := range receivers { - // select { - // case receiver <- speed: - // default: - // logger.Warn("Some Fanspeed consumer blocked!") - // } - // } - // } - //} - // - //FanoutTempeture := func(sender <-chan producer.TempetureInfo, receivers ...chan<- producer.TempetureInfo) { - // defer func() { - // - // for _, receiver := range receivers { - // close(receiver) - // } - // if err := recover(); err != nil { - // handler.NotifyError(err.(error)) - // } - // }() - // for tempeture := range sender { - // for _, receiver := range receivers { - // select { - // case receiver <- tempeture: - // default: - // logger.Warn("Some Tempeture consumer blocked!") - // } - // } - // } - //} - // - //processorCount := producer.GetProcessorCount() - //if processorCount == 0 { - // panic(errors.New("cpu not found!")) - //} + FanoutSpeed := func(sender <-chan producer.FanspeedInfo, receivers ...chan<- producer.FanspeedInfo) { + defer func() { + for _, receiver := range receivers { + close(receiver) + } + if err := recover(); err != nil { + handler.NotifyError(err.(error)) + } + }() + for speed := range sender { + for _, receiver := range receivers { + select { + case receiver <- speed: + default: + logger.Warn("Some Fanspeed consumer blocked!") + } + } + } + } + + FanoutTempeture := func(sender <-chan producer.TempetureInfo, receivers ...chan<- producer.TempetureInfo) { + defer func() { + + for _, receiver := range receivers { + close(receiver) + } + if err := recover(); err != nil { + handler.NotifyError(err.(error)) + } + }() + for tempeture := range sender { + for _, receiver := range receivers { + select { + case receiver <- tempeture: + default: + logger.Warn("Some Tempeture consumer blocked!") + } + } + } + } + + processorCount := producer.GetProcessorCount() + if processorCount == 0 { + panic(errors.New("cpu not found!")) + } osMetricInfoChan := producer.NewOsMetric( handler, *SampleInterval, ) - //tempetureInfoChan, fanspeedChan := producer.AggregateProcessorChannel( - // handler, - // *SampleInterval, processorCount, - // *P, *I, *D, - // *SetPoint, - //) - simpleLogger := consumer.NewSampleOSLogger(*SampleInterval, handler) - //fanController := consumer.NewFanControl(processorCount, *SampleInterval, handler) - //metricLogger := consumer.NewInfluxMetric((*InfluxHost).String(), processorCount, handler) + tempetureInfoChan, fanspeedChan := producer.AggregateProcessorChannel( + handler, + *SampleInterval, processorCount, + *P, *I, *D, + *SetPoint, + ) + //simpleLogger := consumer.NewSampleOSLogger(*SampleInterval, handler) + fanController := consumer.NewFanControl(processorCount, *SampleInterval, handler) + metricLogger := consumer.NewInfluxMetric((*InfluxHost).String(), processorCount, handler) + //handler.IncreaseWait() + //go simpleLogger.StartControl() handler.IncreaseWait() - go simpleLogger.StartControl() - //handler.IncreaseWait() - //go fanController.StartControl() - //handler.IncreaseWait() - //go metricLogger.StartLogging() + go fanController.StartControl() + handler.IncreaseWait() + go metricLogger.StartLogging() - go FanoutOsMetricInfo(osMetricInfoChan, simpleLogger.Consumer()) + //go FanoutOsMetricInfo(osMetricInfoChan, simpleLogger.Consumer()) //go FanoutOsMetricInfo(osMetricInfoChan, simpleLogger.Consumer(), metricLogger.OsMetricConsumer()) - //go FanoutTempeture(tempetureInfoChan, metricLogger.TempetureConsumer()) - //go FanoutSpeed(fanspeedChan, fanController.Consumer(), metricLogger.FanSpeedConsumer()) + go FanoutOsMetricInfo(osMetricInfoChan, metricLogger.OsMetricConsumer()) + go FanoutTempeture(tempetureInfoChan, metricLogger.TempetureConsumer()) + go FanoutSpeed(fanspeedChan, fanController.Consumer(), metricLogger.FanSpeedConsumer()) return func() {} }