1
0
Fork 0

프로세서 모델 리팩토링

This commit is contained in:
Sangbum Kim 2017-09-10 16:28:50 +09:00
parent 50957705cb
commit f681f3dd60
7 changed files with 509 additions and 192 deletions

1
.gitignore vendored
View File

@ -122,3 +122,4 @@ $RECYCLE.BIN/
build/
vendor/
cpu_ctrl

14
daemon/daemon.go Normal file
View File

@ -0,0 +1,14 @@
package daemon
import "github.com/coreos/go-systemd/daemon"
const (
DaemonStarted = notifyType("READY=1")
DaemonStopping = notifyType("STOPPING=1")
)
type notifyType string
func NotifyDaemon(status notifyType) {
daemon.SdNotify(false, string(status))
}

109
logger/logger.go Normal file
View File

@ -0,0 +1,109 @@
package logger
import (
"io"
"os"
"path"
"github.com/sirupsen/logrus"
"github.com/x-cray/logrus-prefixed-formatter"
"gopkg.in/natefinch/lumberjack.v2"
)
var (
logger = logrus.StandardLogger()
rotaters []*lumberjack.Logger
logDir string
formatter = prefixed.TextFormatter{}
)
type Config struct {
FileName string
MaxSizeMb int
MaxBackup int
MaxDay int
}
type Logger interface {
Debugf(format string, args ...interface{})
Infof(format string, args ...interface{})
Printf(format string, args ...interface{})
Warnf(format string, args ...interface{})
Warningf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Panicf(format string, args ...interface{})
Debug(args ...interface{})
Info(args ...interface{})
Print(args ...interface{})
Warn(args ...interface{})
Warning(args ...interface{})
Error(args ...interface{})
Fatal(args ...interface{})
Panic(args ...interface{})
Debugln(args ...interface{})
Infoln(args ...interface{})
Println(args ...interface{})
Warnln(args ...interface{})
Warningln(args ...interface{})
Errorln(args ...interface{})
Fatalln(args ...interface{})
Panicln(args ...interface{})
}
func init() {
logger.Formatter = &formatter
}
func LoggerIsStd() bool {
return !formatter.DisableColors
}
func InitLogger(verbose bool, logDirArg string, config *Config) {
logDir = logDirArg
colorSupport, writer := NewLogWriter(config)
formatter.DisableColors = !colorSupport
logger.Out = writer
if verbose {
logger.Level = logrus.DebugLevel
} else {
logger.Level = logrus.InfoLevel
}
}
func NewLogger(prefix string) Logger {
return logrus.NewEntry(logger).WithField("prefix", prefix)
}
func NewLogWriter(config *Config) (bool, io.Writer) {
switch config.FileName {
case "Stdout":
return true, os.Stdout
case "Stderr":
return true, os.Stderr
default:
logpath := config.FileName
if logDir != "" {
logpath = path.Join(logDir, config.FileName)
}
logger.Info(" Attention!! log writes to ", config.FileName)
rotater := &lumberjack.Logger{
Filename: logpath,
MaxSize: config.MaxSizeMb, // megabytes
MaxBackups: config.MaxBackup,
MaxAge: config.MaxDay, //days
LocalTime: true,
Compress: true,
}
rotaters = append(rotaters, rotater)
return false, rotater
}
}
func RotateLogger() {
if len(rotaters) == 0 {
return
}
logger.Info("rotating logger")
for _, rotater := range rotaters {
rotater.Rotate()
}
logger.Info("rotated")
}

245
main.go
View File

@ -1,228 +1,89 @@
package main
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"runtime"
"syscall"
"time"
"amuz.es/src/infra/cpu_ctrl/pid"
"github.com/coreos/go-systemd/daemon"
"github.com/shirou/gopsutil/cpu"
)
const (
auto = 0x0
min = 0x04
max = 0x64
"amuz.es/src/infra/cpu_ctrl/daemon"
"amuz.es/src/infra/cpu_ctrl/logger"
"amuz.es/src/infra/cpu_ctrl/processor"
"amuz.es/src/infra/cpu_ctrl/util"
)
var (
prefix = [...]int{0x3a, 0x01}
suffix = [...]int{auto, auto, auto, auto, auto, auto}
log = logger.NewLogger("cpu_ctrl")
)
type notifyType string
const (
DaemonStarted = notifyType("READY=1")
DaemonStopping = notifyType("STOPPING=1")
)
func NotifyDaemon(status notifyType) {
daemon.SdNotify(false, string(status))
func init() {
logger.InitLogger(true, "", &logger.Config{FileName: "Stderr"})
setMaxProcs()
}
func getProcessorCount() int {
var maxcpu int
stat, err := cpu.Info()
if err != nil {
panic(err)
}
for _, info := range stat {
physicalId, err := strconv.Atoi(info.PhysicalID)
if err != nil {
panic(err)
} else if maxcpu < physicalId {
maxcpu = physicalId
}
}
return maxcpu + 1
}
func setMaxProcs() {
// TODO(vmarmol): Consider limiting if we have a CPU mask in effect.
// Allow as many threads as we have cores unless the user specified a value.
var numProcs int
// if *maxProcs < 1 {
numProcs = runtime.NumCPU()
// } else {
// numProcs = *maxProcs
// }
runtime.GOMAXPROCS(numProcs)
type Processor struct {
Id int
TempeturePath string
}
type TempetureChange struct {
Id int
Tempeture float64
}
func getProcessorInfo(processorId int) (*Processor, error) {
if matches, err := filepath.Glob(fmt.Sprintf("/sys/devices/platform/coretemp.%d/hwmon/hwmon?", processorId)); err != nil {
return nil, err
} else if matches == nil {
return nil, errors.New("hwmon not found!")
} else {
return &Processor{
Id: processorId,
TempeturePath: matches[0],
}, nil
}
}
func ReadTempeture(path string, senseChan chan<- float64, errorChan chan<- error, waiter *sync.WaitGroup) {
defer waiter.Done()
if dat, err := ioutil.ReadFile(path); err != nil {
errorChan <- err
} else if tempetureSense, err := strconv.Atoi(strings.TrimSpace(string(dat))); err != nil {
errorChan <- err
} else {
senseChan <- float64(tempetureSense) / 1000.0
}
}
func CpuTempetureMonitoring(info *Processor, sampleDuration time.Duration, notifier chan<- TempetureChange, errorChan chan<- error, ctx context.Context, waiter *sync.WaitGroup) {
defer waiter.Done()
tempeturePathGlob := path.Join(info.TempeturePath, "temp?_input")
ticker := time.Tick(sampleDuration)
for {
select {
case <-ticker:
matches, err := filepath.Glob(tempeturePathGlob)
if err != nil {
panic(err)
}
tempetureReadWaiter := &sync.WaitGroup{}
queue := make(chan float64, len(matches))
// exclude package temp
for _, path := range matches[1:] {
tempetureReadWaiter.Add(1)
go ReadTempeture(path, queue, errorChan, tempetureReadWaiter)
}
tempetureReadWaiter.Wait()
close(queue)
var tempeture float64
for sense := range queue {
if tempeture < sense {
tempeture = sense
}
}
notifier <- TempetureChange{
Id: info.Id,
Tempeture: tempeture,
}
case <-ctx.Done():
return
}
}
}
func CpuTempetureScraper(processorCount int, notifier <-chan TempetureChange, errorChan chan<- error, ctx context.Context, waiter *sync.WaitGroup) {
defer waiter.Done()
var (
P, I, D = 1.5, 0.4, 2.0
SetPoint = 38.0
SampleTime = time.Second
maxNoob, minNoob = 0x64, 0x4
WindupGuard = float64(maxNoob - minNoob)
)
noobs := make([]int, processorCount)
controllers := make([]pid.Controller, 0, processorCount)
for i := 0; i < processorCount; i++ {
controller := pid.New(P, I, D)
controller.SetSetPoint(SetPoint)
controller.SetSampleTime(SampleTime)
controller.SetWindupGuard(WindupGuard)
controllers = append(controllers, controller)
}
for {
select {
case change := <-notifier:
controller := controllers[change.Id]
adj_noob := int(-controller.Update(change.Tempeture))
if adj_noob < minNoob {
adj_noob = minNoob
} else if adj_noob > maxNoob {
adj_noob = maxNoob
}
if noobs[change.Id] == adj_noob {
continue
}
noobs[change.Id] = adj_noob
fmt.Printf("cpu %d fan 0x%x\n", change.Id, adj_noob)
args := make([]string, 0)
args = append(args,
"raw",
"0x3a", "0x01",
)
for _, item := range noobs {
args = append(args, fmt.Sprintf("0x%x", item))
}
args = append(args,
"0x0", "0x0", "0x0", "0x0", "0x0", "0x0",
)
cmd := exec.Command("ipmitool", args...)
if err := cmd.Run(); err != nil {
errorChan <- err
return
}
case <-ctx.Done():
return
}
// Check if the setting was successful.
actualNumProcs := runtime.GOMAXPROCS(0)
if actualNumProcs != numProcs {
log.Printf("Specified max procs of %v but using %v\n", numProcs, actualNumProcs)
}
}
func main() {
var (
processorCount = getProcessorCount()
ctx, canceled = context.WithCancel(context.Background())
waiter = &sync.WaitGroup{}
exitSignal = make(chan os.Signal, 1)
errorChan = make(chan error, 1)
tempetureChange = make(chan TempetureChange)
sampleDuration = time.Second
processorCount = processor.GetProcessorCount()
processors []processor.Processor
exitSignal = make(chan os.Signal, 1)
handler = util.NewHandler()
sampleDuration = time.Second
)
log.Info("Cpu fan controller v0.2")
if processorCount == 0 {
errorChan <- errors.New("cpu not found!")
handler.NotifyError(errors.New("cpu not found!"))
}
processors = make([]processor.Processor, 0, processorCount)
for i := 0; i < processorCount; i++ {
if info, err := getProcessorInfo(i); err != nil {
errorChan <- err
if info, err := processor.NewProcessorInfo(handler, i, sampleDuration,
1.5, 0.4, 2.0, 38.0, 0x64, 0x4,
);
err != nil {
handler.NotifyError(err)
} else {
waiter.Add(1)
go CpuTempetureMonitoring(info, sampleDuration, tempetureChange, errorChan, ctx, waiter)
processors = append(processors, info)
handler.IncreaseWait()
go info.StartMonitoring()
}
}
waiter.Add(1)
go CpuTempetureScraper(processorCount, tempetureChange, errorChan, ctx, waiter)
defer waiter.Wait()
handler.IncreaseWait()
go TempetureControl(processors, tempetureChange, errorChan, ctx, waiter)
signal.Notify(exitSignal, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
NotifyDaemon(DaemonStarted)
defer NotifyDaemon(DaemonStopping)
daemon.NotifyDaemon(daemon.DaemonStarted)
defer daemon.NotifyDaemon(daemon.DaemonStopping)
defer close(exitSignal)
defer handler.GracefullWait()
select {
case <-ctx.Done():
fmt.Println("Service request to close this application")
case err := <-errorChan:
canceled()
fmt.Printf("error! %s\n", err.Error())
case <-handler.Done():
log.Infoln("Service request to close this application")
case err := <-handler.Error():
log.Errorf("%s", err)
case sysSignal := <-exitSignal:
canceled()
fmt.Printf("SYSCALL! %s\n", sysSignal.String())
log.Warnf("SYSCALL! %s", sysSignal.String())
}
}

193
processor/processor.go Normal file
View File

@ -0,0 +1,193 @@
package processor
import (
"github.com/shirou/gopsutil/cpu"
"strconv"
"path/filepath"
"fmt"
"errors"
"time"
"sync"
"path"
"amuz.es/src/infra/cpu_ctrl/logger"
"io/ioutil"
"strings"
"amuz.es/src/infra/cpu_ctrl/pid"
"amuz.es/src/infra/cpu_ctrl/util"
)
type processor struct {
handler util.Handler
id int
tempeturePath string
tempeture float64
tempetureChanged chan float64
sampleDuration time.Duration
fanController pid.Controller
fanSpeed int
fanMaxSpeed int
fanMinSpeed int
fanSpeedChanged chan int
}
type Processor interface {
Id() int
Tempeture() float64
FanSpeed() int
FanMaxSpeed() int
FanMinSpeed() int
StartMonitoring()
TempetureNotifier() <-chan float64
FanSpeedNotifier() <-chan int
getMaxTempetureOnProcessor(string) (float64)
readTempeture(string, chan<- float64, *sync.WaitGroup)
normalizeFanspeed(float64) (int)
}
var (
log = logger.NewLogger("processor")
)
func GetProcessorCount() (maxcpu int) {
stat, err := cpu.Info()
if err != nil {
panic(err)
}
for _, info := range stat {
physicalId, err := strconv.Atoi(info.PhysicalID)
if err != nil {
return
} else if maxcpu < physicalId {
maxcpu = physicalId
}
}
maxcpu += 1
return
}
func NewProcessorInfo(
handler util.Handler,
processorId int,
sampleDuration time.Duration,
P, I, D,
setPoint float64,
maxNoob, minNoob int,
) (Processor, error) {
if matches, err := filepath.Glob(fmt.Sprintf("/sys/devices/platform/coretemp.%d/hwmon/hwmon?", processorId)); err != nil {
return nil, err
} else if matches == nil {
return nil, errors.New("hwmon not found!")
} else {
windupGuard := float64(maxNoob - minNoob)
controller := pid.New(P, I, D)
controller.SetSetPoint(setPoint)
controller.SetSampleTime(sampleDuration)
controller.SetWindupGuard(windupGuard)
return &processor{
handler: handler,
id: processorId,
tempeturePath: matches[0],
tempetureChanged: make(chan float64),
sampleDuration: sampleDuration,
fanController: controller,
fanSpeedChanged: make(chan int),
fanMaxSpeed: maxNoob,
fanMinSpeed: minNoob,
}, nil
}
}
func (p *processor) Id() int { return p.id }
func (p *processor) Tempeture() float64 { return p.tempeture }
func (p *processor) FanSpeed() int { return p.fanSpeed }
func (p *processor) FanMaxSpeed() int { return p.fanMaxSpeed }
func (p *processor) FanMinSpeed() int { return p.fanMinSpeed }
func (p *processor) TempetureNotifier() <-chan float64 { return p.tempetureChanged }
func (p *processor) FanSpeedNotifier() <-chan int { return p.fanSpeedChanged }
func (p *processor) StartMonitoring() {
defer p.handler.DecreaseWait()
defer func() {
if err := recover(); err != nil {
p.handler.NotifyError(err.(error))
}
}()
defer close(p.tempetureChanged)
defer close(p.fanSpeedChanged)
tempeturePathGlob := path.Join(p.tempeturePath, "temp?_input")
ticker := time.Tick(p.sampleDuration)
log.Infof("Processor %d monitor started with %s", p.id, p.sampleDuration)
for {
select {
case <-ticker:
var (
highestTemp float64
fanspeed int
)
highestTemp = p.getMaxTempetureOnProcessor(tempeturePathGlob)
if highestTemp != p.tempeture {
p.tempetureChanged <- highestTemp
p.tempeture = highestTemp
}
fanspeed = p.normalizeFanspeed(p.fanController.Update(highestTemp))
if fanspeed != p.fanSpeed {
p.fanSpeedChanged <- fanspeed
p.fanSpeed = fanspeed
}
case <-p.handler.Done():
return
}
}
}
func (p *processor) getMaxTempetureOnProcessor(tempeturePathGlob string) (maxTempeture float64) {
matches, err := filepath.Glob(tempeturePathGlob)
if err != nil {
panic(err)
}
tempetureReadWaiter := &sync.WaitGroup{}
queue := make(chan float64, len(matches))
defer close(queue)
// exclude package temp
for _, tempetureFilePath := range matches[1:] {
tempetureReadWaiter.Add(1)
go p.readTempeture(tempetureFilePath, queue, tempetureReadWaiter)
}
tempetureReadWaiter.Wait()
for sense := range queue {
if maxTempeture < sense {
maxTempeture = sense
}
}
return
}
func (p *processor) readTempeture(path string, senseChan chan<- float64, waiter *sync.WaitGroup) {
defer waiter.Done()
if dat, err := ioutil.ReadFile(path); err != nil {
p.handler.NotifyError(err)
} else if tempetureSense, err := strconv.Atoi(strings.TrimSpace(string(dat))); err != nil {
p.handler.NotifyError(err)
} else {
senseChan <- float64(tempetureSense) / 1000.0
}
}
func (p *processor) normalizeFanspeed(response float64) (adjusted int) {
adjusted = int(-response)
if adjusted < p.fanMinSpeed {
adjusted = p.fanMinSpeed
} else if adjusted > p.fanMaxSpeed {
adjusted = p.fanMaxSpeed
}
return
}

View File

@ -0,0 +1,85 @@
package speed_ctrl
import (
"errors"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"amuz.es/src/infra/cpu_ctrl/daemon"
"amuz.es/src/infra/cpu_ctrl/logger"
"amuz.es/src/infra/cpu_ctrl/processor"
"fmt"
"os/exec"
"amuz.es/src/infra/cpu_ctrl/pid"
"amuz.es/src/infra/cpu_ctrl/util"
)
var (
log = logger.NewLogger("speed_ctrl")
)
func TempetureControl(
handler util.Handler,
processors []processor.Processor,
) {
defer handler.DecreaseWait()
var (
P, I, D = 1.5, 0.4, 2.0
SetPoint = 38.0
SampleTime = time.Second
maxNoob, minNoob = 0x64, 0x4
WindupGuard = float64(maxNoob - minNoob)
)
noobs := make([]int, processorCount)
controllers := make([]pid.Controller, 0, processorCount)
for i := 0; i < processorCount; i++ {
controller := pid.New(P, I, D)
controller.SetSetPoint(SetPoint)
controller.SetSampleTime(SampleTime)
controller.SetWindupGuard(WindupGuard)
controllers = append(controllers, controller)
}
for {
select {
case change := <-notifier:
controller := controllers[change.Id()]
adj_noob := int(-controller.Update(change.Tempeture()))
if adj_noob < minNoob {
adj_noob = minNoob
} else if adj_noob > maxNoob {
adj_noob = maxNoob
}
if noobs[change.Id()] == adj_noob {
continue
}
noobs[change.Id()] = adj_noob
log.Printf("Processor %d fan 0x%x\n", change.Id, adj_noob)
args := make([]string, 0)
args = append(args,
"raw",
"0x3a", "0x01",
)
for _, item := range noobs {
args = append(args, fmt.Sprintf("0x%x", item))
}
args = append(args,
"0x0", "0x0", "0x0", "0x0", "0x0", "0x0",
)
cmd := exec.Command("ipmitool", args...)
if err := cmd.Run(); err != nil {
errorChan <- err
return
}
case <-ctx.Done():
return
}
}
}

54
util/handler.go Normal file
View File

@ -0,0 +1,54 @@
package util
import (
"sync"
"context"
"amuz.es/src/infra/cpu_ctrl/logger"
)
var (
log = logger.NewLogger("util")
)
type handler struct {
errorChan chan error
ctx context.Context
canceler context.CancelFunc
waiter *sync.WaitGroup
}
type Handler interface {
NotifyError(err error)
Error() <-chan error
Done() <-chan struct{}
GracefullWait()
IncreaseWait()
DecreaseWait()
}
func NewHandler() Handler {
ctx, canceler := context.WithCancel(context.Background())
return &handler{
ctx: ctx,
canceler: canceler,
waiter: &sync.WaitGroup{},
errorChan: make(chan error),
}
}
func (h *handler) NotifyError(err error) { h.errorChan <- err }
func (h *handler) Error() <-chan error { return h.errorChan }
func (h *handler) Done() <-chan struct{} { return h.ctx.Done() }
func (h *handler) GracefullWait() {
if h.ctx.Err() == nil {
h.canceler()
}
h.waiter.Wait()
close(h.errorChan)
for remainError := range h.errorChan {
log.Errorf("%s", remainError)
}
}
func (h *handler) IncreaseWait() { h.waiter.Add(1) }
func (h *handler) DecreaseWait() { h.waiter.Done() }