// cpu_ctrl/consumer/influx_stat.go

package consumer
import (
	"fmt"
	"strconv"
	"time"

	"amuz.es/src/infra/cpu_ctrl/logger"
	"amuz.es/src/infra/cpu_ctrl/processor"
	"amuz.es/src/infra/cpu_ctrl/util"
	"github.com/influxdata/influxdb/client/v2"
)
// influxLogger is the logger shared by this file; it is created with the
// name "influx".
var influxLogger = logger.NewLogger("influx")
type influxMetric struct {
host string
2017-09-11 07:41:59 +09:00
processorCount int
handler util.Handler
fanSpeedConsumer chan processor.FanspeedInfo
tempetureConsumer chan processor.TempetureInfo
}
type InfluxMetric interface {
FanSpeedConsumer() chan<- processor.FanspeedInfo
TempetureConsumer() chan<- processor.TempetureInfo
2017-09-11 07:41:59 +09:00
StartLogging()
}
2017-09-11 07:41:59 +09:00
func NewInfluxMetric(host string, processorCount int, handler util.Handler) InfluxMetric {
return &influxMetric{
host: host,
2017-09-11 07:41:59 +09:00
processorCount: processorCount,
handler: handler,
fanSpeedConsumer: make(chan processor.FanspeedInfo, 1),
tempetureConsumer: make(chan processor.TempetureInfo, 1),
}
}
// FanSpeedConsumer exposes the fan speed channel as send-only so callers
// can submit readings but not receive from it.
func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo {
	return m.fanSpeedConsumer
}
// TempetureConsumer exposes the temperature channel as send-only so callers
// can submit readings but not receive from it.
func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo {
	return m.tempetureConsumer
}
2017-09-11 07:41:59 +09:00
func (m *influxMetric) StartLogging() {
defer m.handler.DecreaseWait()
defer func() {
if err := recover(); err != nil {
m.handler.NotifyError(err.(error))
}
}()
2017-09-11 07:41:59 +09:00
defer close(m.fanSpeedConsumer)
defer close(m.tempetureConsumer)
2017-09-11 08:39:41 +09:00
defer influxLogger.Info("Metric logging stopped")
influxLogger.Info("Metric logging started")
var influxDbConn client.Client
for {
2017-09-11 08:39:41 +09:00
conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,})
if err != nil {
influxLogger.Error(err)
} else {
influxDbConn = conn
break
}
2017-09-12 21:31:32 +09:00
cont := time.After(1 * time.Second)
select {
2017-09-11 08:39:41 +09:00
case <-cont:
continue
case <-m.handler.Done():
return
}
}
2017-09-11 08:39:41 +09:00
// Create a new point batch
batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "core",
Precision: "s",
})
if err != nil {
panic(err)
}
ticker := time.Tick(time.Second)
2017-09-12 21:49:48 +09:00
pointList := make([]*client.Point, 0, 0)
2017-09-11 08:39:41 +09:00
for {
pointList = pointList[:cap(pointList)]
checker:
for {
select {
case <-ticker:
break checker
case changedSpeed := <-m.fanSpeedConsumer:
2017-09-12 21:39:08 +09:00
if point, err := m.getFanspeedPoint(changedSpeed); err == nil {
2017-09-12 21:31:32 +09:00
pointList = append(pointList, point)
} else {
influxLogger.Debugf("id %d speed err %s", changedSpeed.Id, err)
}
2017-09-11 08:39:41 +09:00
case changedTempeture := <-m.tempetureConsumer:
2017-09-12 21:39:08 +09:00
if point, err := m.getTempeturePoint(changedTempeture); err == nil {
2017-09-12 21:31:32 +09:00
pointList = append(pointList, point)
} else {
influxLogger.Debugf("id %d tempeture err %s", changedTempeture.Id, err)
}
2017-09-11 08:39:41 +09:00
case <-m.handler.Done():
return
}
}
batchPoint.AddPoints(pointList)
if err := influxDbConn.Write(batchPoint); err != nil {
influxLogger.Warn(err)
}
2017-09-12 21:49:48 +09:00
pointList = make([]*client.Point, 0, 0)
2017-09-11 08:39:41 +09:00
}
}
func (m *influxMetric) getTempeturePoint(info processor.TempetureInfo) (*client.Point, error) {
2017-09-12 21:31:32 +09:00
// Create a point and add to batch
tags := map[string]string{"processor": strconv.Itoa(info.Id)}
fields := map[string]interface{}{
"tempeture": info.Tempeture,
}
return client.NewPoint("processor_tempeture", tags, fields, time.Now())
2017-09-11 08:39:41 +09:00
}
func (m *influxMetric) getFanspeedPoint(info processor.FanspeedInfo) (*client.Point, error) {
2017-09-12 21:31:32 +09:00
// Create a point and add to batch
tags := map[string]string{"processor": strconv.Itoa(info.Id)}
fields := map[string]interface{}{
"noob": info.FanSpeed,
}
return client.NewPoint("processor_cooling_fanspeed", tags, fields, time.Now())
}