commit c515e857edddcc748a7c11ec181b50747f39ad11 Author: Sangbum Kim Date: Tue Aug 21 10:27:31 2018 +0900 added initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d96aa11 --- /dev/null +++ b/.gitignore @@ -0,0 +1,141 @@ +# use glob syntax +syntax: glob + + +# Packages # +############ +# it's better to unpack these files and commit the raw source +# git has its own built in compression methods +*.7z +*.dmg +*.gz +*.iso +*.jar +*.rar +*.tar +*.zip +*.apk +*.ipa +*.mgl + +# Logs and databases # +###################### +*.log +*.sql +*.sqlite + +# OS generated files # +###################### +._.DS_Store +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +Icon? +ehthumbs.db +Thumbs.db + +#Ignore files build by Visual Studio +# Compiled source # +################### +*.com +*.class +*.dll +*.o +*.so +*.obj +*.exe +*.pdb +*.user +*.aps +*.pch +*.vspscc +*_i.c +*_p.c +*.ncb +*.suo +*.tlb +*.tlh +*.bak +*.cache +*.ilk + +[Bb]uild[Ll]og.* +*.[Pp]ublish.xml +*.lib +*.sbr +*.scc +*.sdf +*.opensdf +[Bb]in +[Dd]ebug*/ +obj/ +[Rr]elease*/ +_ReSharper*/ +[Tt]est[Rr]esult* +ipch/ +doc/ +moglue/settings/__init__.py +########daniel added +.orig +.hg* +*.tmproj +*.ser +#vim +*.swn +*.swo +*.swp +*~ +tags +.rvmrc +*.rdb +#vimend + +#python +*.py[cod] +#python packages +*.egg +*.egg-info +dist +build +eggs +parts +bin +var +sdist +develop-eggs +.installed.cfg +lib +lib64 +MANIFEST + +# Installer logs +pip-log.txt + +# Unit test / coverage reports +.coverage +.tox +nosetests.xml + +# Translations +*.mo + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject +.idea + +#misc +*.pid + + + +/*.iml +/out +/venv +/log +.history +.vscode +bill_man.json \ No newline at end of file diff --git a/bill_man.json.example b/bill_man.json.example new file mode 100644 index 0000000..24182de --- /dev/null +++ b/bill_man.json.example @@ -0,0 +1,9 @@ +{ + "aws_access_key_id": "AWS_ACCESS_ID", + "aws_secret_access_key": "AWS_ACCESS_KEY", + "aws_region_name": "ap-northeast-2", + "s3_bucket_name": "BUCKET_NAME", + "s3_bucket_key": "BUCKET_PATH", + "s3_manifest_name": "MANIFEST_NAME", + "slack_webhook_url": "WEBHOOK_URL" +} diff --git a/bill_man.py b/bill_man.py new file mode 100755 index 0000000..3454ce1 --- /dev/null +++ b/bill_man.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python3 + +import csv +import functools +import ujson +from collections import namedtuple, defaultdict +from datetime import date, datetime, time +from decimal import Decimal +from functools import reduce +from gzip import GzipFile +from io import TextIOWrapper +from itertools import groupby +from os import path, kill + +import pytz +import requests +from dateutil.relativedelta import relativedelta + +from util import EnteringLoop +from util.log import wrap + +requests.models.json = ujson + +from babel.numbers import format_currency +from botocore.exceptions import ClientError +from botocore.session import get_session +from requests import adapters +from apscheduler.util import undefined +from apscheduler.executors.asyncio import AsyncIOExecutor +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +import argparse +import logging +from util import install_except_hook +from signal import SIGQUIT, SIGINT, SIGABRT, SIGTERM + +UTCTZ = pytz.utc +# localize 안하면 timezone이 canonical하게 표현되지 않는다 . +KSTTZ = pytz.timezone('Asia/Seoul').localize(datetime.now()).tzinfo + + +class ObjEntry(namedtuple('ModifiedEntry', + ('bucket', + 'path', + 'etags', + 'modified', + )) + ): + + def __new__(cls, *args, **kwargs): + item = super(ObjEntry, cls).__new__(cls, *args, **kwargs) + return item + + def params(self, bucket, path): + if (self.bucket, self.path) != (bucket, path): + return () + if self.etags is not None: + yield ('IfNoneMatch', self.etags) + if self.modified is not None: + yield ('IfModifiedSince', self.modified) + + +class Cost(namedtuple('Cost', + ('amount', + 'currency', + 'service', + 'type', + 'start', + 'end' + )) + ): + __slots__ = () + + def __new__(cls, *args, **kwargs): + argdict = defaultdict(lambda: None) + argdict.update(**kwargs) + parse_date = lambda item: datetime.strptime(item, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTCTZ).astimezone(KSTTZ) + args = ( + (Decimal(argdict['lineItem/BlendedCost']) if argdict['lineItem/BlendedCost'] else None), + (argdict['lineItem/CurrencyCode'] if argdict['lineItem/CurrencyCode'] else None), + (argdict['product/ProductName'] if argdict['product/ProductName'] else None), + (argdict['lineItem/UsageType'] if argdict['lineItem/UsageType'] else None), + (parse_date(argdict['lineItem/UsageStartDate']) if argdict['lineItem/UsageStartDate'] else None), + (parse_date(argdict['lineItem/UsageEndDate']) if argdict['lineItem/UsageEndDate'] else None) + ) + item = super(Cost, cls).__new__(cls, *args) + return item + + @property + def key(self): + return CostSummery(self.currency, self.service, self.type) + + +class CostSubItem(namedtuple('CostSubItem', + ('currency', + 'service', + )) + ): + __slots__ = () + + def __new__(cls, *args): + item = super(CostSubItem, cls).__new__(cls, *args) + return item + + +class CostSummery(namedtuple('CostSummery', + ('currency', + 'service', + 'type' + )) + ): + __slots__ = () + + def __new__(cls, *args): + item = super(CostSummery, cls).__new__(cls, *args) + return item + + @property + def key(self): + return CostSubItem(self.currency, self.service) + + +def memoized(func): + cache = None + entry = None + + async def callee(*args, **kwargs): + nonlocal entry, cache, func + kwargs['modified_entry'] = entry + returned, entry = await func(*args, **kwargs) + if returned is not None: + cache = returned + return returned + elif cache is None: + raise RuntimeError('no ret value') + else: + return cache + + # @functools.wraps 대신 + functools.update_wrapper(callee, func) + return callee + + +class Session: + def __init__( + self, + loop, + aws_access_key_id, + aws_secret_access_key, + aws_region_name, + s3_bucket_name, + s3_bucket_key, + s3_manifest_name, + slack_webhook_url, + ): + self.log = logging.getLogger('bill_man.session') + self.loop = loop + self.webhook_url = slack_webhook_url + self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name) + self.region_name = aws_region_name + self.bucket_name = s3_bucket_name + self.bucket_path = s3_bucket_key + self.manifest_name = s3_manifest_name + + self.handler = self.request_hander() + + def request_hander(self): + # 커넥션 관리 max_retries? + adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) + # 이것은 알아서 cookie를 관리해준다. + handler = requests.Session() + # handler.stream = True + handler.mount("http://", adapter) + handler.mount("https://", adapter) + + handler.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36' + }) + return handler + + def close(self): + if self.handler is not None: + self.handler.close() + self.handler = None + + def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name): + return get_session().create_client( + 's3', + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=aws_region_name, + ) + + async def send_msg(self, messages): + self.log.info("sending a message") + req_args = ( + self.webhook_url, + { + 'headers': {'Content-Type': 'application/json'}, + 'json': messages + } + ) + response = await self.loop.run_in_executor( + None, + lambda url, kwargs: self.handler.post(url, **kwargs), + *req_args + ) + + if response.status_code != 200: + raise ValueError( + 'Request to slack returned an error %s, the response is:\n%s' + % (response.status_code, response.text) + ) + self.log.info("message sent") + + async def get_resources(self, standard_time): + billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name) + + now=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ) + today = date.fromordinal(now.toordinal()) + + if datetime.combine(today, standard_time) >= now: + today -= relativedelta(days=1) + report_time = datetime.combine(today, standard_time) + start_month = today.replace(day=1) + + report_file_path = await self.get_current_manifest(standard_date=today) + + costs = filter( + lambda cost: cost.end < report_time, + await self.get_current_cost_csv(report_file_path) + ) + + ''' + { + "fallback": "Required plain-text summary of the attachment.", + "color": "#36a64f", + "pretext": "Optional text that appears above the attachment block", + "author_name": "Bobby Tables", + "author_link": "http://flickr.com/bobby/", + "author_icon": "http://flickr.com/icons/bobby.jpg", + "title": "Slack API Documentation", + "title_link": "https://api.slack.com/", + "text": "Optional text that appears within the attachment", + "fields": [ + { + "title": "Priority", + "value": "High", + "short": False + } + ], + "image_url": "http://my-website.com/path/to/image.jpg", + "thumb_url": "http://example.com/path/to/thumb.png", + "footer": "Slack API", + "footer_icon": "https://platform.slack-edge.com/img/default_application_icon.png", + "ts": 123456789 + }, + ''' + + def service_formetter(summeries): + total_amount = defaultdict(Decimal) + for service_type, type_costs, service_amount in sorted(summeries, key=lambda service: service[2], + reverse=True): + message = '\n'.join( + '%s: %s' % (type, format_currency(cost, service_type.currency, decimal_quantization=False)) + for type, cost in type_costs + ) + service_name = '%s(%s) : %s ' % ( + service_type.service, + service_type.currency, + format_currency(service_amount, service_type.currency, decimal_quantization=False) + ) + yield { + 'fallback': '%s\n%s' % (service_name, message), + "color": "#ffffff", + 'title': service_name, + 'text': message + } + total_amount[service_type.currency] += service_amount + # totals + for currency, total in total_amount.items(): + total_amount_text = format_currency(total, currency, decimal_quantization=False) + message = 'TOT(%s): %s' % (currency, total_amount_text) + yield { + 'fallback': message, + "color": "#ffffff", + 'title': message, + } + + slack_data = { + 'text': "Daily AWS Billing Report :macos: from %s to %s" % ( + start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')), + "attachments": [ + *service_formetter(self.summery_costs(costs)), + { + "fallback": "Check out more at %s" % billing_url, + "color": "#eeeeee", + "title": "Check out more detils from AWS billing site", + "title_link": billing_url, + "footer": "Linus Billing Bot :confused_parrot:", + } + ] + } + await self.send_msg(slack_data) + + @memoized + async def get_current_manifest( + self, + standard_date=date.today(), + modified_entry=None + ): + duration = '%s-%s' % (standard_date.replace(day=1).strftime('%Y%m%d'), + (standard_date.replace(day=1) + relativedelta(months=1)).strftime('%Y%m%d')) + request_path = path.join(self.bucket_path, duration, self.manifest_name) + try: + req_args = ( + self.bucket_name, + request_path, + dict(modified_entry.params(self.bucket_name, request_path)) if modified_entry is not None else {} + ) + response = await self.loop.run_in_executor( + None, + lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs), + *req_args + ) + obj = ujson.load(response['Body']) + return ( + obj['reportKeys'][0], + ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified']) + ) + + except ClientError as e: + if e.response['Error']['Code'] == '304': + return None, modified_entry + raise e + + @memoized + async def get_current_cost_csv(self, report_path, modified_entry=None): + try: + + req_args = ( + self.bucket_name, + report_path, + dict(modified_entry.params(self.bucket_name, report_path)) if modified_entry is not None else {} + ) + response = await self.loop.run_in_executor( + None, + lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs), + *req_args + ) + + unzipped = TextIOWrapper(GzipFile(mode='rb', fileobj=response['Body']), encoding='utf-8') + cost_reader = csv.reader(unzipped) + keys = next(cost_reader) + return ( + [ + item for item in filter( + lambda cost: cost.amount != 0, + map( + lambda usage: Cost(**{k: v for k, v in zip(keys, usage)}), + cost_reader + ) + ) + ], + ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified']) + ) + + except ClientError as e: + if e.response['Error']['Code'] == '304': + return None, modified_entry + raise e + + def summery_costs(self, costs): + def cost_summation(amounts, cost): + amounts[cost.key] += cost.amount + return amounts + + amount_sumation = reduce(cost_summation, costs, defaultdict(Decimal)) + + def subcost_exteact(type_cost_item): + for vk, amt in type_cost_item: + yield vk.type, amt + + for k, v in groupby(sorted(amount_sumation.items(), key=lambda item: item[0]), key=lambda item: item[0].key): + sv = sorted(subcost_exteact(v), key=lambda item: item[1], reverse=True) + yield ( + k, + sv, + sum(map(lambda v: v[1], sv)) + ) + + +def handle_daemon(name, action, args): + log = logging.getLogger("main_loop") + if action == 'start': + # load config + with EnteringLoop( + name, + log_dir=args.log_location, + log_level=args.log_level, + is_forground=args.foreground, + pid_path=args.pid_path + ) as loop: + jobstores = { + 'default': MemoryJobStore() + } + executors = { + 'default': AsyncIOExecutor(), + } + job_defaults = { + 'coalesce': True, + 'max_instances': 2 + } + scheduler = AsyncIOScheduler( + jobstores=jobstores, + executors=executors, + job_defaults=job_defaults, + timezone=KSTTZ, + event_loop=loop) + + with open(args.config) as cfgFile: + config = ujson.load(cfgFile) + sess = Session(loop=loop, **config) + del config + job = scheduler.add_job(sess.get_resources, + name="charger", + trigger='cron', + day_of_week='mon-fri', + hour=args.standard_time.hour, + minute=args.standard_time.minute, + second=args.standard_time.second, + args=(args.standard_time,), + next_run_time=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ) if args.immediately else undefined + ) + scheduler.start() + + if not args.immediately: + log.info("job(%s) may be start in %s", job.name, job.next_run_time) + + def stopme(*_): + scheduler.shutdown() + sess.close() + log.info("handler closed") + loop.stop() + log.info("loop closed") + + loop.add_signal_handler(SIGQUIT, stopme) + loop.add_signal_handler(SIGTERM, stopme) + loop.add_signal_handler(SIGINT, stopme) + loop.add_signal_handler(SIGABRT, stopme) + loop.run_forever() + log.info("bye!") + elif action == 'stop': + wrap(name, level=args.log_level, stderr=True) + if not path.exists(args.pid_path): + log.warning("cannot find pidfile(%s)", args.pid_path) + return + with open(args.pid_path, 'r') as pidFile: + pid = int(pidFile.readline()) + kill(pid, SIGTERM) + log.warning("pid(%d) sigterm!", pid) + else: + raise NotImplementedError() + + +if __name__ == '__main__': + install_except_hook() + + # 실행 플래그 파싱 + parser = argparse.ArgumentParser( + prog='bill_man', + epilog='contact @spi-ca.', + description='report aws billing .', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('--pid_path', + help='specify pidPath', + default='bill_man.pid') + parser.add_argument('--log_level', + help='set logging level', + type=lambda level: logging._nameToLevel[level.upper()], + choices=logging._nameToLevel.keys(), + default='INFO') + + parser.set_defaults(func=lambda *_: parser.print_help()) + + sp = parser.add_subparsers() + + sp_start = sp.add_parser('start', help='Starts %(prog)s daemon', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + sp_start.add_argument('--config', + help='specify config file path', + default='bill_man.json') + sp_start.add_argument('--foreground', + help='Don\'t daemonize!', + default=False, + action='store_true') + sp_start.add_argument('--immediately', + help='run batch now!', + default=False, + action='store_true') + sp_start.add_argument('--log_location', + help='specify location of logs!', + default='log') + sp_start.add_argument('--standard_time', + help='set standard time/HHMMSS', + type=lambda ti: time( + hour=int(ti[0:2]), + minute=int(ti[2:4]), + second=int(ti[4:6]), + tzinfo=KSTTZ + ), + default='120000') + + sp_start.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'start')) + sp_stop = sp.add_parser('stop', help='Stop %(prog)s daemon', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + sp_stop.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'stop')) + + args = parser.parse_args() + args.func(args) + diff --git a/prepare b/prepare new file mode 100644 index 0000000..1738b66 --- /dev/null +++ b/prepare @@ -0,0 +1,5 @@ +sudo apt install -y python3 python3-venv python3-dev +python3 -m venv --clear venv +. venv/bin/activate +pip install --upgrade pip +pip install -r requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..02f8488 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +requests +tzlocal +apscheduler +uvloop +botocore +babel +ujson +cchardet diff --git a/util/__init__.py b/util/__init__.py new file mode 100644 index 0000000..191899f --- /dev/null +++ b/util/__init__.py @@ -0,0 +1,13 @@ +from .file import to_module_path, find_executable_of_specify, makedirs +from .hook import install_except_hook +from .log import wrap as wrap_log +from .asyn import EnteringLoop + +__all__ = ( + 'install_except_hook', + 'find_executable_of_specify', + 'wrap_log', + 'EnteringLoop', + 'to_module_path', + 'makedirs' +) diff --git a/util/asyn.py b/util/asyn.py new file mode 100644 index 0000000..962ba19 --- /dev/null +++ b/util/asyn.py @@ -0,0 +1,48 @@ +import asyncio + +import uvloop + +asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + +import multiprocessing +import os +from concurrent.futures.thread import ThreadPoolExecutor +from contextlib import ContextDecorator + +from util.daemon import PidFile, DaemonContext +from .log import wrap + + +class EnteringLoop(ContextDecorator): + def __init__(self, name, log_dir, log_level, is_forground=True, pid_path=None): + self.name = name + self.log_dir = log_dir + self.log_level = log_level + self.is_forground = is_forground + self.pid_path = pid_path + self.wrapper = None + + def __enter__(self): + if self.is_forground: + wrap(self.name, level=self.log_level, stderr=True) + self.wrapper = PidFile(self.pid_path) + else: + keep_fds = wrap(self.name, log_dir=self.log_dir, level=self.log_level) + self.wrapper = DaemonContext( + working_directory=os.getcwd(), + files_preserve=keep_fds, + pidfile=PidFile(self.pid_path) + ) + self.wrapper.__enter__() + loop = asyncio.new_event_loop() + loop.set_default_executor(ThreadPoolExecutor(max_workers=multiprocessing.cpu_count() * 2 + 1)) + + asyncio.set_event_loop(loop) + self.loop = loop + return loop + + def __exit__(self, exc_type, exc, exc_tb): + if self.loop is not None: + self.loop.close() + if self.wrapper is not None: + self.wrapper.__exit__(exc_type, exc, exc_tb) diff --git a/util/daemon/__init__.py b/util/daemon/__init__.py new file mode 100644 index 0000000..b32d4e7 --- /dev/null +++ b/util/daemon/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +""" +pep3143daemon is a implementation of the PEP 3143, describing a well behaving +Unix daemon, as documented in Stevens 'Unix Network Programming' + +Copyright (c) 2014, Stephan Schultchen. + +License: MIT (see LICENSE for details) +""" + + +from .daemon import DaemonContext, DaemonError +from .pidfile import PidFile + +__all__ = ( + "DaemonContext", + "DaemonError", + "PidFile", +) \ No newline at end of file diff --git a/util/daemon/daemon.py b/util/daemon/daemon.py new file mode 100644 index 0000000..0f3a871 --- /dev/null +++ b/util/daemon/daemon.py @@ -0,0 +1,427 @@ +# -*- coding: utf-8 -*- +"""Implementation of PEP 3143 DaemonContext""" +__author__ = 'schlitzer' + +import errno +import os +import resource +import signal +import socket +import sys + +# PY2 / PY3 gap +PY3 = sys.version_info[0] == 3 +if PY3: + string_types = str, +else: + string_types = basestring, + + +class DaemonError(Exception): + """ Exception raised by DaemonContext""" + pass + + +class DaemonContext(object): + """ Implementation of PEP 3143 DaemonContext class + + This class should be instantiated only once in every program that + has to become a Unix Daemon. Typically you should call its open method + after you have done everything that may require root privileges. + For example opening port <= 1024. + + Each option can be passed as a keyword argument to the constructor, but + can also be changed by assigning a new value to the corresponding attribute + on the instance. + + Altering attributes after open() is called, will have no effect. + In future versions, trying to do so, will may raise a DaemonError. + + :param chroot_directory: + Full path to the directory that should be set as effective root + directory. If None, the root directory is not changed. + :type chroot_directory: str + + :param working_directory: + Full Path to the working directory to which to change to. + If chroot_directory is not None, and working_directory is not + starting with chroot_directory, working directory is prefixed + with chroot_directory. + :type working_directory: str. + + :param umask: + File access creation mask for this daemon after start + :type umask: int. + + :param uid: + Effective user id after daemon start. + :type uid: int. + + :param gid: + Effective group id after daemon start. + :type gid: int. + + :param prevent_core: + Prevent core file generation. + :type prevent_core: bool. + + :param detach_process: + If True, do the double fork magic. If the process was started + by inet or an init like program, you may don´t need to detach. + If not set, we try to figure out if forking is needed. + :type detach_process: bool. + + :param files_preserve: + List of integers, or objects with a fileno method, that + represent files that should not be closed while daemoninzing. + :type files_preserve: list + + :param pidfile: + Instance that implements a pidfile, while daemonizing its + acquire method will be called. + :type pidfile: Instance of Class that implements a pidfile behaviour + + :param stdin: + Redirect stdin to this file, if None, redirect to /dev/null. + :type stdin: file object. + + :param stdout: + Redirect stdout to this file, if None, redirect to /dev/null. + :type stdout: file object. + + :param stderr: + Redirect stderr to this file, if None, redirect to /dev/null. + :type stderr: file object. + + :param signal_map: + Mapping from operating system signal to callback actions. + :type signal_map: instance of dict + """ + + def __init__( + self, chroot_directory=None, working_directory='/', + umask=0, uid=None, gid=None, prevent_core=True, + detach_process=None, files_preserve=None, pidfile=None, + stdin=None, stdout=None, stderr=None, signal_map=None): + """ Initialize a new Instance + + """ + self._is_open = False + self._working_directory = None + self.chroot_directory = chroot_directory + self.umask = umask + self.uid = uid if uid else os.getuid() + self.gid = gid if gid else os.getgid() + if detach_process is None: + self.detach_process = detach_required() + else: + self.detach_process = detach_process + self.signal_map = signal_map if signal_map else default_signal_map() + self.files_preserve = files_preserve + self.pidfile = pidfile + self.prevent_core = prevent_core + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + self.working_directory = working_directory + + def __enter__(self): + """ Context Handler, wrapping self.open() + + :return: self + """ + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ Context Handler, wrapping self.close() + + :return: None + """ + self.close() + + def _get_signal_handler(self, handler): + """ get the callback function for handler + + If the handler is None, returns signal.SIG_IGN. + If the handler is a string, return the matching attribute of this + instance if possible. + Else return the handler itself. + + :param handler: + :type handler: str, None, function + :return: function + """ + if not handler: + result = signal.SIG_IGN + elif isinstance(handler, string_types): + result = getattr(self, handler) + else: + result = handler + return result + + @property + def _files_preserve(self): + """ create a set of protected files + + create a set of files, based on self.files_preserve and + self.stdin, self,stdout and self.stderr, that should not get + closed while daemonizing. + + :return: set + """ + result = set() + files = [] if not self.files_preserve else self.files_preserve + files.extend([self.stdin, self.stdout, self.stderr]) + for item in files: + if hasattr(item, 'fileno'): + result.add(item.fileno()) + if isinstance(item, int): + result.add(item) + return result + + @property + def _signal_handler_map(self): + """ Create the signal handler map + + create a dictionary with signal:handler mapping based on + self.signal_map + + :return: dict + """ + result = {} + for signum, handler in self.signal_map.items(): + result[signum] = self._get_signal_handler(handler) + return result + + @property + def working_directory(self): + """ The working_directory property + + :return: str + """ + if self.chroot_directory and not \ + self._working_directory.startswith(self.chroot_directory): + return self.chroot_directory + self._working_directory + else: + return self._working_directory + + @working_directory.setter + def working_directory(self, value): + """ Set working directory + + New value is ignored if already daemonized. + + :param value: str + :return: + """ + self._working_directory = value + + @property + def is_open(self): + """ True when this instances open method was called + + :return: bool + """ + return self._is_open + + def close(self): + """ Dummy function""" + pass + + def open(self): + """ Daemonize this process + + Do everything that is needed to become a Unix daemon. + + :return: None + :raise: DaemonError + """ + if self.is_open: + return + try: + os.chdir(self.working_directory) + if self.chroot_directory: + os.chroot(self.chroot_directory) + os.setgid(self.gid) + os.setuid(self.uid) + os.umask(self.umask) + except OSError as err: + raise DaemonError('Setting up Environment failed: {0}' + .format(err)) + + if self.prevent_core: + try: + resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) + except Exception as err: + raise DaemonError('Could not disable core files: {0}' + .format(err)) + + if self.detach_process: + try: + if os.fork() > 0: + os._exit(0) + except OSError as err: + raise DaemonError('First fork failed: {0}'.format(err)) + os.setsid() + try: + if os.fork() > 0: + os._exit(0) + except OSError as err: + raise DaemonError('Second fork failed: {0}'.format(err)) + + for (signal_number, handler) in self._signal_handler_map.items(): + signal.signal(signal_number, handler) + + close_filenos(self._files_preserve) + + redirect_stream(sys.stdin, self.stdin) + redirect_stream(sys.stdout, self.stdout) + redirect_stream(sys.stderr, self.stderr) + + if self.pidfile: + self.pidfile.acquire() + + self._is_open = True + + def terminate(self, signal_number, stack_frame): + """ Terminate this process + + Simply terminate this process by raising SystemExit. + This method is called if signal.SIGTERM was received. + + Check carefully if this really is what you want! + + Most likely it is not! + + You should implement a function/method that is able to cleanly + shutdown you daemon. Like gracefully terminating child processes, + threads. or closing files. + + You can create a custom handler by overriding this method, ot + setting a custom handler via the signal_map. It is also possible + to set the signal handlers directly via signal.signal(). + + :return: None + :raise: SystemExit + """ + raise SystemExit('Terminating on signal {0}'.format(signal_number)) + + +def close_filenos(preserve): + """ Close unprotected file descriptors + + Close all open file descriptors that are not in preserve. + + If ulimit -nofile is "unlimited", all is defined filenos <= 4096, + else all is <= the output of resource.getrlimit(). + + :param preserve: set with protected files + :type preserve: set + + :return: None + """ + maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if maxfd == resource.RLIM_INFINITY: + maxfd = 4096 + for fileno in range(maxfd): + if fileno not in preserve: + try: + os.close(fileno) + except OSError as err: + if not err.errno == errno.EBADF: + raise DaemonError( + 'Failed to close file descriptor {0}: {1}' + .format(fileno, err)) + + +def default_signal_map(): + """ Create the default signal map for this system. + + :return: dict + """ + name_map = { + 'SIGTSTP': None, + 'SIGTTIN': None, + 'SIGTTOU': None, + 'SIGTERM': 'terminate'} + signal_map = {} + for name, target in name_map.items(): + if hasattr(signal, name): + signal_map[getattr(signal, name)] = target + return signal_map + + +def parent_is_init(): + """ Check if parent is Init + + Check if the parent process is init, or something else that + owns PID 1. + + :return: bool + """ + if os.getppid() == 1: + return True + return False + + +def parent_is_inet(): + """ Check if parent is inet + + Check if our parent seems ot be a superserver, aka inetd/xinetd. + + This is done by checking if sys.__stdin__ is a network socket. + + :return: bool + """ + result = False + sock = socket.fromfd( + sys.__stdin__.fileno(), + socket.AF_INET, + socket.SOCK_RAW) + try: + sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) + result = True + except (OSError, socket.error) as err: + if not err.args[0] == errno.ENOTSOCK: + result = True + return result + + +def detach_required(): + """ Check if detaching is required + + This is done by collecting the results of parent_is_inet and + parent_is_init. If one of them is True, detaching, aka the daemoninzing, + aka the double fork magic, is not required, and can be skipped. + + :return: bool + """ + if parent_is_inet() or parent_is_init(): + return False + return True + + +def redirect_stream(system, target): + """ Redirect Unix streams + + If None, redirect Stream to /dev/null, else redirect to target. + + :param system: ether sys.stdin, sys.stdout, or sys.stderr + :type system: file object + + :param target: File like object, or None + :type target: None, File Object + + :return: None + :raise: DaemonError + """ + if target is None: + target_fd = os.open(os.devnull, os.O_RDWR) + else: + target_fd = target.fileno() + try: + os.dup2(target_fd, system.fileno()) + except OSError as err: + raise DaemonError('Could not redirect {0} to {1}: {2}' + .format(system, target, err)) diff --git a/util/daemon/pidfile.py b/util/daemon/pidfile.py new file mode 100644 index 0000000..a122031 --- /dev/null +++ b/util/daemon/pidfile.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +""" +Simple PidFile Module for a pep3143 daemon implementation. + +""" +__author__ = 'schlitzer' + + +import atexit +import fcntl +import os + + +class PidFile(object): + """ + PidFile implementation for PEP 3143 Daemon. + + This Class can also be used with pythons 'with' + statement. + + :param pidfile: + filename to be used as pidfile, including path + :type pidfile: str + """ + + def __init__(self, pidfile): + """ + Create a new instance + """ + self._pidfile = pidfile + self.pidfile = None + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + if exc_type is not None: + self.release() + return False + self.release() + return True + + def acquire(self): + """Acquire the pidfile. + + Create the pidfile, lock it, write the pid into it + and register the release with atexit. + + + :return: None + :raise: SystemExit + """ + try: + pidfile = open(self._pidfile, "a") + except IOError as err: + raise SystemExit(err) + try: + fcntl.flock(pidfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + raise SystemExit('Already running according to ' + self._pidfile) + pidfile.seek(0) + pidfile.truncate() + pidfile.write(str(os.getpid()) + '\n') + pidfile.flush() + self.pidfile = pidfile + atexit.register(self.release) + + def release(self): + """Release the pidfile. + + Close and delete the Pidfile. + + + :return: None + """ + try: + self.pidfile.close() + os.remove(self._pidfile) + except OSError as err: + if err.errno != 2: + raise \ No newline at end of file diff --git a/util/file.py b/util/file.py new file mode 100644 index 0000000..347d098 --- /dev/null +++ b/util/file.py @@ -0,0 +1,56 @@ +import errno + +import os + + +def to_module_path(*rest_path): + """ + 현재 모듈위치에서 파일 경로 계산 + :param rest_path: + :return: + """ + return os.path.abspath(os.path.normpath(os.path.join(*rest_path))) + + +def makedirs(path): + """ + python2에서도 makedirs 수행시 이미경로가 있을 경우 그냥 지나가는 함수 + :param path: + :return: + """ + try: + os.makedirs(path) + except OSError as e: + if e.errno != errno.EEXIST: + raise e + +def find_executable_of_specify(find_exe_name, location=None): + """ + 지정된 실행파일이 정상인지 보거나 지정되지 않았을 경우 PATH environment에서 실행파일 찾기 + :param find_exe_name: + :param location: + :return: + """ + + # 람다 : 파일인지 ? 실행 파일인지? + def is_executable(fpath): + return os.path.isfile( + fpath) and os.access(fpath, os.X_OK) + + if location is None: + # 경로가 지정되지 않았으면 PATH environ에서 뒤져본다. + for pathItem in os.environ["PATH"].split(os.pathsep): + pathItem = pathItem.strip('"') + exe_path = os.path.join(pathItem, find_exe_name) + if is_executable(exe_path): + location = exe_path + break + # 없으면 끝 + if location is None: + raise IOError("error: cannot find %s from PATH!" % find_exe_name) + elif not is_executable(to_module_path(location)): + raise IOError("error: %s is not executable!" % find_exe_name) + else: + location = to_module_path(location) + return location + diff --git a/util/hook.py b/util/hook.py new file mode 100644 index 0000000..50bd39d --- /dev/null +++ b/util/hook.py @@ -0,0 +1,41 @@ +import logging +import threading + +import sys + + +def install_except_hook(): + """ + Workaround for sys.excepthook thread bug + From +http://spyced.blogspot.com/2007/06/workaround-for-sysexcepthook-bug.html + +(https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1230540&group_id=5470). + Call once from __main__ before creating any threads. + If using psyco, call psyco.cannotcompile(threading.Thread.run) + since this replaces a new-style class method. + """ + + def traceback_log(excType, excValue, traceback, logger=logging.root): + logger.critical("Logging an uncaught exception", + exc_info=(excType, excValue, traceback)) + + sys.excepthook = traceback_log + + init_old = threading.Thread.__init__ + + def init(self, *args, **kwargs): + init_old(self, *args, **kwargs) + run_old = self.run + + def run_with_except_hook(*args, **kw): + try: + run_old(*args, **kw) + except (KeyboardInterrupt, SystemExit): + raise + except: + traceback_log(*sys.exc_info()) + + self.run = run_with_except_hook + + threading.Thread.__init__ = init diff --git a/util/log.py b/util/log.py new file mode 100644 index 0000000..85e1545 --- /dev/null +++ b/util/log.py @@ -0,0 +1,33 @@ +import logging +from logging import handlers as loggingHandlers +from os import path + +# 로거 설정 +from .file import to_module_path,makedirs + + +def wrap(name, level=logging.INFO, log_dir=None, stderr=False): + keep_fds = [] + if log_dir is not None: + makedirs(to_module_path(log_dir)) + formatter = logging.Formatter( + '[%(asctime)s] [%(process)d] [%(levelname)s] (%(name)s) > %(message)s', + datefmt="%Y%m%d%H%M%S %z" + ) + + if stderr: + log_handler = logging.StreamHandler() + else: + log_handler = loggingHandlers.TimedRotatingFileHandler( + path.join(log_dir, '%s.log' % name), + when='d', + backupCount=3 + ) + keep_fds.append(log_handler.stream.fileno()) + log_handler.setFormatter(formatter) + + root_logger = logging.root + root_logger.setLevel(level) + root_logger.addHandler(log_handler) + + return keep_fds