From 8e9ffe48083756f1172ce47d9d03d7678f836927 Mon Sep 17 00:00:00 2001 From: Sangbum Kim Date: Wed, 13 Sep 2017 00:33:34 +0900 Subject: [PATCH] =?UTF-8?q?=EB=B0=80=EB=A6=AC=EC=A7=80=20=EC=95=8A?= =?UTF-8?q?=EB=8F=84=EB=A1=9D=20=EB=A1=9C=EC=A7=81=20=EC=A1=B0=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/influx_stat.go | 75 ++++++++++++++++++++++-------------- consumer/speed_controller.go | 70 ++++++++++++++++----------------- processor/processor.go | 9 +++-- 3 files changed, 85 insertions(+), 69 deletions(-) diff --git a/consumer/influx_stat.go b/consumer/influx_stat.go index 5a47463..28fb911 100644 --- a/consumer/influx_stat.go +++ b/consumer/influx_stat.go @@ -81,39 +81,56 @@ func (m *influxMetric) StartLogging() { panic(err) } ticker := time.Tick(time.Second) - pointList := make([]*client.Point, 0, 0) + fanspeedList, tempetureList := make([]processor.FanspeedInfo, 0, 0), make([]processor.TempetureInfo, 0, 0) for { - pointList = pointList[:cap(pointList)] - checker: - for { - select { - case <-ticker: - break checker - case changedSpeed := <-m.fanSpeedConsumer: - if point, err := m.getFanspeedPoint(changedSpeed); err == nil { - pointList = append(pointList, point) - } else { - influxLogger.Debugf("id %d speed err %s", changedSpeed.Id, err) - } - case changedTempeture := <-m.tempetureConsumer: - if point, err := m.getTempeturePoint(changedTempeture); err == nil { - pointList = append(pointList, point) - } else { - influxLogger.Debugf("id %d tempeture err %s", changedTempeture.Id, err) - } - case <-m.handler.Done(): - return - } - } - batchPoint.AddPoints(pointList) + select { + case <-ticker: + fanspeedListTemp, tempetureListTemp := fanspeedList, tempetureList + fanspeedList, tempetureList = make([]processor.FanspeedInfo, 0, 0), make([]processor.TempetureInfo, 0, 0) + go m.sendPoint(influxDbConn, batchPoint, fanspeedListTemp, tempetureListTemp) + case changedSpeed := <-m.fanSpeedConsumer: + fanspeedList = append(fanspeedList, changedSpeed) - if err := influxDbConn.Write(batchPoint); err != nil { - influxLogger.Warn(err) + case changedTempeture := <-m.tempetureConsumer: + tempetureList = append(tempetureList, changedTempeture) + case <-m.handler.Done(): + return } - pointList = make([]*client.Point, 0, 0) } } +func (m *influxMetric) sendPoint( + influxDbConn client.Client, + batchPoint client.BatchPoints, + fanspeeds []processor.FanspeedInfo, tempetures []processor.TempetureInfo) { + pointList := make([]*client.Point, 0, 0) + + for _, fanspeed := range fanspeeds { + if point, err := m.getFanspeedPoint(fanspeed); err == nil { + pointList = append(pointList, point) + } else { + influxLogger.Debugf("id %d speed err %s", fanspeed.Id, err) + } + } + + for _, tempeture := range tempetures { + if point, err := m.getTempeturePoint(tempeture); err == nil { + pointList = append(pointList, point) + } else { + influxLogger.Debugf("id %d tempeture err %s", tempeture.Id, err) + } + } + + if len(pointList) == 0 { + return + } + + batchPoint.AddPoints(pointList) + if err := influxDbConn.Write(batchPoint); err != nil { + influxLogger.Warn(err) + } + +} func (m *influxMetric) getTempeturePoint(info processor.TempetureInfo) (*client.Point, error) { @@ -123,7 +140,7 @@ func (m *influxMetric) getTempeturePoint(info processor.TempetureInfo) (*client. "tempeture": info.Tempeture, } - return client.NewPoint("processor_tempeture", tags, fields, time.Now()) + return client.NewPoint("processor_tempeture", tags, fields, info.At) } func (m *influxMetric) getFanspeedPoint(info processor.FanspeedInfo) (*client.Point, error) { @@ -134,5 +151,5 @@ func (m *influxMetric) getFanspeedPoint(info processor.FanspeedInfo) (*client.Po "noob": info.FanSpeed, } - return client.NewPoint("processor_cooling_fanspeed", tags, fields, time.Now()) + return client.NewPoint("processor_cooling_fanspeed", tags, fields, info.At) } diff --git a/consumer/speed_controller.go b/consumer/speed_controller.go index ce9face..6ac46cb 100644 --- a/consumer/speed_controller.go +++ b/consumer/speed_controller.go @@ -49,46 +49,44 @@ func (c *fanControl) StartControl() { fanspeedLogger.Info("Fan control started") ticker := time.Tick(c.sampleDuration) - pastFanSpeedList := make([]int, c.processorCount) - newFanSpeedList := make([]int, c.processorCount) + pastFanSpeedList, newFanSpeedList := make([]int, c.processorCount), make([]int, c.processorCount) for { - checker: - for { - select { - case <-ticker: - break checker - case changedSpeed := <-c.fanSpeedConsumer: - if changedSpeed.Changed { - newFanSpeedList[changedSpeed.Id] = changedSpeed.FanSpeed - } - case <-c.handler.Done(): - return - } - } - if (!compareFanSpeed(pastFanSpeedList, newFanSpeedList)) { - copy(pastFanSpeedList, newFanSpeedList) - args := make([]string, 0) - args = append(args, "raw", "0x3a", "0x01", ) - for _, item := range newFanSpeedList { - args = append(args, fmt.Sprintf("0x%x", item)) - } - args = append(args, - "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", - ) - cmd := exec.Command("ipmitool", args...) - if err := cmd.Run(); err != nil { - c.handler.NotifyError(err) - return - } - buf := bytes.NewBufferString("") - for _, item := range newFanSpeedList { - buf.WriteString(fmt.Sprintf("0x%x", item)) - buf.WriteRune(' ') - } - fanspeedLogger.Infof("Commit fan speed with %s", buf.String()) + select { + case <-ticker: + go c.applyFanspeed(pastFanSpeedList, newFanSpeedList) + pastFanSpeedList, newFanSpeedList = newFanSpeedList, make([]int, c.processorCount) + case changedSpeed := <-c.fanSpeedConsumer: + newFanSpeedList[changedSpeed.Id] = changedSpeed.FanSpeed + case <-c.handler.Done(): + return } } } +func (c *fanControl) applyFanspeed(pastFanSpeedList, newFanSpeedList []int) { + if !compareFanSpeed(pastFanSpeedList, newFanSpeedList) { + return + } + + args := make([]string, 0) + args = append(args, "raw", "0x3a", "0x01", ) + for _, item := range newFanSpeedList { + args = append(args, fmt.Sprintf("0x%x", item)) + } + args = append(args, + "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", + ) + cmd := exec.Command("ipmitool", args...) + if err := cmd.Run(); err != nil { + c.handler.NotifyError(err) + return + } + buf := bytes.NewBufferString("") + for _, item := range newFanSpeedList { + buf.WriteString(fmt.Sprintf("0x%x", item)) + buf.WriteRune(' ') + } + fanspeedLogger.Infof("Commit fan speed with %s", buf.String()) +} func compareFanSpeed(old, new []int) bool { new = new[:len(old)] // this line is the key diff --git a/processor/processor.go b/processor/processor.go index 407bc56..d5b6051 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -33,13 +33,13 @@ type processor struct { type TempetureInfo struct { Id int Tempeture float64 - Changed bool + At time.Time } type FanspeedInfo struct { Id int FanSpeed int - Changed bool + At time.Time } type Processor interface { @@ -132,7 +132,7 @@ func (p *processor) StartMonitoring() { for { select { - case <-ticker: + case now := <-ticker: var ( highestTemp float64 fanspeed int @@ -147,7 +147,7 @@ func (p *processor) StartMonitoring() { case p.tempetureChanged <- TempetureInfo{ Id: p.id, Tempeture: highestTemp, - Changed: highestTemp != p.tempeture, + At: now, }: default: } @@ -165,6 +165,7 @@ func (p *processor) StartMonitoring() { Id: p.id, FanSpeed: fanspeed, Changed: fanspeed != p.fanSpeed, + At: now, }: default: }