1
0
Fork 0
air/main.py

108 lines
3.1 KiB
Python

from systemd import daemon
import errno
import logging
import signal
import socket
import sys
import time
from collections import namedtuple
from multiprocessing import BoundedSemaphore, current_process, Process
from influxdb import InfluxDBClient
from miio.airpurifier import AirPurifier
class Message(namedtuple('Message', ('measurement', 'time', 'tags', 'fields'))):
__slots__ = ()
class GracefulExit(Exception):
pass
class Atmosphere(
namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_rpm', 'temperature', 'aqi', 'humidity'))):
__slots__ = ()
def __new__(cls, status):
return super(Atmosphere, cls).__new__(cls,
status.power == 'on',
int(status.motor_speed),
float(status.temperature),
int(status.aqi),
status.humidity / 100.0
)
def signal_handler(signum, frame):
raise GracefulExit()
def timerProvider():
while True:
yield time.time()
time.sleep(2.5)
def sendPoints(idx, at, tags, readSem, sendSem, metric, dev):
proc = current_process()
logger = logging.getLogger('iter %d proc name %s id %s' % (idx, proc.name, proc.pid))
logger.info("start")
try:
readSem.acquire(timeout=15)
try:
fetch = dev.status()
info = Atmosphere(fetch)
message = Message('air', int(at * 1000000000), tags, info._asdict())
finally:
readSem.release()
sendSem.acquire(timeout=15)
try:
metric.write_points([message._asdict()])
finally:
sendSem.release()
except Exception as e:
logger.warning(e)
else:
logger.info('done')
if __name__ == '__main__':
code = 0
try:
# Use signal handler to throw exception which can be caught to allow
# graceful exit.
logging.basicConfig(level=logging.WARN)
readSem = BoundedSemaphore(3)
sendSem = BoundedSemaphore(3)
metric = InfluxDBClient('db', database='core', use_udp=True, udp_port=8089)
air_addr = socket.gethostbyname("air")
# dev = create_device(air_addr, AirPurifier)
dev = AirPurifier(air_addr, 'b370cd7d111f74949f2cc7023d9c3bea')
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
tags = {
'source': 'mi_air_2',
'position': 'center_room',
}
daemon.notify('READY=1')
for idx, at in enumerate(timerProvider()):
args = idx, at, tags, readSem, sendSem, metric, dev
p = Process(target=sendPoints, args=args)
p.daemon = True
p.start()
except GracefulExit as ge:
pass
except KeyboardInterrupt as ki:
pass
except Exception as e:
logging.error(e)
code = errno.EINVAL
finally:
daemon.notify('STOPPING=1')
sys.exit(code)