1
0
Fork 0

os metric 반영

This commit is contained in:
Sangbum Kim 2018-07-10 01:34:39 +09:00
parent be64e4fa34
commit 8203db85ce
5 changed files with 384 additions and 96 deletions

View File

@ -8,9 +8,11 @@ import (
"github.com/influxdata/influxdb/client/v2"
zlog "amuz.es/src/infra/goutils/logger/zap"
"go.uber.org/zap"
"unsafe"
"sync/atomic"
)
type data struct {
type processorData struct {
Tempeture float64
FanSpeed int
}
@ -21,12 +23,14 @@ type influxMetric struct {
handler *handler.Handler
fanSpeedConsumer chan producer.FanspeedInfo
tempetureConsumer chan producer.TempetureInfo
osMetricConsumer chan producer.OSMetricInfo
logger *zap.SugaredLogger
}
type InfluxMetric interface {
FanSpeedConsumer() chan<- producer.FanspeedInfo
TempetureConsumer() chan<- producer.TempetureInfo
OsMetricConsumer() chan<- producer.OSMetricInfo
StartLogging()
}
@ -37,12 +41,14 @@ func NewInfluxMetric(host string, processorCount int, handler *handler.Handler)
handler: handler,
fanSpeedConsumer: make(chan producer.FanspeedInfo, processorCount),
tempetureConsumer: make(chan producer.TempetureInfo, processorCount),
osMetricConsumer: make(chan producer.OSMetricInfo, 1),
logger: zlog.New(nil, "influx"),
}
}
func (m *influxMetric) FanSpeedConsumer() chan<- producer.FanspeedInfo { return m.fanSpeedConsumer }
func (m *influxMetric) TempetureConsumer() chan<- producer.TempetureInfo { return m.tempetureConsumer }
func (m *influxMetric) OsMetricConsumer() chan<- producer.OSMetricInfo { return m.osMetricConsumer }
func (m *influxMetric) StartLogging() {
defer m.handler.DecreaseWait()
@ -74,10 +80,12 @@ func (m *influxMetric) StartLogging() {
}
}
ticker := time.Tick(time.Second)
metricData := make([]data, m.processorCount)
sendData := make([]data, m.processorCount)
var (
ticker = time.Tick(time.Second)
osMetricPtr unsafe.Pointer
metricData = make([]processorData, m.processorCount)
sendData = make([]processorData, m.processorCount)
)
go func() {
for changedSpeed := range m.fanSpeedConsumer {
metricData[changedSpeed.Id].FanSpeed = changedSpeed.FanSpeed
@ -90,11 +98,18 @@ func (m *influxMetric) StartLogging() {
}
}()
go func() {
for sampledOsMetric := range m.osMetricConsumer {
atomic.StorePointer(&osMetricPtr, unsafe.Pointer(&sampledOsMetric))
}
}()
for {
select {
case <-ticker:
swapped := (*producer.OSMetricInfo)(atomic.SwapPointer(&osMetricPtr, nil))
copy(sendData, metricData)
go m.sendPoint(influxDbConn, sendData)
go m.sendPoint(influxDbConn, sendData, swapped)
case <-m.handler.Done():
return
}
@ -102,17 +117,14 @@ func (m *influxMetric) StartLogging() {
}
func (m *influxMetric) sendPoint(
influxDbConn client.Client,
datas []data) {
processorDatas []processorData,
osMetric *producer.OSMetricInfo) {
pointList := make([]*client.Point, 0, 0)
at := time.Now()
for id, data := range datas {
if point, err := m.getPoint(id, data, at); err == nil {
pointList = append(pointList, point)
} else {
m.logger.Debugf("id %d err %s", id, err)
}
}
m.getProcessorPoint(&pointList, processorDatas, time.Now())
if osMetric != nil {
m.getOsMetric(&pointList, osMetric)
}
if len(pointList) == 0 {
return
}
@ -133,14 +145,36 @@ func (m *influxMetric) sendPoint(
}
func (m *influxMetric) getPoint(id int, data data, at time.Time) (*client.Point, error) {
func (m *influxMetric) getOsMetric(points *[]*client.Point, info *producer.OSMetricInfo) {
for _, applier := range info.Applier() {
name, tagsSlice, fieldsSlice := applier()
for i := 0; i < len(tagsSlice); i++ {
var point *client.Point
tags, fields := tagsSlice[i], fieldsSlice[i]
point, err := client.NewPoint(name, tags, fields, info.At)
if err != nil {
m.logger.Debugf("os matric name %s err %s", name, err)
continue
}
*points = append(*points, point)
}
}
return
}
func (m *influxMetric) getProcessorPoint(points *[]*client.Point, datas []processorData, at time.Time) () {
for id, data := range datas {
// Create a point and add to batch
tags := map[string]string{"processor": strconv.Itoa(id)}
fields := map[string]interface{}{
"tempeture": data.Tempeture,
"fan": data.FanSpeed,
}
return client.NewPoint("processor", tags, fields, at)
point, err := client.NewPoint("processor", tags, fields, at)
if err != nil {
m.logger.Debugf("processor id %d err %s", id, err)
continue
}
*points = append(*points, point)
}
}

View File

@ -60,7 +60,18 @@ func (c *sampleOSLogger) StartControl() {
if swapped == nil {
continue
}
c.logger.Info("CHANGED !! ", (*producer.OSMetricInfo)(swapped))
for _, applier := range swapped.Applier() {
name, tagsSlice, fieldsSlice := applier()
for i := 0; i < len(tagsSlice); i++ {
tags, fields := tagsSlice[i], fieldsSlice[i]
c.logger.Info("os metric")
c.logger.Info("-> name: ", name)
c.logger.Info("-> tags: ", tags)
c.logger.Info("-> fields: ", fields)
c.logger.Info("-> at: ", swapped.At)
}
}
case <-c.handler.Done():
return
}

View File

@ -9,6 +9,7 @@ import (
"go.uber.org/zap"
"go.uber.org/multierr"
"context"
"os"
)
type osMetric struct {
@ -20,6 +21,8 @@ type osMetric struct {
}
type osMetricApplier func(*OSMetricInfo, chan<- error, *sync.WaitGroup)
type OSMetricPointGen func() (string, []map[string]string, []map[string]interface{})
type OsMetric interface {
StartMonitoring()
}
@ -68,6 +71,7 @@ func (p *osMetric) StartMonitoring() {
metricRecorders := p.availableMetrics()
p.info.Host, _ = os.Hostname()
for {
select {
case now := <-ticker:

View File

@ -12,44 +12,152 @@ import (
"time"
"go.uber.org/multierr"
"sync"
"runtime"
)
type NetIOInfo struct {
Name string
RxBytes,
TxBytes uint64
Name string `json:"name"`
RxBytes uint64 `json:"rx_bytes"`
TxBytes uint64 `json:"tx_bytes"`
}
type MemoryInfo struct {
Total,
Active,
Cached,
Free,
Inactive,
SwapFree,
SwapTotal,
SwapUsed,
Used uint64
Total uint64 `json:"total_bytes"`
Active uint64 `json:"active_bytes"`
Cached uint64 `json:"cached_bytes"`
Free uint64 `json:"free_bytes"`
Inactive uint64 `json:"inactive_bytes"`
SwapFree uint64 `json:"swap_free_bytes"`
SwapTotal uint64 `json:"swap_total_bytes"`
SwapUsed uint64 `json:"swap_used_bytes"`
Used uint64 `json:"used_bytes"`
}
type LoadInfo struct {
Avg1,
Avg5,
Avg15 float64
Avg1 float64 `json:"avg_1"`
Avg5 float64 `json:"avg_5"`
Avg15 float64 `json:"avg_15"`
}
type CPUInfo struct {
Idle,
Nice,
System,
User float64
Idle float64 `json:"idle"`
Nice float64 `json:"nice"`
System float64 `json:"system"`
User float64 `json:"user"`
}
type OSMetricInfo struct {
Memory MemoryInfo
Load LoadInfo
NetIO map[string]NetIOInfo
Memory MemoryInfo `json:"memory"`
Load LoadInfo `json:"load"`
NetIO map[string]NetIOInfo `json:"net_io"`
Uptime time.Duration
CPU CPUInfo
Uptime time.Duration `json:"uptime"`
CPU CPUInfo `json:"cpu"`
At time.Time
Host string `json:"host"`
At time.Time `json:"at"`
}
func (p *OSMetricInfo) MarshalUptime() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"uptime",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"duration": p.Uptime,
},
}
return
}
func (p *OSMetricInfo) MarshalLoad() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"load",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"avg_1": p.Load.Avg1,
"avg_5": p.Load.Avg5,
"avg_15": p.Load.Avg15,
},
}
return
}
func (p *OSMetricInfo) MarshalCPU() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"cpu",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"idle": p.CPU.Idle,
"nice": p.CPU.Nice,
"system": p.CPU.System,
"user": p.CPU.User,
},
}
return
}
func (p *OSMetricInfo) MarshalNetworks() (name string, tags []map[string]string, fields []map[string]interface{}) {
name = "network"
tags = make([]map[string]string, 0)
fields = make([]map[string]interface{}, 0)
for name, info := range p.NetIO {
tags, fields =
append(tags, map[string]string{
"os": runtime.GOOS,
"name": name,
}),
append(fields, map[string]interface{}{
"rx_bytes": info.RxBytes,
"tx_bytes": info.TxBytes,
})
}
return
}
func (p *OSMetricInfo) MarshalMemory() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"memory",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"total_bytes": p.Memory.Total,
"active_bytes": p.Memory.Active,
"cached_bytes": p.Memory.Cached,
"free_bytes": p.Memory.Free,
"inactive_bytes": p.Memory.Inactive,
"swap_free_bytes": p.Memory.SwapFree,
"swap_total_bytes": p.Memory.SwapTotal,
"swap_used_bytes": p.Memory.SwapUsed,
"used_bytes": p.Memory.Used,
},
}
return
}
func (p *OSMetricInfo) Applier() (generator []OSMetricPointGen) {
generator = append(
generator,
p.MarshalLoad,
p.MarshalCPU,
p.MarshalMemory,
p.MarshalNetworks,
p.MarshalUptime,
)
return
}
func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
@ -66,7 +174,7 @@ func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, wait
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total))
@ -105,7 +213,7 @@ func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1)
@ -132,7 +240,7 @@ func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, wai
}
}()
if err !=nil{
if err != nil {
return
}
netIoMap := make(map[string]NetIOInfo)
@ -164,7 +272,7 @@ func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, wait
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String())
@ -187,7 +295,7 @@ func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("readCpuStat: idle=%d\n", ct.Idle)

View File

@ -13,59 +13,190 @@ import (
"time"
"go.uber.org/multierr"
"sync"
"runtime"
)
type NetIOInfo struct {
Name string
RxBytes,
TxBytes uint64
Name string `json:"name"`
RxBytes uint64 `json:"rx_bytes"`
TxBytes uint64 `json:"tx_bytes"`
}
type DiskIOInfo struct {
Name string
ReadsCompletedBytes,
WritesCompletedBytes uint64
Name string `json:"name"`
ReadsCompletedBytes uint64 `json:"reads_completed_bytes"`
WritesCompletedBytes uint64 `json:"writes_completed_bytes"`
}
type MemoryInfo struct {
Total,
Used,
Buffers,
Cached,
Free,
Active,
Inactive,
SwapTotal,
SwapUsed,
SwapCached,
SwapFree uint64
Total uint64 `json:"total_bytes"`
Used uint64 `json:"used_bytes"`
Buffers uint64 `json:"buffers_bytes"`
Cached uint64 `json:"cached_bytes"`
Free uint64 `json:"free_bytes"`
Active uint64 `json:"active_bytes"`
Inactive uint64 `json:"inactive_bytes"`
SwapTotal uint64 `json:"swap_total_bytes"`
SwapUsed uint64 `json:"swap_used_bytes"`
SwapCached uint64 `json:"swap_cached_bytes"`
SwapFree uint64 `json:"swap_free_bytes"`
}
type LoadInfo struct {
Avg1,
Avg5,
Avg15 float64
Avg1 float64 `json:"avg_1"`
Avg5 float64 `json:"avg_5"`
Avg15 float64 `json:"avg_15"`
}
type CPUInfo struct {
User,
Nice,
System,
Idle,
Iowait,
Irq,
Softirq,
Steal float64
User float64 `json:"user"`
Nice float64 `json:"nice"`
System float64 `json:"system"`
Idle float64 `json:"idle"`
Iowait float64 `json:"iowait"`
Irq float64 `json:"irq"`
Softirq float64 `json:"softirq"`
Steal float64 `json:"steal"`
}
type OSMetricInfo struct {
Memory MemoryInfo
Load LoadInfo
NetIO map[string]NetIOInfo
Memory MemoryInfo `json:"memory"`
Load LoadInfo `json:"load"`
NetIO map[string]NetIOInfo `json:"net_io"`
Uptime time.Duration
CPU CPUInfo
DiskIO map[string]DiskIOInfo
At time.Time
Uptime time.Duration `json:"uptime"`
CPU CPUInfo `json:"cpu"`
DiskIO map[string]DiskIOInfo `json:"disk_io"`
Host string `json:"host"`
At time.Time `json:"at"`
}
func (p *OSMetricInfo) MarshalUptime() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"uptime",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"duration": p.Uptime,
},
}
return
}
func (p *OSMetricInfo) MarshalLoad() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"load",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"avg_1": p.Load.Avg1,
"avg_5": p.Load.Avg5,
"avg_15": p.Load.Avg15,
},
}
return
}
func (p *OSMetricInfo) MarshalCPU() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"cpu",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"user": p.CPU.User,
"nice": p.CPU.Nice,
"system": p.CPU.System,
"idle": p.CPU.Idle,
"iowait": p.CPU.Iowait,
"irq": p.CPU.Irq,
"softirq": p.CPU.Softirq,
"steal": p.CPU.Steal,
},
}
return
}
func (p *OSMetricInfo) MarshalNetworks() (name string, tags []map[string]string, fields []map[string]interface{}) {
name = "network"
tags = make([]map[string]string, 0)
fields = make([]map[string]interface{}, 0)
for name, info := range p.NetIO {
tags, fields =
append(tags, map[string]string{
"os": runtime.GOOS,
"name": name,
}),
append(fields, map[string]interface{}{
"rx_bytes": info.RxBytes,
"tx_bytes": info.TxBytes,
})
}
return
}
func (p *OSMetricInfo) MarshalDisk() (name string, tags []map[string]string, fields []map[string]interface{}) {
name = "disk"
tags = make([]map[string]string, 0)
fields = make([]map[string]interface{}, 0)
for name, info := range p.DiskIO {
tags, fields =
append(tags, map[string]string{
"os": runtime.GOOS,
"name": name,
}),
append(fields, map[string]interface{}{
"reads_completed_bytes": info.ReadsCompletedBytes,
"writes_completed_bytes": info.WritesCompletedBytes,
})
}
return
}
func (p *OSMetricInfo) MarshalMemory() (name string, tags []map[string]string, fields []map[string]interface{}) {
name, tags, fields =
"memory",
[]map[string]string{
{
"os": runtime.GOOS,
},
},
[]map[string]interface{}{
{
"total_bytes": p.Memory.Total,
"used_bytes": p.Memory.Used,
"buffers_bytes": p.Memory.Buffers,
"cached_bytes": p.Memory.Cached,
"free_bytes": p.Memory.Free,
"active_bytes": p.Memory.Active,
"inactive_bytes": p.Memory.Inactive,
"swap_total_bytes": p.Memory.SwapTotal,
"swap_used_bytes": p.Memory.SwapUsed,
"swap_cached_bytes": p.Memory.SwapCached,
"swap_free_bytes": p.Memory.SwapFree,
},
}
return
}
func (p *OSMetricInfo) Applier() (generator []OSMetricPointGen) {
generator = append(
generator,
p.MarshalLoad,
p.MarshalCPU,
p.MarshalMemory,
p.MarshalNetworks,
p.MarshalDisk,
p.MarshalUptime,
)
return
}
func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) {
memoryInfo, err := memory.Get()
@ -80,7 +211,7 @@ func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, wait
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total))
@ -122,7 +253,7 @@ func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1)
@ -149,7 +280,7 @@ func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, wai
}
}()
if err !=nil{
if err != nil {
return
}
netIoMap := make(map[string]NetIOInfo)
@ -181,7 +312,7 @@ func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, wait
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String())
@ -204,7 +335,7 @@ func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter
}
}()
if err !=nil{
if err != nil {
return
}
p.logger.Debugf("cpu user: %d\n", ct.User)
@ -250,7 +381,7 @@ func (p *osMetric) readDiskStat(info *OSMetricInfo, errChan chan<- error, waiter
errChan <- err
}
}()
if err !=nil{
if err != nil {
return
}