diff --git a/consumer/influx_stat.go b/consumer/influx_stat.go index 675dc0f..8895c9f 100644 --- a/consumer/influx_stat.go +++ b/consumer/influx_stat.go @@ -1,15 +1,11 @@ package consumer import ( - "fmt" - "os/exec" "amuz.es/src/infra/cpu_ctrl/util" - "time" + //"time" "amuz.es/src/infra/cpu_ctrl/processor" "amuz.es/src/infra/cpu_ctrl/logger" - "bytes" - "log" - "github.com/influxdata/influxdb/client/v2" + //"github.com/influxdata/influxdb/client/v2" ) var ( @@ -18,6 +14,7 @@ var ( type influxMetric struct { host string + processorCount int handler util.Handler fanSpeedConsumer chan processor.FanspeedInfo tempetureConsumer chan processor.TempetureInfo @@ -26,12 +23,13 @@ type influxMetric struct { type InfluxMetric interface { FanSpeedConsumer() chan<- processor.FanspeedInfo TempetureConsumer() chan<- processor.TempetureInfo - StartControl() + StartLogging() } -func NewInfluxMetric(host string, handler util.Handler) InfluxMetric { +func NewInfluxMetric(host string, processorCount int, handler util.Handler) InfluxMetric { return &influxMetric{ host: host, + processorCount: processorCount, handler: handler, fanSpeedConsumer: make(chan processor.FanspeedInfo, 1), tempetureConsumer: make(chan processor.TempetureInfo, 1), @@ -41,7 +39,7 @@ func NewInfluxMetric(host string, handler util.Handler) InfluxMetric { func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo { return m.fanSpeedConsumer } func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo { return m.tempetureConsumer } -func (m *influxMetric) StartControl() { +func (m *influxMetric) StartLogging() { defer m.handler.DecreaseWait() defer func() { @@ -49,93 +47,61 @@ func (m *influxMetric) StartControl() { m.handler.NotifyError(err.(error)) } }() - var influxDbConn client.Client + //var influxDbConn client.Client + //for { + // conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,}) + // if err != nil { + // influxLogger.Error(err) + // } else { + // influxDbConn = conn + // break + // } + // cont := time.After(3 * time.Second) + // select { + // case <-cont: + // continue + // case <-m.handler.Done(): + // return + // } + //} + // + //// Create a new point batch + //batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{ + // Database: "core", + // Precision: "s", + //}) + //if err != nil { + // panic(err) + //} + defer close(m.fanSpeedConsumer) + defer close(m.tempetureConsumer) + for { - conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,}) - if err != nil { - influxLogger.Error("cannot connect influxdb %s", err) - } else { - influxDbConn = conn - break - } - cont := time.After(3 * time.Second) select { - case <-cont: - continue + case changedSpeed := <-m.fanSpeedConsumer: + influxLogger.Debugf("id %d speed %d",changedSpeed.Id,changedSpeed.FanSpeed) + case changedTempeture := <-m.tempetureConsumer: + influxLogger.Debugf("id %d temp %f",changedTempeture.Id,changedTempeture.Tempeture) case <-m.handler.Done(): return } } - - // Create a new point batch - batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{ - Database: "core", - Precision: "s", - }) - if err != nil { - panic(err) - } - - - // Create a point and add to batch - tags := map[string]string{"cpu": "cpu-total"} - fields := map[string]interface{}{ - "idle": 10.1, - "system": 53.3, - "user": 46.6, - } - - pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) - if err != nil { - log.Fatal(err) - } - batchPoint.AddPoint(pt) - - // Write the batch - if err := influxDbConn.Write(batchPoint); err != nil { - log.Fatal(err) - } - - //defer close(c.fanSpeedConsumer) - //defer log.Info("Fan control stopped") - //log.Info("Fan control started") + //// Create a point and add to batch + //tags := map[string]string{"cpu": "cpu-total"} + //fields := map[string]interface{}{ + // "idle": 10.1, + // "system": 53.3, + // "user": 46.6, + //} // - //ticker := time.Tick(c.sampleDuration) - //pastFanSpeedList := make([]int, c.processorCount) - //newFanSpeedList := make([]int, c.processorCount) - //for { - //checker: - // for { - // select { - // case <-ticker: - // break checker - // case changedSpeed := <-c.fanSpeedConsumer: - // newFanSpeedList[changedSpeed.Id] = changedSpeed.FanSpeed - // case <-c.handler.Done(): - // return - // } - // } - // if (!compareFanSpeed(pastFanSpeedList, newFanSpeedList)) { - // copy(pastFanSpeedList, newFanSpeedList) - // args := make([]string, 0) - // args = append(args, "raw", "0x3a", "0x01", ) - // for _, item := range newFanSpeedList { - // args = append(args, fmt.Sprintf("0x%x", item)) - // } - // args = append(args, - // "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", - // ) - // cmd := exec.Command("ipmitool", args...) - // if err := cmd.Run(); err != nil { - // c.handler.NotifyError(err) - // return - // } - // buf := bytes.NewBufferString("") - // for _, item := range newFanSpeedList { - // buf.WriteString(fmt.Sprintf("0x%x", item)) - // buf.WriteRune(' ') - // } - // log.Infof("Commit fan speed with %s", buf.String()) - // } + //pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) + //if err != nil { + // log.Fatal(err) + //} + //batchPoint.AddPoint(pt) + // + //// Write the batch + //if err := influxDbConn.Write(batchPoint); err != nil { + // log.Fatal(err) //} } diff --git a/main.go b/main.go index 66dc2e0..19fd466 100644 --- a/main.go +++ b/main.go @@ -56,8 +56,30 @@ func setMaxProcs() { } } +func FanoutSpeed(sender <-chan processor.FanspeedInfo, handler util.Handler, receivers ...chan<- processor.FanspeedInfo) { + defer handler.DecreaseWait() + defer func() { + if err := recover(); err != nil { + handler.NotifyError(err.(error)) + } + }() + for { + select { + case tempeture := <-sender: + for _, receiver := range receivers { + select { + case receiver <- tempeture: + default: + log.Warn("Some Tempeture consumer blocked!") + } + } + case <-handler.Done(): + return + } + } +} + func FanoutTempeture(sender <-chan processor.TempetureInfo, handler util.Handler, receivers ...chan<- processor.TempetureInfo) { - handler.IncreaseWait() defer handler.DecreaseWait() defer func() { if err := recover(); err != nil { @@ -94,14 +116,18 @@ func main() { if processorCount == 0 { handler.NotifyError(errors.New("cpu not found!")) } - - fanController := consumer.NewFanControl(processorCount, sampleDuration, handler) + var ( + tempetureChannel = make(chan processor.TempetureInfo) + fanspeedChannel = make(chan processor.FanspeedInfo) + ) + defer close(tempetureChannel) + defer close(fanspeedChannel) processors = make([]processor.Processor, 0, processorCount) for i := 0; i < processorCount; i++ { if info, err := processor.NewProcessorInfo(handler, i, sampleDuration, *P, *I, *D, *SetPoint, 0x64, 0x4, - nil, fanController.Consumer(), + tempetureChannel, fanspeedChannel, ); err != nil { handler.NotifyError(err) @@ -112,8 +138,17 @@ func main() { } } + fanController := consumer.NewFanControl(processorCount, sampleDuration, handler) + metricLogger := consumer.NewInfluxMetric("", processorCount, handler) + + handler.IncreaseWait() + go FanoutTempeture(tempetureChannel, handler, metricLogger.TempetureConsumer()) + handler.IncreaseWait() + go FanoutSpeed(fanspeedChannel, handler, fanController.Consumer(), metricLogger.FanSpeedConsumer()) handler.IncreaseWait() go fanController.StartControl() + handler.IncreaseWait() + go metricLogger.StartLogging() signal.Notify(exitSignal, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) daemon.NotifyDaemon(daemon.DaemonStarted)