From 3160a5f198e8276624fd6355128a71a60184076a Mon Sep 17 00:00:00 2001 From: Sangbum Kim Date: Mon, 11 Sep 2017 02:24:28 +0900 Subject: [PATCH] =?UTF-8?q?consumer=20logger=20=EC=9D=B4=EB=A6=84=20?= =?UTF-8?q?=EB=B0=94=EA=BF=88=20influxdb=20=EC=A4=80=EB=B9=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/influx_stat.go | 141 +++++++++++++++++++++++++++++++++++ consumer/speed_controller.go | 8 +- 2 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 consumer/influx_stat.go diff --git a/consumer/influx_stat.go b/consumer/influx_stat.go new file mode 100644 index 0000000..675dc0f --- /dev/null +++ b/consumer/influx_stat.go @@ -0,0 +1,141 @@ +package consumer + +import ( + "fmt" + "os/exec" + "amuz.es/src/infra/cpu_ctrl/util" + "time" + "amuz.es/src/infra/cpu_ctrl/processor" + "amuz.es/src/infra/cpu_ctrl/logger" + "bytes" + "log" + "github.com/influxdata/influxdb/client/v2" +) + +var ( + influxLogger = logger.NewLogger("consumer") +) + +type influxMetric struct { + host string + handler util.Handler + fanSpeedConsumer chan processor.FanspeedInfo + tempetureConsumer chan processor.TempetureInfo +} + +type InfluxMetric interface { + FanSpeedConsumer() chan<- processor.FanspeedInfo + TempetureConsumer() chan<- processor.TempetureInfo + StartControl() +} + +func NewInfluxMetric(host string, handler util.Handler) InfluxMetric { + return &influxMetric{ + host: host, + handler: handler, + fanSpeedConsumer: make(chan processor.FanspeedInfo, 1), + tempetureConsumer: make(chan processor.TempetureInfo, 1), + } +} + +func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo { return m.fanSpeedConsumer } +func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo { return m.tempetureConsumer } + +func (m *influxMetric) StartControl() { + defer m.handler.DecreaseWait() + + defer func() { + if err := recover(); err != nil { + m.handler.NotifyError(err.(error)) + } + }() + var influxDbConn client.Client + for { + conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,}) + if err != nil { + influxLogger.Error("cannot connect influxdb %s", err) + } else { + influxDbConn = conn + break + } + cont := time.After(3 * time.Second) + select { + case <-cont: + continue + case <-m.handler.Done(): + return + } + } + + // Create a new point batch + batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{ + Database: "core", + Precision: "s", + }) + if err != nil { + panic(err) + } + + + // Create a point and add to batch + tags := map[string]string{"cpu": "cpu-total"} + fields := map[string]interface{}{ + "idle": 10.1, + "system": 53.3, + "user": 46.6, + } + + pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) + if err != nil { + log.Fatal(err) + } + batchPoint.AddPoint(pt) + + // Write the batch + if err := influxDbConn.Write(batchPoint); err != nil { + log.Fatal(err) + } + + //defer close(c.fanSpeedConsumer) + //defer log.Info("Fan control stopped") + //log.Info("Fan control started") + // + //ticker := time.Tick(c.sampleDuration) + //pastFanSpeedList := make([]int, c.processorCount) + //newFanSpeedList := make([]int, c.processorCount) + //for { + //checker: + // for { + // select { + // case <-ticker: + // break checker + // case changedSpeed := <-c.fanSpeedConsumer: + // newFanSpeedList[changedSpeed.Id] = changedSpeed.FanSpeed + // case <-c.handler.Done(): + // return + // } + // } + // if (!compareFanSpeed(pastFanSpeedList, newFanSpeedList)) { + // copy(pastFanSpeedList, newFanSpeedList) + // args := make([]string, 0) + // args = append(args, "raw", "0x3a", "0x01", ) + // for _, item := range newFanSpeedList { + // args = append(args, fmt.Sprintf("0x%x", item)) + // } + // args = append(args, + // "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", + // ) + // cmd := exec.Command("ipmitool", args...) + // if err := cmd.Run(); err != nil { + // c.handler.NotifyError(err) + // return + // } + // buf := bytes.NewBufferString("") + // for _, item := range newFanSpeedList { + // buf.WriteString(fmt.Sprintf("0x%x", item)) + // buf.WriteRune(' ') + // } + // log.Infof("Commit fan speed with %s", buf.String()) + // } + //} +} diff --git a/consumer/speed_controller.go b/consumer/speed_controller.go index e452591..0f10c0f 100644 --- a/consumer/speed_controller.go +++ b/consumer/speed_controller.go @@ -11,7 +11,7 @@ import ( ) var ( - log = logger.NewLogger("consumer") + fanspeedLogger = logger.NewLogger("fanspeed") ) type fanControl struct { @@ -45,8 +45,8 @@ func (c *fanControl) StartControl() { } }() defer close(c.fanSpeedConsumer) - defer log.Info("Fan control stopped") - log.Info("Fan control started") + defer fanspeedLogger.Info("Fan control stopped") + fanspeedLogger.Info("Fan control started") ticker := time.Tick(c.sampleDuration) pastFanSpeedList := make([]int, c.processorCount) @@ -83,7 +83,7 @@ func (c *fanControl) StartControl() { buf.WriteString(fmt.Sprintf("0x%x", item)) buf.WriteRune(' ') } - log.Infof("Commit fan speed with %s", buf.String()) + fanspeedLogger.Infof("Commit fan speed with %s", buf.String()) } } }