50 lines
1.6 KiB
Python
50 lines
1.6 KiB
Python
|
from mirobo.airpurifier import AirPurifier, OperationMode, LedBrightness
|
||
|
from collections import namedtuple
|
||
|
from datetime import datetime
|
||
|
import time
|
||
|
from influxdb import InfluxDBClient
|
||
|
import json
|
||
|
from systemd import daemon
|
||
|
|
||
|
|
||
|
class Message(namedtuple('Message', ('measurement', 'time', 'tags', 'fields'))):
|
||
|
__slots__ = ()
|
||
|
|
||
|
|
||
|
class Atmosphere(namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_rpm', 'temperature', 'aqi', 'humidity'))):
|
||
|
__slots__ = ()
|
||
|
|
||
|
def __new__(cls, status):
|
||
|
return super(Atmosphere, cls).__new__(cls,
|
||
|
status.power == 'on',
|
||
|
int(status.motor_speed),
|
||
|
float(status.temperature),
|
||
|
int(status.aqi),
|
||
|
status.humidity / 100.0
|
||
|
)
|
||
|
|
||
|
|
||
|
def getStat(dev):
|
||
|
while True:
|
||
|
time.sleep(1)
|
||
|
yield time.time(), Atmosphere(dev.status())
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
try:
|
||
|
metric = InfluxDBClient('db', database='core',
|
||
|
use_udp=True, udp_port=8089)
|
||
|
|
||
|
dev = AirPurifier('10.0.1.13', '7e538c9fb31f6f6bfcc3354f46c421ca')
|
||
|
tags = {
|
||
|
'source': 'mi_air_2',
|
||
|
'position': 'center_room',
|
||
|
}
|
||
|
daemon.notify('READY=1')
|
||
|
for at, info in getStat(dev):
|
||
|
message = Message('air', int(at*1000000000), tags, info._asdict())
|
||
|
print(json.dumps([message._asdict()]))
|
||
|
metric.write_points([message._asdict()])
|
||
|
finally:
|
||
|
daemon.notify('STOPPING=1')
|