package consumer

import (
	"fmt"
	"strconv"
	"time"

	"amuz.es/src/infra/cpu_ctrl/logger"
	"amuz.es/src/infra/cpu_ctrl/processor"
	"amuz.es/src/infra/cpu_ctrl/util"
	"github.com/influxdata/influxdb/client/v2"
)

var (
	influxLogger = logger.NewLogger("influx")
)

// influxMetric receives fan-speed and temperature samples over channels and
// forwards them to InfluxDB over UDP in one-second batches.
type influxMetric struct {
	host              string
	processorCount    int
	handler           util.Handler
	fanSpeedConsumer  chan processor.FanspeedInfo
	tempetureConsumer chan processor.TempetureInfo
}

// InfluxMetric is the public face of the metric logger: two send-only
// channels for producers and the blocking StartLogging loop.
type InfluxMetric interface {
	FanSpeedConsumer() chan<- processor.FanspeedInfo
	TempetureConsumer() chan<- processor.TempetureInfo
	StartLogging()
}

// NewInfluxMetric builds a metric logger that will write to the InfluxDB UDP
// endpoint at host. Both sample channels are buffered with capacity 1.
func NewInfluxMetric(host string, processorCount int, handler util.Handler) InfluxMetric {
	return &influxMetric{
		host:              host,
		processorCount:    processorCount,
		handler:           handler,
		fanSpeedConsumer:  make(chan processor.FanspeedInfo, 1),
		tempetureConsumer: make(chan processor.TempetureInfo, 1),
	}
}

// FanSpeedConsumer returns the channel on which fan-speed samples are sent.
func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo {
	return m.fanSpeedConsumer
}

// TempetureConsumer returns the channel on which temperature samples are sent.
func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo {
	return m.tempetureConsumer
}

// StartLogging connects to InfluxDB and then, once per second, writes every
// sample received since the previous write as a single batch. It blocks until
// the handler's Done channel fires.
//
// On exit it closes both sample channels and calls handler.DecreaseWait. Any
// panic raised inside the loop is recovered and reported through
// handler.NotifyError.
func (m *influxMetric) StartLogging() {
	defer m.handler.DecreaseWait()
	defer func() {
		if r := recover(); r != nil {
			// A panic value is not guaranteed to be an error; an unchecked
			// assertion here would panic again inside the recovery handler.
			err, ok := r.(error)
			if !ok {
				err = fmt.Errorf("influx metric logging panic: %v", r)
			}
			m.handler.NotifyError(err)
		}
	}()
	// NOTE(review): these channels are closed by their receiver. A producer
	// that sends after shutdown will panic — confirm that producers stop on
	// the same Done signal before this point is reached.
	defer close(m.fanSpeedConsumer)
	defer close(m.tempetureConsumer)
	defer influxLogger.Info("Metric logging stopped")
	influxLogger.Info("Metric logging started")

	conn, ok := m.connect()
	if !ok {
		return
	}
	defer func() {
		if err := conn.Close(); err != nil {
			influxLogger.Warn(err)
		}
	}()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	var points []*client.Point
	for {
		// Start every cycle from an empty slice while keeping the backing
		// array, so points that were already sent are never sent again.
		points = points[:0]

	collect:
		for {
			select {
			case <-ticker.C:
				break collect
			case speed := <-m.fanSpeedConsumer:
				point, err := m.getFanspeedPoint(speed)
				if err != nil {
					influxLogger.Debugf("id %d speed err %s", speed.Id, err)
					continue
				}
				points = append(points, point)
			case temp := <-m.tempetureConsumer:
				point, err := m.getTempeturePoint(temp)
				if err != nil {
					influxLogger.Debugf("id %d tempeture err %s", temp.Id, err)
					continue
				}
				points = append(points, point)
			case <-m.handler.Done():
				return
			}
		}

		if len(points) == 0 {
			continue // nothing arrived during this interval
		}

		// A batch only ever grows, so a fresh one is needed per write;
		// reusing a single batch would re-send every earlier point.
		batch, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  "core",
			Precision: "s",
		})
		if err != nil {
			// Reported through the deferred recover handler above.
			panic(err)
		}
		batch.AddPoints(points)
		if err := conn.Write(batch); err != nil {
			influxLogger.Warn(err)
		}
	}
}

// connect retries creating the UDP client once per second until it succeeds.
// It returns false if the handler's Done channel fires first.
func (m *influxMetric) connect() (client.Client, bool) {
	for {
		conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host})
		if err == nil {
			return conn, true
		}
		influxLogger.Error(err)

		select {
		case <-time.After(1 * time.Second):
		case <-m.handler.Done():
			return nil, false
		}
	}
}

// getTempeturePoint converts a temperature sample into an InfluxDB point in
// the "processor_tempeture" measurement, tagged with the processor id and
// timestamped with the current time.
func (m *influxMetric) getTempeturePoint(info processor.TempetureInfo) (*client.Point, error) {
	tags := map[string]string{"processor": strconv.Itoa(info.Id)}
	fields := map[string]interface{}{
		"tempeture": info.Tempeture,
	}
	return client.NewPoint("processor_tempeture", tags, fields, time.Now())
}

// getFanspeedPoint converts a fan-speed sample into an InfluxDB point in the
// "processor_cooling_fanspeed" measurement, tagged with the processor id and
// timestamped with the current time.
func (m *influxMetric) getFanspeedPoint(info processor.FanspeedInfo) (*client.Point, error) {
	tags := map[string]string{"processor": strconv.Itoa(info.Id)}
	fields := map[string]interface{}{
		// NOTE(review): the field key "noob" looks like a placeholder, but
		// existing queries may depend on it — confirm before renaming.
		"noob": info.FanSpeed,
	}
	return client.NewPoint("processor_cooling_fanspeed", tags, fields, time.Now())
}