diff --git a/air_monitor.service b/air_monitor.service index 0d12725..0e11331 100644 --- a/air_monitor.service +++ b/air_monitor.service @@ -10,7 +10,6 @@ TimeoutStartSec=5 User=nobody Group=nobody Restart=on-failure -KillSignal=SIGQUIT Type=notify StandardError=syslog NotifyAccess=all diff --git a/main.py b/main.py index f227add..9748e5d 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,29 @@ -from miio.airpurifier import AirPurifier, OperationMode, LedBrightness -from miio.discovery import create_device -from collections import namedtuple -from datetime import datetime -import time -from influxdb import InfluxDBClient -import json from systemd import daemon +import errno +import logging +import signal import socket +import sys +import time +from collections import namedtuple +from multiprocessing import BoundedSemaphore, current_process, Process + +from influxdb import InfluxDBClient +from miio.airpurifier import AirPurifier + class Message(namedtuple('Message', ('measurement', 'time', 'tags', 'fields'))): __slots__ = () -class Atmosphere(namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_rpm', 'temperature', 'aqi', 'humidity'))): - __slots__ = () +class GracefulExit(Exception): + pass + +class Atmosphere( + namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_rpm', 'temperature', 'aqi', 'humidity'))): + __slots__ = () + def __new__(cls, status): return super(Atmosphere, cls).__new__(cls, status.power == 'on', @@ -25,26 +34,74 @@ class Atmosphere(namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_r ) -def getStat(dev): +def signal_handler(signum, frame): + raise GracefulExit() + + +def timerProvider(): while True: - time.sleep(1) - yield time.time(), Atmosphere(dev.status()) + yield time.time() + time.sleep(2.5) + + +def sendPoints(idx, at, tags, readSem, sendSem, metric, dev): + proc = current_process() + logger = logging.getLogger('iter %d proc name %s id %s' % (idx, proc.name, proc.pid)) + logger.info("start") + try: + readSem.acquire(timeout=15) + try: + fetch = dev.status() + info = Atmosphere(fetch) + message = Message('air', int(at * 1000000000), tags, info._asdict()) + finally: + readSem.release() + + sendSem.acquire(timeout=15) + + try: + metric.write_points([message._asdict()]) + finally: + sendSem.release() + except Exception as e: + logger.warning(e) + else: + logger.info('done') if __name__ == '__main__': + code = 0 try: - metric = InfluxDBClient('db', database='core', - use_udp=True, udp_port=8089) - air_addr=socket.gethostbyname("air") - #dev = create_device(air_addr,AirPurifier) - dev = AirPurifier(air_addr,'6e1e05b87d9f7cd10a3f8f43616896fa') + # Use signal handler to throw exception which can be caught to allow + # graceful exit. + logging.basicConfig(level=logging.INFO) + readSem = BoundedSemaphore(3) + sendSem = BoundedSemaphore(3) + + metric = InfluxDBClient('db', database='core', use_udp=True, udp_port=8089) + air_addr = socket.gethostbyname("air") + # dev = create_device(air_addr, AirPurifier) + dev = AirPurifier(air_addr, '6e1e05b87d9f7cd10a3f8f43616896fa') + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) tags = { 'source': 'mi_air_2', 'position': 'center_room', } daemon.notify('READY=1') - for at, info in getStat(dev): - message = Message('air', int(at*1000000000), tags, info._asdict()) - metric.write_points([message._asdict()]) + for idx, at in enumerate(timerProvider()): + args = idx, at, tags, readSem, sendSem, metric, dev + p = Process(target=sendPoints, args=args) + p.daemon = True + p.start() + except GracefulExit as ge: + pass + except KeyboardInterrupt as ki: + pass + except Exception as e: + logging.error(e) + code = errno.EINVAL finally: daemon.notify('STOPPING=1') + sys.exit(code)