diff --git a/consumer/influx_stat.go b/consumer/influx_stat.go index 2d7e1d3..e9c584a 100644 --- a/consumer/influx_stat.go +++ b/consumer/influx_stat.go @@ -6,6 +6,9 @@ import ( "amuz.es/src/infra/cpu_ctrl/processor" "amuz.es/src/infra/cpu_ctrl/logger" //"github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/influxdb/client/v2" + "time" + "log" ) var ( @@ -47,45 +50,67 @@ func (m *influxMetric) StartLogging() { m.handler.NotifyError(err.(error)) } }() - //var influxDbConn client.Client - //for { - // conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,}) - // if err != nil { - // influxLogger.Error(err) - // } else { - // influxDbConn = conn - // break - // } - // cont := time.After(3 * time.Second) - // select { - // case <-cont: - // continue - // case <-m.handler.Done(): - // return - // } - //} - // - //// Create a new point batch - //batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{ - // Database: "core", - // Precision: "s", - //}) - //if err != nil { - // panic(err) - //} defer close(m.fanSpeedConsumer) defer close(m.tempetureConsumer) + defer influxLogger.Info("Metric logging stopped") + influxLogger.Info("Metric logging started") + + var influxDbConn client.Client for { + conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,}) + if err != nil { + influxLogger.Error(err) + } else { + influxDbConn = conn + break + } + cont := time.After(3 * time.Second) select { - case changedSpeed := <-m.fanSpeedConsumer: - influxLogger.Debugf("id %d speed %d",changedSpeed.Id,changedSpeed.FanSpeed) - case changedTempeture := <-m.tempetureConsumer: - influxLogger.Debugf("id %d temp %f",changedTempeture.Id,changedTempeture.Tempeture) + case <-cont: + continue case <-m.handler.Done(): return } } + + // Create a new point batch + batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{ + Database: "core", + Precision: "s", + }) + if err != nil { + panic(err) + } + ticker := time.Tick(time.Second) + pointList := make([]*client.Point, 0) + + for { + pointList = pointList[:cap(pointList)] + checker: + for { + select { + case <-ticker: + break checker + case changedSpeed := <-m.fanSpeedConsumer: + influxLogger.Debugf("id %d speed %d", changedSpeed.Id, changedSpeed.FanSpeed) + case changedTempeture := <-m.tempetureConsumer: + influxLogger.Debugf("id %d temp %f", changedTempeture.Id, changedTempeture.Tempeture) + case <-m.handler.Done(): + return + } + } + batchPoint.AddPoints(pointList) + + if err := influxDbConn.Write(batchPoint); err != nil { + influxLogger.Warn(err) + } + pointList = pointList[:0] + } +} + +func (m *influxMetric) getTempeturePoint(info processor.TempetureInfo) (*client.Point, error) { + //// Create a point and add to batch //tags := map[string]string{"cpu": "cpu-total"} //fields := map[string]interface{}{ @@ -98,10 +123,20 @@ func (m *influxMetric) StartLogging() { //if err != nil { // log.Fatal(err) //} - //batchPoint.AddPoint(pt) +} + +func (m *influxMetric) getFanspeedPoint(info processor.FanspeedInfo) (*client.Point, error) { + + //// Create a point and add to batch + //tags := map[string]string{"cpu": "cpu-total"} + //fields := map[string]interface{}{ + // "idle": 10.1, + // "system": 53.3, + // "user": 46.6, + //} // - //// Write the batch - //if err := influxDbConn.Write(batchPoint); err != nil { + //pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) + //if err != nil { // log.Fatal(err) //} }