package consumer import ( "fmt" "os/exec" "amuz.es/src/infra/cpu_ctrl/util" "time" "amuz.es/src/infra/cpu_ctrl/processor" "amuz.es/src/infra/cpu_ctrl/logger" "bytes" "log" "github.com/influxdata/influxdb/client/v2" ) var ( influxLogger = logger.NewLogger("consumer") ) type influxMetric struct { host string handler util.Handler fanSpeedConsumer chan processor.FanspeedInfo tempetureConsumer chan processor.TempetureInfo } type InfluxMetric interface { FanSpeedConsumer() chan<- processor.FanspeedInfo TempetureConsumer() chan<- processor.TempetureInfo StartControl() } func NewInfluxMetric(host string, handler util.Handler) InfluxMetric { return &influxMetric{ host: host, handler: handler, fanSpeedConsumer: make(chan processor.FanspeedInfo, 1), tempetureConsumer: make(chan processor.TempetureInfo, 1), } } func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo { return m.fanSpeedConsumer } func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo { return m.tempetureConsumer } func (m *influxMetric) StartControl() { defer m.handler.DecreaseWait() defer func() { if err := recover(); err != nil { m.handler.NotifyError(err.(error)) } }() var influxDbConn client.Client for { conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,}) if err != nil { influxLogger.Error("cannot connect influxdb %s", err) } else { influxDbConn = conn break } cont := time.After(3 * time.Second) select { case <-cont: continue case <-m.handler.Done(): return } } // Create a new point batch batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{ Database: "core", Precision: "s", }) if err != nil { panic(err) } // Create a point and add to batch tags := map[string]string{"cpu": "cpu-total"} fields := map[string]interface{}{ "idle": 10.1, "system": 53.3, "user": 46.6, } pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) if err != nil { log.Fatal(err) } batchPoint.AddPoint(pt) // Write the batch if err := influxDbConn.Write(batchPoint); err != nil { log.Fatal(err) } //defer close(c.fanSpeedConsumer) //defer log.Info("Fan control stopped") //log.Info("Fan control started") // //ticker := time.Tick(c.sampleDuration) //pastFanSpeedList := make([]int, c.processorCount) //newFanSpeedList := make([]int, c.processorCount) //for { //checker: // for { // select { // case <-ticker: // break checker // case changedSpeed := <-c.fanSpeedConsumer: // newFanSpeedList[changedSpeed.Id] = changedSpeed.FanSpeed // case <-c.handler.Done(): // return // } // } // if (!compareFanSpeed(pastFanSpeedList, newFanSpeedList)) { // copy(pastFanSpeedList, newFanSpeedList) // args := make([]string, 0) // args = append(args, "raw", "0x3a", "0x01", ) // for _, item := range newFanSpeedList { // args = append(args, fmt.Sprintf("0x%x", item)) // } // args = append(args, // "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", // ) // cmd := exec.Command("ipmitool", args...) // if err := cmd.Run(); err != nil { // c.handler.NotifyError(err) // return // } // buf := bytes.NewBufferString("") // for _, item := range newFanSpeedList { // buf.WriteString(fmt.Sprintf("0x%x", item)) // buf.WriteRune(' ') // } // log.Infof("Commit fan speed with %s", buf.String()) // } //} }