package consumer

import (
	"fmt"
	"log" // NOTE(review): not referenced in the visible part of this file; drop it if nothing else here uses it, otherwise the file will not compile.
	"time"

	"amuz.es/src/infra/cpu_ctrl/logger"
	"amuz.es/src/infra/cpu_ctrl/processor"
	"amuz.es/src/infra/cpu_ctrl/util"
	"github.com/influxdata/influxdb/client/v2"
)

const (
	// reconnectDelay is the pause between attempts to create the InfluxDB client.
	reconnectDelay = 3 * time.Second
	// flushInterval is how often collected points are written to InfluxDB.
	flushInterval = time.Second
)

var (
	influxLogger = logger.NewLogger("influx")
)

// influxMetric receives fan speed and temperature events on its channels and
// forwards them to InfluxDB over UDP.
type influxMetric struct {
	host              string
	processorCount    int
	handler           util.Handler
	fanSpeedConsumer  chan processor.FanspeedInfo
	tempetureConsumer chan processor.TempetureInfo
}

// InfluxMetric is the public surface of the InfluxDB metric consumer.
type InfluxMetric interface {
	// FanSpeedConsumer returns the send-only channel for fan speed events.
	FanSpeedConsumer() chan<- processor.FanspeedInfo
	// TempetureConsumer returns the send-only channel for temperature events.
	TempetureConsumer() chan<- processor.TempetureInfo
	// StartLogging runs the consume/flush loop until the handler is done.
	StartLogging()
}

// NewInfluxMetric builds an InfluxMetric that will write to the UDP endpoint
// host. Both event channels are created with a buffer of one element.
func NewInfluxMetric(host string, processorCount int, handler util.Handler) InfluxMetric {
	return &influxMetric{
		host:              host,
		processorCount:    processorCount,
		handler:           handler,
		fanSpeedConsumer:  make(chan processor.FanspeedInfo, 1),
		tempetureConsumer: make(chan processor.TempetureInfo, 1),
	}
}

// FanSpeedConsumer returns the send-only side of the fan speed channel.
func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo {
	return m.fanSpeedConsumer
}

// TempetureConsumer returns the send-only side of the temperature channel.
func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo {
	return m.tempetureConsumer
}

// StartLogging connects to InfluxDB, then collects incoming events and writes
// them as one batch per flushInterval. It returns when m.handler.Done() fires
// or when a batch cannot be created. On exit it closes both event channels and
// calls m.handler.DecreaseWait(); a panic raised inside the loop is recovered
// and reported through m.handler.NotifyError.
func (m *influxMetric) StartLogging() {
	defer m.handler.DecreaseWait()
	defer func() {
		if r := recover(); r != nil {
			// A panic value is not necessarily an error; an unchecked type
			// assertion here would panic again inside the deferred function.
			err, ok := r.(error)
			if !ok {
				err = fmt.Errorf("%v", r)
			}
			m.handler.NotifyError(err)
		}
	}()
	// NOTE(review): these channels are closed by their receiver. A producer
	// that sends after shutdown will panic; confirm that producers stop on the
	// same Done signal before this function returns.
	defer close(m.fanSpeedConsumer)
	defer close(m.tempetureConsumer)
	defer influxLogger.Info("Metric logging stopped")
	influxLogger.Info("Metric logging started")

	conn := m.connect()
	if conn == nil {
		// Shutdown was requested before a client could be created.
		return
	}
	defer func() {
		if err := conn.Close(); err != nil {
			influxLogger.Warn(err)
		}
	}()

	batchConfig := client.BatchPointsConfig{
		Database:  "core",
		Precision: "s",
	}

	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	var points []*client.Point
	for {
		// Keep the backing array, drop the previous interval's points.
		points = points[:0]

	collect:
		for {
			select {
			case <-ticker.C:
				break collect
			case changedSpeed := <-m.fanSpeedConsumer:
				influxLogger.Debugf("id %d speed %d", changedSpeed.Id, changedSpeed.FanSpeed)
				pt, err := m.getFanspeedPoint(changedSpeed)
				if err != nil {
					influxLogger.Warn(err)
					continue
				}
				points = append(points, pt)
			case changedTempeture := <-m.tempetureConsumer:
				influxLogger.Debugf("id %d temp %f", changedTempeture.Id, changedTempeture.Tempeture)
				pt, err := m.getTempeturePoint(changedTempeture)
				if err != nil {
					influxLogger.Warn(err)
					continue
				}
				points = append(points, pt)
			case <-m.handler.Done():
				return
			}
		}

		if len(points) == 0 {
			continue
		}

		// AddPoints only appends, so a batch reused across ticks would resend
		// every earlier point. Build a fresh batch for each flush.
		batch, err := client.NewBatchPoints(batchConfig)
		if err != nil {
			m.handler.NotifyError(err)
			return
		}
		batch.AddPoints(points)
		if err := conn.Write(batch); err != nil {
			influxLogger.Warn(err)
		}
	}
}

// connect retries client.NewUDPClient every reconnectDelay until it succeeds.
// It returns nil if m.handler.Done() fires while waiting between attempts.
func (m *influxMetric) connect() client.Client {
	for {
		conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host})
		if err == nil {
			return conn
		}
		influxLogger.Error(err)

		select {
		case <-time.After(reconnectDelay):
		case <-m.handler.Done():
			return nil
		}
	}
}

// getTempeturePoint converts a temperature event into an InfluxDB point
// stamped with the current time.
//
// NOTE(review): the measurement, tag and field names below are placeholders;
// confirm them against the dashboards/queries that read the "core" database.
// Assumes info.Tempeture is a builtin numeric type accepted as a field value.
func (m *influxMetric) getTempeturePoint(info processor.TempetureInfo) (*client.Point, error) {
	tags := map[string]string{"processor": fmt.Sprint(info.Id)}
	fields := map[string]interface{}{"value": info.Tempeture}
	return client.NewPoint("cpu_temperature", tags, fields, time.Now())
}

// getFanspeedPoint converts a fan speed event into an InfluxDB point stamped
// with the current time.
//
// NOTE(review): the measurement, tag and field names below are placeholders;
// confirm them against the dashboards/queries that read the "core" database.
// Assumes info.FanSpeed is a builtin numeric type accepted as a field value.
func (m *influxMetric) getFanspeedPoint(info processor.FanspeedInfo) (*client.Point, error) {
	tags := map[string]string{"processor": fmt.Sprint(info.Id)}
	fields := map[string]interface{}{"value": info.FanSpeed}
	return client.NewPoint("cpu_fan_speed", tags, fields, time.Now())
}