1
0
Fork 0

연결오류라도 파이썬 죽지 않게 함

This commit is contained in:
Sangbum Kim 2018-03-04 21:20:31 +09:00
parent a105112f5f
commit 8af84541ef
2 changed files with 77 additions and 21 deletions

View File

@ -10,7 +10,6 @@ TimeoutStartSec=5
User=nobody User=nobody
Group=nobody Group=nobody
Restart=on-failure Restart=on-failure
KillSignal=SIGQUIT
Type=notify Type=notify
StandardError=syslog StandardError=syslog
NotifyAccess=all NotifyAccess=all

97
main.py
View File

@ -1,20 +1,29 @@
from miio.airpurifier import AirPurifier, OperationMode, LedBrightness
from miio.discovery import create_device
from collections import namedtuple
from datetime import datetime
import time
from influxdb import InfluxDBClient
import json
from systemd import daemon from systemd import daemon
import errno
import logging
import signal
import socket import socket
import sys
import time
from collections import namedtuple
from multiprocessing import BoundedSemaphore, current_process, Process
from influxdb import InfluxDBClient
from miio.airpurifier import AirPurifier
class Message(namedtuple('Message', ('measurement', 'time', 'tags', 'fields'))): class Message(namedtuple('Message', ('measurement', 'time', 'tags', 'fields'))):
__slots__ = () __slots__ = ()
class Atmosphere(namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_rpm', 'temperature', 'aqi', 'humidity'))): class GracefulExit(Exception):
__slots__ = () pass
class Atmosphere(
namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_rpm', 'temperature', 'aqi', 'humidity'))):
__slots__ = ()
def __new__(cls, status): def __new__(cls, status):
return super(Atmosphere, cls).__new__(cls, return super(Atmosphere, cls).__new__(cls,
status.power == 'on', status.power == 'on',
@ -25,26 +34,74 @@ class Atmosphere(namedtuple('Atmosphere', ('purifier_activated', 'purifier_fan_r
) )
def getStat(dev): def signal_handler(signum, frame):
raise GracefulExit()
def timerProvider():
while True: while True:
time.sleep(1) yield time.time()
yield time.time(), Atmosphere(dev.status()) time.sleep(2.5)
def sendPoints(idx, at, tags, readSem, sendSem, metric, dev):
proc = current_process()
logger = logging.getLogger('iter %d proc name %s id %s' % (idx, proc.name, proc.pid))
logger.info("start")
try:
readSem.acquire(timeout=15)
try:
fetch = dev.status()
info = Atmosphere(fetch)
message = Message('air', int(at * 1000000000), tags, info._asdict())
finally:
readSem.release()
sendSem.acquire(timeout=15)
try:
metric.write_points([message._asdict()])
finally:
sendSem.release()
except Exception as e:
logger.warning(e)
else:
logger.info('done')
if __name__ == '__main__': if __name__ == '__main__':
code = 0
try: try:
metric = InfluxDBClient('db', database='core', # Use signal handler to throw exception which can be caught to allow
use_udp=True, udp_port=8089) # graceful exit.
air_addr=socket.gethostbyname("air") logging.basicConfig(level=logging.INFO)
#dev = create_device(air_addr,AirPurifier) readSem = BoundedSemaphore(3)
dev = AirPurifier(air_addr,'6e1e05b87d9f7cd10a3f8f43616896fa') sendSem = BoundedSemaphore(3)
metric = InfluxDBClient('db', database='core', use_udp=True, udp_port=8089)
air_addr = socket.gethostbyname("air")
# dev = create_device(air_addr, AirPurifier)
dev = AirPurifier(air_addr, '6e1e05b87d9f7cd10a3f8f43616896fa')
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
tags = { tags = {
'source': 'mi_air_2', 'source': 'mi_air_2',
'position': 'center_room', 'position': 'center_room',
} }
daemon.notify('READY=1') daemon.notify('READY=1')
for at, info in getStat(dev): for idx, at in enumerate(timerProvider()):
message = Message('air', int(at*1000000000), tags, info._asdict()) args = idx, at, tags, readSem, sendSem, metric, dev
metric.write_points([message._asdict()]) p = Process(target=sendPoints, args=args)
p.daemon = True
p.start()
except GracefulExit as ge:
pass
except KeyboardInterrupt as ki:
pass
except Exception as e:
logging.error(e)
code = errno.EINVAL
finally: finally:
daemon.notify('STOPPING=1') daemon.notify('STOPPING=1')
sys.exit(code)