2017-09-11 02:24:28 +09:00
|
|
|
package consumer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"amuz.es/src/infra/cpu_ctrl/util"
|
|
|
|
"amuz.es/src/infra/cpu_ctrl/processor"
|
|
|
|
"amuz.es/src/infra/cpu_ctrl/logger"
|
2017-09-11 08:39:41 +09:00
|
|
|
"github.com/influxdata/influxdb/client/v2"
|
|
|
|
"time"
|
2017-09-12 21:31:32 +09:00
|
|
|
"strconv"
|
2017-09-11 02:24:28 +09:00
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
2017-09-11 07:44:30 +09:00
|
|
|
influxLogger = logger.NewLogger("influx")
|
2017-09-11 02:24:28 +09:00
|
|
|
)
|
|
|
|
|
|
|
|
type influxMetric struct {
|
|
|
|
host string
|
2017-09-11 07:41:59 +09:00
|
|
|
processorCount int
|
2017-09-11 02:24:28 +09:00
|
|
|
handler util.Handler
|
|
|
|
fanSpeedConsumer chan processor.FanspeedInfo
|
|
|
|
tempetureConsumer chan processor.TempetureInfo
|
|
|
|
}
|
|
|
|
|
|
|
|
type InfluxMetric interface {
|
|
|
|
FanSpeedConsumer() chan<- processor.FanspeedInfo
|
|
|
|
TempetureConsumer() chan<- processor.TempetureInfo
|
2017-09-11 07:41:59 +09:00
|
|
|
StartLogging()
|
2017-09-11 02:24:28 +09:00
|
|
|
}
|
|
|
|
|
2017-09-11 07:41:59 +09:00
|
|
|
func NewInfluxMetric(host string, processorCount int, handler util.Handler) InfluxMetric {
|
2017-09-11 02:24:28 +09:00
|
|
|
return &influxMetric{
|
|
|
|
host: host,
|
2017-09-11 07:41:59 +09:00
|
|
|
processorCount: processorCount,
|
2017-09-11 02:24:28 +09:00
|
|
|
handler: handler,
|
|
|
|
fanSpeedConsumer: make(chan processor.FanspeedInfo, 1),
|
|
|
|
tempetureConsumer: make(chan processor.TempetureInfo, 1),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo { return m.fanSpeedConsumer }
|
|
|
|
func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo { return m.tempetureConsumer }
|
|
|
|
|
2017-09-11 07:41:59 +09:00
|
|
|
func (m *influxMetric) StartLogging() {
|
2017-09-11 02:24:28 +09:00
|
|
|
defer m.handler.DecreaseWait()
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
m.handler.NotifyError(err.(error))
|
|
|
|
}
|
|
|
|
}()
|
2017-09-11 07:41:59 +09:00
|
|
|
defer close(m.fanSpeedConsumer)
|
|
|
|
defer close(m.tempetureConsumer)
|
|
|
|
|
2017-09-11 08:39:41 +09:00
|
|
|
defer influxLogger.Info("Metric logging stopped")
|
|
|
|
influxLogger.Info("Metric logging started")
|
|
|
|
|
|
|
|
var influxDbConn client.Client
|
2017-09-11 02:24:28 +09:00
|
|
|
for {
|
2017-09-11 08:39:41 +09:00
|
|
|
conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,})
|
|
|
|
if err != nil {
|
|
|
|
influxLogger.Error(err)
|
|
|
|
} else {
|
|
|
|
influxDbConn = conn
|
|
|
|
break
|
|
|
|
}
|
2017-09-12 21:31:32 +09:00
|
|
|
cont := time.After(1 * time.Second)
|
2017-09-11 02:24:28 +09:00
|
|
|
select {
|
2017-09-11 08:39:41 +09:00
|
|
|
case <-cont:
|
|
|
|
continue
|
2017-09-11 02:24:28 +09:00
|
|
|
case <-m.handler.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
2017-09-11 08:39:41 +09:00
|
|
|
|
|
|
|
// Create a new point batch
|
|
|
|
batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{
|
|
|
|
Database: "core",
|
|
|
|
Precision: "s",
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
ticker := time.Tick(time.Second)
|
2017-09-13 00:33:34 +09:00
|
|
|
fanspeedList, tempetureList := make([]processor.FanspeedInfo, 0, 0), make([]processor.TempetureInfo, 0, 0)
|
2017-09-11 08:39:41 +09:00
|
|
|
|
|
|
|
for {
|
2017-09-13 00:33:34 +09:00
|
|
|
select {
|
|
|
|
case <-ticker:
|
|
|
|
fanspeedListTemp, tempetureListTemp := fanspeedList, tempetureList
|
|
|
|
fanspeedList, tempetureList = make([]processor.FanspeedInfo, 0, 0), make([]processor.TempetureInfo, 0, 0)
|
|
|
|
go m.sendPoint(influxDbConn, batchPoint, fanspeedListTemp, tempetureListTemp)
|
|
|
|
case changedSpeed := <-m.fanSpeedConsumer:
|
|
|
|
fanspeedList = append(fanspeedList, changedSpeed)
|
|
|
|
|
|
|
|
case changedTempeture := <-m.tempetureConsumer:
|
|
|
|
tempetureList = append(tempetureList, changedTempeture)
|
|
|
|
case <-m.handler.Done():
|
|
|
|
return
|
2017-09-11 08:39:41 +09:00
|
|
|
}
|
2017-09-13 00:33:34 +09:00
|
|
|
}
|
|
|
|
}
|
|
|
|
func (m *influxMetric) sendPoint(
|
|
|
|
influxDbConn client.Client,
|
|
|
|
batchPoint client.BatchPoints,
|
|
|
|
fanspeeds []processor.FanspeedInfo, tempetures []processor.TempetureInfo) {
|
|
|
|
pointList := make([]*client.Point, 0, 0)
|
2017-09-11 08:39:41 +09:00
|
|
|
|
2017-09-13 00:33:34 +09:00
|
|
|
for _, fanspeed := range fanspeeds {
|
|
|
|
if point, err := m.getFanspeedPoint(fanspeed); err == nil {
|
|
|
|
pointList = append(pointList, point)
|
|
|
|
} else {
|
|
|
|
influxLogger.Debugf("id %d speed err %s", fanspeed.Id, err)
|
2017-09-11 08:39:41 +09:00
|
|
|
}
|
|
|
|
}
|
2017-09-13 00:33:34 +09:00
|
|
|
|
|
|
|
for _, tempeture := range tempetures {
|
|
|
|
if point, err := m.getTempeturePoint(tempeture); err == nil {
|
|
|
|
pointList = append(pointList, point)
|
|
|
|
} else {
|
|
|
|
influxLogger.Debugf("id %d tempeture err %s", tempeture.Id, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(pointList) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
batchPoint.AddPoints(pointList)
|
|
|
|
if err := influxDbConn.Write(batchPoint); err != nil {
|
|
|
|
influxLogger.Warn(err)
|
|
|
|
}
|
|
|
|
|
2017-09-11 08:39:41 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *influxMetric) getTempeturePoint(info processor.TempetureInfo) (*client.Point, error) {
|
|
|
|
|
2017-09-12 21:31:32 +09:00
|
|
|
// Create a point and add to batch
|
|
|
|
tags := map[string]string{"processor": strconv.Itoa(info.Id)}
|
|
|
|
fields := map[string]interface{}{
|
|
|
|
"tempeture": info.Tempeture,
|
|
|
|
}
|
|
|
|
|
2017-09-13 00:33:34 +09:00
|
|
|
return client.NewPoint("processor_tempeture", tags, fields, info.At)
|
2017-09-11 08:39:41 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *influxMetric) getFanspeedPoint(info processor.FanspeedInfo) (*client.Point, error) {
|
|
|
|
|
2017-09-12 21:31:32 +09:00
|
|
|
// Create a point and add to batch
|
|
|
|
tags := map[string]string{"processor": strconv.Itoa(info.Id)}
|
|
|
|
fields := map[string]interface{}{
|
|
|
|
"noob": info.FanSpeed,
|
|
|
|
}
|
|
|
|
|
2017-09-13 00:33:34 +09:00
|
|
|
return client.NewPoint("processor_cooling_fanspeed", tags, fields, info.At)
|
2017-09-11 02:24:28 +09:00
|
|
|
}
|