1
0
Fork 0

broadcaster 구현

This commit is contained in:
Sangbum Kim 2017-09-11 07:41:59 +09:00
parent 3160a5f198
commit c784010593
2 changed files with 95 additions and 94 deletions

View File

@ -1,15 +1,11 @@
package consumer package consumer
import ( import (
"fmt"
"os/exec"
"amuz.es/src/infra/cpu_ctrl/util" "amuz.es/src/infra/cpu_ctrl/util"
"time" //"time"
"amuz.es/src/infra/cpu_ctrl/processor" "amuz.es/src/infra/cpu_ctrl/processor"
"amuz.es/src/infra/cpu_ctrl/logger" "amuz.es/src/infra/cpu_ctrl/logger"
"bytes" //"github.com/influxdata/influxdb/client/v2"
"log"
"github.com/influxdata/influxdb/client/v2"
) )
var ( var (
@ -18,6 +14,7 @@ var (
type influxMetric struct { type influxMetric struct {
host string host string
processorCount int
handler util.Handler handler util.Handler
fanSpeedConsumer chan processor.FanspeedInfo fanSpeedConsumer chan processor.FanspeedInfo
tempetureConsumer chan processor.TempetureInfo tempetureConsumer chan processor.TempetureInfo
@ -26,12 +23,13 @@ type influxMetric struct {
type InfluxMetric interface { type InfluxMetric interface {
FanSpeedConsumer() chan<- processor.FanspeedInfo FanSpeedConsumer() chan<- processor.FanspeedInfo
TempetureConsumer() chan<- processor.TempetureInfo TempetureConsumer() chan<- processor.TempetureInfo
StartControl() StartLogging()
} }
func NewInfluxMetric(host string, handler util.Handler) InfluxMetric { func NewInfluxMetric(host string, processorCount int, handler util.Handler) InfluxMetric {
return &influxMetric{ return &influxMetric{
host: host, host: host,
processorCount: processorCount,
handler: handler, handler: handler,
fanSpeedConsumer: make(chan processor.FanspeedInfo, 1), fanSpeedConsumer: make(chan processor.FanspeedInfo, 1),
tempetureConsumer: make(chan processor.TempetureInfo, 1), tempetureConsumer: make(chan processor.TempetureInfo, 1),
@ -41,7 +39,7 @@ func NewInfluxMetric(host string, handler util.Handler) InfluxMetric {
func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo { return m.fanSpeedConsumer } func (m *influxMetric) FanSpeedConsumer() chan<- processor.FanspeedInfo { return m.fanSpeedConsumer }
func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo { return m.tempetureConsumer } func (m *influxMetric) TempetureConsumer() chan<- processor.TempetureInfo { return m.tempetureConsumer }
func (m *influxMetric) StartControl() { func (m *influxMetric) StartLogging() {
defer m.handler.DecreaseWait() defer m.handler.DecreaseWait()
defer func() { defer func() {
@ -49,93 +47,61 @@ func (m *influxMetric) StartControl() {
m.handler.NotifyError(err.(error)) m.handler.NotifyError(err.(error))
} }
}() }()
var influxDbConn client.Client //var influxDbConn client.Client
//for {
// conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,})
// if err != nil {
// influxLogger.Error(err)
// } else {
// influxDbConn = conn
// break
// }
// cont := time.After(3 * time.Second)
// select {
// case <-cont:
// continue
// case <-m.handler.Done():
// return
// }
//}
//
//// Create a new point batch
//batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{
// Database: "core",
// Precision: "s",
//})
//if err != nil {
// panic(err)
//}
defer close(m.fanSpeedConsumer)
defer close(m.tempetureConsumer)
for { for {
conn, err := client.NewUDPClient(client.UDPConfig{Addr: m.host,})
if err != nil {
influxLogger.Error("cannot connect influxdb %s", err)
} else {
influxDbConn = conn
break
}
cont := time.After(3 * time.Second)
select { select {
case <-cont: case changedSpeed := <-m.fanSpeedConsumer:
continue influxLogger.Debugf("id %d speed %d",changedSpeed.Id,changedSpeed.FanSpeed)
case changedTempeture := <-m.tempetureConsumer:
influxLogger.Debugf("id %d temp %f",changedTempeture.Id,changedTempeture.Tempeture)
case <-m.handler.Done(): case <-m.handler.Done():
return return
} }
} }
//// Create a point and add to batch
// Create a new point batch //tags := map[string]string{"cpu": "cpu-total"}
batchPoint, err := client.NewBatchPoints(client.BatchPointsConfig{ //fields := map[string]interface{}{
Database: "core", // "idle": 10.1,
Precision: "s", // "system": 53.3,
}) // "user": 46.6,
if err != nil { //}
panic(err)
}
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
log.Fatal(err)
}
batchPoint.AddPoint(pt)
// Write the batch
if err := influxDbConn.Write(batchPoint); err != nil {
log.Fatal(err)
}
//defer close(c.fanSpeedConsumer)
//defer log.Info("Fan control stopped")
//log.Info("Fan control started")
// //
//ticker := time.Tick(c.sampleDuration) //pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
//pastFanSpeedList := make([]int, c.processorCount) //if err != nil {
//newFanSpeedList := make([]int, c.processorCount) // log.Fatal(err)
//for { //}
//checker: //batchPoint.AddPoint(pt)
// for { //
// select { //// Write the batch
// case <-ticker: //if err := influxDbConn.Write(batchPoint); err != nil {
// break checker // log.Fatal(err)
// case changedSpeed := <-c.fanSpeedConsumer:
// newFanSpeedList[changedSpeed.Id] = changedSpeed.FanSpeed
// case <-c.handler.Done():
// return
// }
// }
// if (!compareFanSpeed(pastFanSpeedList, newFanSpeedList)) {
// copy(pastFanSpeedList, newFanSpeedList)
// args := make([]string, 0)
// args = append(args, "raw", "0x3a", "0x01", )
// for _, item := range newFanSpeedList {
// args = append(args, fmt.Sprintf("0x%x", item))
// }
// args = append(args,
// "0x0", "0x0", "0x0", "0x0", "0x0", "0x0",
// )
// cmd := exec.Command("ipmitool", args...)
// if err := cmd.Run(); err != nil {
// c.handler.NotifyError(err)
// return
// }
// buf := bytes.NewBufferString("")
// for _, item := range newFanSpeedList {
// buf.WriteString(fmt.Sprintf("0x%x", item))
// buf.WriteRune(' ')
// }
// log.Infof("Commit fan speed with %s", buf.String())
// }
//} //}
} }

43
main.go
View File

@ -56,8 +56,30 @@ func setMaxProcs() {
} }
} }
func FanoutSpeed(sender <-chan processor.FanspeedInfo, handler util.Handler, receivers ...chan<- processor.FanspeedInfo) {
defer handler.DecreaseWait()
defer func() {
if err := recover(); err != nil {
handler.NotifyError(err.(error))
}
}()
for {
select {
case tempeture := <-sender:
for _, receiver := range receivers {
select {
case receiver <- tempeture:
default:
log.Warn("Some Tempeture consumer blocked!")
}
}
case <-handler.Done():
return
}
}
}
func FanoutTempeture(sender <-chan processor.TempetureInfo, handler util.Handler, receivers ...chan<- processor.TempetureInfo) { func FanoutTempeture(sender <-chan processor.TempetureInfo, handler util.Handler, receivers ...chan<- processor.TempetureInfo) {
handler.IncreaseWait()
defer handler.DecreaseWait() defer handler.DecreaseWait()
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
@ -94,14 +116,18 @@ func main() {
if processorCount == 0 { if processorCount == 0 {
handler.NotifyError(errors.New("cpu not found!")) handler.NotifyError(errors.New("cpu not found!"))
} }
var (
fanController := consumer.NewFanControl(processorCount, sampleDuration, handler) tempetureChannel = make(chan processor.TempetureInfo)
fanspeedChannel = make(chan processor.FanspeedInfo)
)
defer close(tempetureChannel)
defer close(fanspeedChannel)
processors = make([]processor.Processor, 0, processorCount) processors = make([]processor.Processor, 0, processorCount)
for i := 0; i < processorCount; i++ { for i := 0; i < processorCount; i++ {
if info, err := processor.NewProcessorInfo(handler, i, sampleDuration, if info, err := processor.NewProcessorInfo(handler, i, sampleDuration,
*P, *I, *D, *SetPoint, 0x64, 0x4, *P, *I, *D, *SetPoint, 0x64, 0x4,
nil, fanController.Consumer(), tempetureChannel, fanspeedChannel,
); );
err != nil { err != nil {
handler.NotifyError(err) handler.NotifyError(err)
@ -112,8 +138,17 @@ func main() {
} }
} }
fanController := consumer.NewFanControl(processorCount, sampleDuration, handler)
metricLogger := consumer.NewInfluxMetric("", processorCount, handler)
handler.IncreaseWait()
go FanoutTempeture(tempetureChannel, handler, metricLogger.TempetureConsumer())
handler.IncreaseWait()
go FanoutSpeed(fanspeedChannel, handler, fanController.Consumer(), metricLogger.FanSpeedConsumer())
handler.IncreaseWait() handler.IncreaseWait()
go fanController.StartControl() go fanController.StartControl()
handler.IncreaseWait()
go metricLogger.StartLogging()
signal.Notify(exitSignal, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) signal.Notify(exitSignal, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
daemon.NotifyDaemon(daemon.DaemonStarted) daemon.NotifyDaemon(daemon.DaemonStarted)