diff --git a/consumer/influx_stat.go b/consumer/influx_stat.go index 9269051..7ee1324 100644 --- a/consumer/influx_stat.go +++ b/consumer/influx_stat.go @@ -8,9 +8,11 @@ import ( "github.com/influxdata/influxdb/client/v2" zlog "amuz.es/src/infra/goutils/logger/zap" "go.uber.org/zap" + "unsafe" + "sync/atomic" ) -type data struct { +type processorData struct { Tempeture float64 FanSpeed int } @@ -21,12 +23,14 @@ type influxMetric struct { handler *handler.Handler fanSpeedConsumer chan producer.FanspeedInfo tempetureConsumer chan producer.TempetureInfo + osMetricConsumer chan producer.OSMetricInfo logger *zap.SugaredLogger } type InfluxMetric interface { FanSpeedConsumer() chan<- producer.FanspeedInfo TempetureConsumer() chan<- producer.TempetureInfo + OsMetricConsumer() chan<- producer.OSMetricInfo StartLogging() } @@ -37,12 +41,14 @@ func NewInfluxMetric(host string, processorCount int, handler *handler.Handler) handler: handler, fanSpeedConsumer: make(chan producer.FanspeedInfo, processorCount), tempetureConsumer: make(chan producer.TempetureInfo, processorCount), + osMetricConsumer: make(chan producer.OSMetricInfo, 1), logger: zlog.New(nil, "influx"), } } func (m *influxMetric) FanSpeedConsumer() chan<- producer.FanspeedInfo { return m.fanSpeedConsumer } func (m *influxMetric) TempetureConsumer() chan<- producer.TempetureInfo { return m.tempetureConsumer } +func (m *influxMetric) OsMetricConsumer() chan<- producer.OSMetricInfo { return m.osMetricConsumer } func (m *influxMetric) StartLogging() { defer m.handler.DecreaseWait() @@ -74,10 +80,12 @@ func (m *influxMetric) StartLogging() { } } - ticker := time.Tick(time.Second) - metricData := make([]data, m.processorCount) - sendData := make([]data, m.processorCount) - + var ( + ticker = time.Tick(time.Second) + osMetricPtr unsafe.Pointer + metricData = make([]processorData, m.processorCount) + sendData = make([]processorData, m.processorCount) + ) go func() { for changedSpeed := range m.fanSpeedConsumer { metricData[changedSpeed.Id].FanSpeed = changedSpeed.FanSpeed @@ -90,11 +98,18 @@ func (m *influxMetric) StartLogging() { } }() + go func() { + for sampledOsMetric := range m.osMetricConsumer { + atomic.StorePointer(&osMetricPtr, unsafe.Pointer(&sampledOsMetric)) + } + }() + for { select { case <-ticker: + swapped := (*producer.OSMetricInfo)(atomic.SwapPointer(&osMetricPtr, nil)) copy(sendData, metricData) - go m.sendPoint(influxDbConn, sendData) + go m.sendPoint(influxDbConn, sendData, swapped) case <-m.handler.Done(): return } @@ -102,17 +117,14 @@ func (m *influxMetric) StartLogging() { } func (m *influxMetric) sendPoint( influxDbConn client.Client, - datas []data) { + processorDatas []processorData, + osMetric *producer.OSMetricInfo) { pointList := make([]*client.Point, 0, 0) - at := time.Now() - for id, data := range datas { - if point, err := m.getPoint(id, data, at); err == nil { - pointList = append(pointList, point) - } else { - m.logger.Debugf("id %d err %s", id, err) - } - } + m.getProcessorPoint(&pointList, processorDatas, time.Now()) + if osMetric != nil { + m.getOsMetric(&pointList, osMetric) + } if len(pointList) == 0 { return } @@ -133,14 +145,36 @@ func (m *influxMetric) sendPoint( } -func (m *influxMetric) getPoint(id int, data data, at time.Time) (*client.Point, error) { - - // Create a point and add to batch - tags := map[string]string{"processor": strconv.Itoa(id)} - fields := map[string]interface{}{ - "tempeture": data.Tempeture, - "fan": data.FanSpeed, +func (m *influxMetric) getOsMetric(points *[]*client.Point, info *producer.OSMetricInfo) { + for _, applier := range info.Applier() { + name, tagsSlice, fieldsSlice := applier() + for i := 0; i < len(tagsSlice); i++ { + var point *client.Point + tags, fields := tagsSlice[i], fieldsSlice[i] + point, err := client.NewPoint(name, tags, fields, info.At) + if err != nil { + m.logger.Debugf("os matric name %s err %s", name, err) + continue + } + *points = append(*points, point) + } + } + return +} + +func (m *influxMetric) getProcessorPoint(points *[]*client.Point, datas []processorData, at time.Time) () { + for id, data := range datas { + // Create a point and add to batch + tags := map[string]string{"processor": strconv.Itoa(id)} + fields := map[string]interface{}{ + "tempeture": data.Tempeture, + "fan": data.FanSpeed, + } + point, err := client.NewPoint("processor", tags, fields, at) + if err != nil { + m.logger.Debugf("processor id %d err %s", id, err) + continue + } + *points = append(*points, point) } - - return client.NewPoint("processor", tags, fields, at) } diff --git a/consumer/testLogging.go b/consumer/testLogging.go index 135dc6c..3771b58 100644 --- a/consumer/testLogging.go +++ b/consumer/testLogging.go @@ -60,7 +60,18 @@ func (c *sampleOSLogger) StartControl() { if swapped == nil { continue } - c.logger.Info("CHANGED !! ", (*producer.OSMetricInfo)(swapped)) + for _, applier := range swapped.Applier() { + name, tagsSlice, fieldsSlice := applier() + for i := 0; i < len(tagsSlice); i++ { + tags, fields := tagsSlice[i], fieldsSlice[i] + + c.logger.Info("os metric") + c.logger.Info("-> name: ", name) + c.logger.Info("-> tags: ", tags) + c.logger.Info("-> fields: ", fields) + c.logger.Info("-> at: ", swapped.At) + } + } case <-c.handler.Done(): return } diff --git a/producer/os.go b/producer/os.go index 9d0ca9f..0d66d66 100644 --- a/producer/os.go +++ b/producer/os.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" "go.uber.org/multierr" "context" + "os" ) type osMetric struct { @@ -20,6 +21,8 @@ type osMetric struct { } type osMetricApplier func(*OSMetricInfo, chan<- error, *sync.WaitGroup) +type OSMetricPointGen func() (string, []map[string]string, []map[string]interface{}) + type OsMetric interface { StartMonitoring() } @@ -68,6 +71,7 @@ func (p *osMetric) StartMonitoring() { metricRecorders := p.availableMetrics() + p.info.Host, _ = os.Hostname() for { select { case now := <-ticker: diff --git a/producer/os_darwin.go b/producer/os_darwin.go index 82caf22..64e16d8 100644 --- a/producer/os_darwin.go +++ b/producer/os_darwin.go @@ -12,44 +12,152 @@ import ( "time" "go.uber.org/multierr" "sync" + "runtime" ) type NetIOInfo struct { - Name string - RxBytes, - TxBytes uint64 + Name string `json:"name"` + RxBytes uint64 `json:"rx_bytes"` + TxBytes uint64 `json:"tx_bytes"` } + type MemoryInfo struct { - Total, - Active, - Cached, - Free, - Inactive, - SwapFree, - SwapTotal, - SwapUsed, - Used uint64 + Total uint64 `json:"total_bytes"` + Active uint64 `json:"active_bytes"` + Cached uint64 `json:"cached_bytes"` + Free uint64 `json:"free_bytes"` + Inactive uint64 `json:"inactive_bytes"` + SwapFree uint64 `json:"swap_free_bytes"` + SwapTotal uint64 `json:"swap_total_bytes"` + SwapUsed uint64 `json:"swap_used_bytes"` + Used uint64 `json:"used_bytes"` } + type LoadInfo struct { - Avg1, - Avg5, - Avg15 float64 + Avg1 float64 `json:"avg_1"` + Avg5 float64 `json:"avg_5"` + Avg15 float64 `json:"avg_15"` } type CPUInfo struct { - Idle, - Nice, - System, - User float64 + Idle float64 `json:"idle"` + Nice float64 `json:"nice"` + System float64 `json:"system"` + User float64 `json:"user"` } type OSMetricInfo struct { - Memory MemoryInfo - Load LoadInfo - NetIO map[string]NetIOInfo + Memory MemoryInfo `json:"memory"` + Load LoadInfo `json:"load"` + NetIO map[string]NetIOInfo `json:"net_io"` - Uptime time.Duration - CPU CPUInfo + Uptime time.Duration `json:"uptime"` + CPU CPUInfo `json:"cpu"` - At time.Time + Host string `json:"host"` + At time.Time `json:"at"` +} + +func (p *OSMetricInfo) MarshalUptime() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "uptime", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "duration": p.Uptime, + }, + } + return +} +func (p *OSMetricInfo) MarshalLoad() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "load", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "avg_1": p.Load.Avg1, + "avg_5": p.Load.Avg5, + "avg_15": p.Load.Avg15, + }, + } + return +} +func (p *OSMetricInfo) MarshalCPU() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "cpu", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "idle": p.CPU.Idle, + "nice": p.CPU.Nice, + "system": p.CPU.System, + "user": p.CPU.User, + }, + } + return +} + +func (p *OSMetricInfo) MarshalNetworks() (name string, tags []map[string]string, fields []map[string]interface{}) { + name = "network" + tags = make([]map[string]string, 0) + fields = make([]map[string]interface{}, 0) + for name, info := range p.NetIO { + tags, fields = + append(tags, map[string]string{ + "os": runtime.GOOS, + "name": name, + }), + append(fields, map[string]interface{}{ + "rx_bytes": info.RxBytes, + "tx_bytes": info.TxBytes, + }) + } + return +} + +func (p *OSMetricInfo) MarshalMemory() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "memory", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "total_bytes": p.Memory.Total, + "active_bytes": p.Memory.Active, + "cached_bytes": p.Memory.Cached, + "free_bytes": p.Memory.Free, + "inactive_bytes": p.Memory.Inactive, + "swap_free_bytes": p.Memory.SwapFree, + "swap_total_bytes": p.Memory.SwapTotal, + "swap_used_bytes": p.Memory.SwapUsed, + "used_bytes": p.Memory.Used, + }, + } + return +} +func (p *OSMetricInfo) Applier() (generator []OSMetricPointGen) { + generator = append( + generator, + p.MarshalLoad, + p.MarshalCPU, + p.MarshalMemory, + p.MarshalNetworks, + p.MarshalUptime, + ) + return } func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { @@ -66,7 +174,7 @@ func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, wait } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total)) @@ -105,7 +213,7 @@ func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1) @@ -132,7 +240,7 @@ func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, wai } }() - if err !=nil{ + if err != nil { return } netIoMap := make(map[string]NetIOInfo) @@ -164,7 +272,7 @@ func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, wait } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String()) @@ -187,7 +295,7 @@ func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("readCpuStat: idle=%d\n", ct.Idle) diff --git a/producer/os_linux.go b/producer/os_linux.go index 5cb49bd..81ab794 100644 --- a/producer/os_linux.go +++ b/producer/os_linux.go @@ -13,59 +13,190 @@ import ( "time" "go.uber.org/multierr" "sync" + "runtime" ) type NetIOInfo struct { - Name string - RxBytes, - TxBytes uint64 + Name string `json:"name"` + RxBytes uint64 `json:"rx_bytes"` + TxBytes uint64 `json:"tx_bytes"` } type DiskIOInfo struct { - Name string - ReadsCompletedBytes, - WritesCompletedBytes uint64 + Name string `json:"name"` + ReadsCompletedBytes uint64 `json:"reads_completed_bytes"` + WritesCompletedBytes uint64 `json:"writes_completed_bytes"` } type MemoryInfo struct { - Total, - Used, - Buffers, - Cached, - Free, - Active, - Inactive, - SwapTotal, - SwapUsed, - SwapCached, - SwapFree uint64 + Total uint64 `json:"total_bytes"` + Used uint64 `json:"used_bytes"` + Buffers uint64 `json:"buffers_bytes"` + Cached uint64 `json:"cached_bytes"` + Free uint64 `json:"free_bytes"` + Active uint64 `json:"active_bytes"` + Inactive uint64 `json:"inactive_bytes"` + SwapTotal uint64 `json:"swap_total_bytes"` + SwapUsed uint64 `json:"swap_used_bytes"` + SwapCached uint64 `json:"swap_cached_bytes"` + SwapFree uint64 `json:"swap_free_bytes"` } type LoadInfo struct { - Avg1, - Avg5, - Avg15 float64 + Avg1 float64 `json:"avg_1"` + Avg5 float64 `json:"avg_5"` + Avg15 float64 `json:"avg_15"` } type CPUInfo struct { - User, - Nice, - System, - Idle, - Iowait, - Irq, - Softirq, - Steal float64 + User float64 `json:"user"` + Nice float64 `json:"nice"` + System float64 `json:"system"` + Idle float64 `json:"idle"` + Iowait float64 `json:"iowait"` + Irq float64 `json:"irq"` + Softirq float64 `json:"softirq"` + Steal float64 `json:"steal"` } type OSMetricInfo struct { - Memory MemoryInfo - Load LoadInfo - NetIO map[string]NetIOInfo + Memory MemoryInfo `json:"memory"` + Load LoadInfo `json:"load"` + NetIO map[string]NetIOInfo `json:"net_io"` - Uptime time.Duration - CPU CPUInfo - DiskIO map[string]DiskIOInfo - At time.Time + Uptime time.Duration `json:"uptime"` + CPU CPUInfo `json:"cpu"` + DiskIO map[string]DiskIOInfo `json:"disk_io"` + + Host string `json:"host"` + At time.Time `json:"at"` } +func (p *OSMetricInfo) MarshalUptime() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "uptime", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "duration": p.Uptime, + }, + } + return +} +func (p *OSMetricInfo) MarshalLoad() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "load", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "avg_1": p.Load.Avg1, + "avg_5": p.Load.Avg5, + "avg_15": p.Load.Avg15, + }, + } + return +} +func (p *OSMetricInfo) MarshalCPU() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "cpu", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "user": p.CPU.User, + "nice": p.CPU.Nice, + "system": p.CPU.System, + "idle": p.CPU.Idle, + "iowait": p.CPU.Iowait, + "irq": p.CPU.Irq, + "softirq": p.CPU.Softirq, + "steal": p.CPU.Steal, + }, + } + return +} + +func (p *OSMetricInfo) MarshalNetworks() (name string, tags []map[string]string, fields []map[string]interface{}) { + name = "network" + tags = make([]map[string]string, 0) + fields = make([]map[string]interface{}, 0) + for name, info := range p.NetIO { + tags, fields = + append(tags, map[string]string{ + "os": runtime.GOOS, + "name": name, + }), + append(fields, map[string]interface{}{ + "rx_bytes": info.RxBytes, + "tx_bytes": info.TxBytes, + }) + } + return +} + +func (p *OSMetricInfo) MarshalDisk() (name string, tags []map[string]string, fields []map[string]interface{}) { + name = "disk" + tags = make([]map[string]string, 0) + fields = make([]map[string]interface{}, 0) + for name, info := range p.DiskIO { + tags, fields = + append(tags, map[string]string{ + "os": runtime.GOOS, + "name": name, + }), + append(fields, map[string]interface{}{ + "reads_completed_bytes": info.ReadsCompletedBytes, + "writes_completed_bytes": info.WritesCompletedBytes, + }) + } + return +} + +func (p *OSMetricInfo) MarshalMemory() (name string, tags []map[string]string, fields []map[string]interface{}) { + name, tags, fields = + "memory", + []map[string]string{ + { + "os": runtime.GOOS, + }, + }, + []map[string]interface{}{ + { + "total_bytes": p.Memory.Total, + "used_bytes": p.Memory.Used, + "buffers_bytes": p.Memory.Buffers, + "cached_bytes": p.Memory.Cached, + "free_bytes": p.Memory.Free, + "active_bytes": p.Memory.Active, + "inactive_bytes": p.Memory.Inactive, + "swap_total_bytes": p.Memory.SwapTotal, + "swap_used_bytes": p.Memory.SwapUsed, + "swap_cached_bytes": p.Memory.SwapCached, + "swap_free_bytes": p.Memory.SwapFree, + }, + } + return +} +func (p *OSMetricInfo) Applier() (generator []OSMetricPointGen) { + generator = append( + generator, + p.MarshalLoad, + p.MarshalCPU, + p.MarshalMemory, + p.MarshalNetworks, + p.MarshalDisk, + p.MarshalUptime, + ) + return +} func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, waiter *sync.WaitGroup) { memoryInfo, err := memory.Get() @@ -80,7 +211,7 @@ func (p *osMetric) readMemoryStat(info *OSMetricInfo, errChan chan<- error, wait } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("memory total: ", misc.FileSizeIEC(memoryInfo.Total)) @@ -122,7 +253,7 @@ func (p *osMetric) readLoadStat(info *OSMetricInfo, errChan chan<- error, waiter } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("load Loadavg1: %f \n", load.Loadavg1) @@ -149,7 +280,7 @@ func (p *osMetric) readNetworkStat(info *OSMetricInfo, errChan chan<- error, wai } }() - if err !=nil{ + if err != nil { return } netIoMap := make(map[string]NetIOInfo) @@ -181,7 +312,7 @@ func (p *osMetric) readUptimeStat(info *OSMetricInfo, errChan chan<- error, wait } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("readUptimeStat: %s\n", durafmt.Parse(ut).String()) @@ -204,7 +335,7 @@ func (p *osMetric) readCpuStat(info *OSMetricInfo, errChan chan<- error, waiter } }() - if err !=nil{ + if err != nil { return } p.logger.Debugf("cpu user: %d\n", ct.User) @@ -250,7 +381,7 @@ func (p *osMetric) readDiskStat(info *OSMetricInfo, errChan chan<- error, waiter errChan <- err } }() - if err !=nil{ + if err != nil { return }