"""Periodically read an air purifier's status via ``miio`` and send it to InfluxDB.

A new worker process is started for every tick of ``timerProvider``; two
bounded semaphores cap how many workers may talk to the device and to the
database at the same time.  Readiness and shutdown are reported through
``systemd.daemon.notify``.
"""
import errno
import logging
import signal
import socket
import sys
import time
from collections import namedtuple
from multiprocessing import BoundedSemaphore, current_process, Process

from influxdb import InfluxDBClient
from miio.airpurifier import AirPurifier
from systemd import daemon

# Seconds a worker waits for a semaphore slot before giving up on its sample.
_SEM_TIMEOUT_SECONDS = 15
# Seconds slept between two consecutive ticks of ``timerProvider``.
_POLL_INTERVAL_SECONDS = 2.5


class Message(namedtuple('Message',
                         ('measurement', 'time', 'tags', 'fields'))):
    """A single point; ``_asdict()`` gives the dict handed to ``write_points``."""

    __slots__ = ()


class GracefulExit(Exception):
    """Raised by ``signal_handler`` to unwind the main loop on a signal."""


class Atmosphere(
        namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_rpm',
                                  'temperature', 'aqi', 'humidity'))):
    """Snapshot of the purifier readings, built from a status object.

    ``status`` must expose ``power``, ``motor_speed``, ``temperature``,
    ``aqi`` and ``humidity`` attributes.
    """

    __slots__ = ()

    def __new__(cls, status):
        return super(Atmosphere, cls).__new__(
            cls,
            status.power == 'on',
            int(status.motor_speed),
            float(status.temperature),
            int(status.aqi),
            # NOTE(review): presumably a percentage turned into a 0-1
            # fraction -- confirm against the device's status object.
            status.humidity / 100.0,
        )


def signal_handler(signum, frame):
    """Signal callback: turn the signal into a ``GracefulExit`` exception."""
    raise GracefulExit()


def timerProvider():
    """Yield ``time.time()`` forever, sleeping between consecutive values."""
    while True:
        yield time.time()
        time.sleep(_POLL_INTERVAL_SECONDS)


def sendPoints(idx, at, tags, readSem, sendSem, metric, dev):
    """Read one sample from ``dev`` and write it through ``metric``.

    Intended to run as the target of a worker ``Process``.  Any failure,
    including a semaphore timeout, is logged as a warning and the sample is
    dropped; nothing is raised to the caller.

    Args:
        idx: Iteration counter, used only in the logger name.
        at: Sample time in seconds (as returned by ``time.time()``).
        tags: Tag dict attached to the point.
        readSem: Semaphore limiting concurrent ``dev.status()`` calls.
        sendSem: Semaphore limiting concurrent ``metric.write_points()`` calls.
        metric: Client object providing ``write_points``.
        dev: Device object providing ``status()``.
    """
    proc = current_process()
    logger = logging.getLogger(
        'iter %d proc name %s id %s' % (idx, proc.name, proc.pid))
    logger.info("start")
    try:
        # acquire() returns False on timeout.  Proceeding anyway would both
        # bypass the concurrency limit and release a slot this worker never
        # held, corrupting the bounded semaphore's counter for the others.
        if not readSem.acquire(timeout=_SEM_TIMEOUT_SECONDS):
            raise TimeoutError('timed out waiting for read semaphore')
        try:
            fetch = dev.status()
            info = Atmosphere(fetch)
            # time.time() is in seconds; the point carries nanoseconds.
            message = Message(
                'air', int(at * 1000000000), tags, info._asdict())
        finally:
            readSem.release()

        if not sendSem.acquire(timeout=_SEM_TIMEOUT_SECONDS):
            raise TimeoutError('timed out waiting for send semaphore')
        try:
            metric.write_points([message._asdict()])
        finally:
            sendSem.release()
    except Exception as e:
        logger.warning(e)
    else:
        logger.info('done')


if __name__ == '__main__':
    code = 0
    try:
        logging.basicConfig(level=logging.WARN)
        readSem = BoundedSemaphore(3)
        sendSem = BoundedSemaphore(3)
        metric = InfluxDBClient(
            'db', database='core', use_udp=True, udp_port=8089)
        air_addr = socket.gethostbyname("air")
        # NOTE(review): the device token is hard-coded in source; consider
        # loading it from configuration or the environment instead.
        dev = AirPurifier(air_addr, 'b370cd7d111f74949f2cc7023d9c3bea')

        # Use signal handler to throw exception which can be caught to allow
        # graceful exit.
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        tags = {
            'source': 'mi_air_2',
            'position': 'center_room',
        }
        daemon.notify('READY=1')
        for idx, at in enumerate(timerProvider()):
            args = idx, at, tags, readSem, sendSem, metric, dev
            p = Process(target=sendPoints, args=args)
            p.daemon = True
            p.start()
    except (GracefulExit, KeyboardInterrupt):
        # Requested shutdown: leave with exit code 0.
        pass
    except Exception as e:
        logging.error(e)
        code = errno.EINVAL
    finally:
        daemon.notify('STOPPING=1')
        sys.exit(code)