diff --git a/bill_man.py b/bill_man.py index 3454ce1..01da548 100755 --- a/bill_man.py +++ b/bill_man.py @@ -1,400 +1,19 @@ #!/usr/bin/env python3 -import csv -import functools -import ujson -from collections import namedtuple, defaultdict -from datetime import date, datetime, time -from decimal import Decimal -from functools import reduce -from gzip import GzipFile -from io import TextIOWrapper -from itertools import groupby -from os import path, kill - -import pytz -import requests -from dateutil.relativedelta import relativedelta - -from util import EnteringLoop -from util.log import wrap - -requests.models.json = ujson - -from babel.numbers import format_currency -from botocore.exceptions import ClientError -from botocore.session import get_session -from requests import adapters -from apscheduler.util import undefined -from apscheduler.executors.asyncio import AsyncIOExecutor -from apscheduler.jobstores.memory import MemoryJobStore -from apscheduler.schedulers.asyncio import AsyncIOScheduler - -import argparse import logging -from util import install_except_hook -from signal import SIGQUIT, SIGINT, SIGABRT, SIGTERM +from datetime import time, datetime -UTCTZ = pytz.utc -# localize 안하면 timezone이 canonical하게 표현되지 않는다 . -KSTTZ = pytz.timezone('Asia/Seoul').localize(datetime.now()).tzinfo - - -class ObjEntry(namedtuple('ModifiedEntry', - ('bucket', - 'path', - 'etags', - 'modified', - )) - ): - - def __new__(cls, *args, **kwargs): - item = super(ObjEntry, cls).__new__(cls, *args, **kwargs) - return item - - def params(self, bucket, path): - if (self.bucket, self.path) != (bucket, path): - return () - if self.etags is not None: - yield ('IfNoneMatch', self.etags) - if self.modified is not None: - yield ('IfModifiedSince', self.modified) - - -class Cost(namedtuple('Cost', - ('amount', - 'currency', - 'service', - 'type', - 'start', - 'end' - )) - ): - __slots__ = () - - def __new__(cls, *args, **kwargs): - argdict = defaultdict(lambda: None) - argdict.update(**kwargs) - parse_date = lambda item: datetime.strptime(item, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTCTZ).astimezone(KSTTZ) - args = ( - (Decimal(argdict['lineItem/BlendedCost']) if argdict['lineItem/BlendedCost'] else None), - (argdict['lineItem/CurrencyCode'] if argdict['lineItem/CurrencyCode'] else None), - (argdict['product/ProductName'] if argdict['product/ProductName'] else None), - (argdict['lineItem/UsageType'] if argdict['lineItem/UsageType'] else None), - (parse_date(argdict['lineItem/UsageStartDate']) if argdict['lineItem/UsageStartDate'] else None), - (parse_date(argdict['lineItem/UsageEndDate']) if argdict['lineItem/UsageEndDate'] else None) - ) - item = super(Cost, cls).__new__(cls, *args) - return item - - @property - def key(self): - return CostSummery(self.currency, self.service, self.type) - - -class CostSubItem(namedtuple('CostSubItem', - ('currency', - 'service', - )) - ): - __slots__ = () - - def __new__(cls, *args): - item = super(CostSubItem, cls).__new__(cls, *args) - return item - - -class CostSummery(namedtuple('CostSummery', - ('currency', - 'service', - 'type' - )) - ): - __slots__ = () - - def __new__(cls, *args): - item = super(CostSummery, cls).__new__(cls, *args) - return item - - @property - def key(self): - return CostSubItem(self.currency, self.service) - - -def memoized(func): - cache = None - entry = None - - async def callee(*args, **kwargs): - nonlocal entry, cache, func - kwargs['modified_entry'] = entry - returned, entry = await func(*args, **kwargs) - if returned is not None: - cache = returned - return returned - elif cache is None: - raise RuntimeError('no ret value') - else: - return cache - - # @functools.wraps 대신 - functools.update_wrapper(callee, func) - return callee - - -class Session: - def __init__( - self, - loop, - aws_access_key_id, - aws_secret_access_key, - aws_region_name, - s3_bucket_name, - s3_bucket_key, - s3_manifest_name, - slack_webhook_url, - ): - self.log = logging.getLogger('bill_man.session') - self.loop = loop - self.webhook_url = slack_webhook_url - self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name) - self.region_name = aws_region_name - self.bucket_name = s3_bucket_name - self.bucket_path = s3_bucket_key - self.manifest_name = s3_manifest_name - - self.handler = self.request_hander() - - def request_hander(self): - # 커넥션 관리 max_retries? - adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) - # 이것은 알아서 cookie를 관리해준다. - handler = requests.Session() - # handler.stream = True - handler.mount("http://", adapter) - handler.mount("https://", adapter) - - handler.headers.update({ - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36' - }) - return handler - - def close(self): - if self.handler is not None: - self.handler.close() - self.handler = None - - def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name): - return get_session().create_client( - 's3', - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - region_name=aws_region_name, - ) - - async def send_msg(self, messages): - self.log.info("sending a message") - req_args = ( - self.webhook_url, - { - 'headers': {'Content-Type': 'application/json'}, - 'json': messages - } - ) - response = await self.loop.run_in_executor( - None, - lambda url, kwargs: self.handler.post(url, **kwargs), - *req_args - ) - - if response.status_code != 200: - raise ValueError( - 'Request to slack returned an error %s, the response is:\n%s' - % (response.status_code, response.text) - ) - self.log.info("message sent") - - async def get_resources(self, standard_time): - billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name) - - now=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ) - today = date.fromordinal(now.toordinal()) - - if datetime.combine(today, standard_time) >= now: - today -= relativedelta(days=1) - report_time = datetime.combine(today, standard_time) - start_month = today.replace(day=1) - - report_file_path = await self.get_current_manifest(standard_date=today) - - costs = filter( - lambda cost: cost.end < report_time, - await self.get_current_cost_csv(report_file_path) - ) - - ''' - { - "fallback": "Required plain-text summary of the attachment.", - "color": "#36a64f", - "pretext": "Optional text that appears above the attachment block", - "author_name": "Bobby Tables", - "author_link": "http://flickr.com/bobby/", - "author_icon": "http://flickr.com/icons/bobby.jpg", - "title": "Slack API Documentation", - "title_link": "https://api.slack.com/", - "text": "Optional text that appears within the attachment", - "fields": [ - { - "title": "Priority", - "value": "High", - "short": False - } - ], - "image_url": "http://my-website.com/path/to/image.jpg", - "thumb_url": "http://example.com/path/to/thumb.png", - "footer": "Slack API", - "footer_icon": "https://platform.slack-edge.com/img/default_application_icon.png", - "ts": 123456789 - }, - ''' - - def service_formetter(summeries): - total_amount = defaultdict(Decimal) - for service_type, type_costs, service_amount in sorted(summeries, key=lambda service: service[2], - reverse=True): - message = '\n'.join( - '%s: %s' % (type, format_currency(cost, service_type.currency, decimal_quantization=False)) - for type, cost in type_costs - ) - service_name = '%s(%s) : %s ' % ( - service_type.service, - service_type.currency, - format_currency(service_amount, service_type.currency, decimal_quantization=False) - ) - yield { - 'fallback': '%s\n%s' % (service_name, message), - "color": "#ffffff", - 'title': service_name, - 'text': message - } - total_amount[service_type.currency] += service_amount - # totals - for currency, total in total_amount.items(): - total_amount_text = format_currency(total, currency, decimal_quantization=False) - message = 'TOT(%s): %s' % (currency, total_amount_text) - yield { - 'fallback': message, - "color": "#ffffff", - 'title': message, - } - - slack_data = { - 'text': "Daily AWS Billing Report :macos: from %s to %s" % ( - start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')), - "attachments": [ - *service_formetter(self.summery_costs(costs)), - { - "fallback": "Check out more at %s" % billing_url, - "color": "#eeeeee", - "title": "Check out more detils from AWS billing site", - "title_link": billing_url, - "footer": "Linus Billing Bot :confused_parrot:", - } - ] - } - await self.send_msg(slack_data) - - @memoized - async def get_current_manifest( - self, - standard_date=date.today(), - modified_entry=None - ): - duration = '%s-%s' % (standard_date.replace(day=1).strftime('%Y%m%d'), - (standard_date.replace(day=1) + relativedelta(months=1)).strftime('%Y%m%d')) - request_path = path.join(self.bucket_path, duration, self.manifest_name) - try: - req_args = ( - self.bucket_name, - request_path, - dict(modified_entry.params(self.bucket_name, request_path)) if modified_entry is not None else {} - ) - response = await self.loop.run_in_executor( - None, - lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs), - *req_args - ) - obj = ujson.load(response['Body']) - return ( - obj['reportKeys'][0], - ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified']) - ) - - except ClientError as e: - if e.response['Error']['Code'] == '304': - return None, modified_entry - raise e - - @memoized - async def get_current_cost_csv(self, report_path, modified_entry=None): - try: - - req_args = ( - self.bucket_name, - report_path, - dict(modified_entry.params(self.bucket_name, report_path)) if modified_entry is not None else {} - ) - response = await self.loop.run_in_executor( - None, - lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs), - *req_args - ) - - unzipped = TextIOWrapper(GzipFile(mode='rb', fileobj=response['Body']), encoding='utf-8') - cost_reader = csv.reader(unzipped) - keys = next(cost_reader) - return ( - [ - item for item in filter( - lambda cost: cost.amount != 0, - map( - lambda usage: Cost(**{k: v for k, v in zip(keys, usage)}), - cost_reader - ) - ) - ], - ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified']) - ) - - except ClientError as e: - if e.response['Error']['Code'] == '304': - return None, modified_entry - raise e - - def summery_costs(self, costs): - def cost_summation(amounts, cost): - amounts[cost.key] += cost.amount - return amounts - - amount_sumation = reduce(cost_summation, costs, defaultdict(Decimal)) - - def subcost_exteact(type_cost_item): - for vk, amt in type_cost_item: - yield vk.type, amt - - for k, v in groupby(sorted(amount_sumation.items(), key=lambda item: item[0]), key=lambda item: item[0].key): - sv = sorted(subcost_exteact(v), key=lambda item: item[1], reverse=True) - yield ( - k, - sv, - sum(map(lambda v: v[1], sv)) - ) +from util import install_except_hook, KSTTZ, UTCTZ def handle_daemon(name, action, args): log = logging.getLogger("main_loop") + from signal import SIGTERM + from util.log import wrap + if action == 'start': # load config + from util import EnteringLoop with EnteringLoop( name, log_dir=args.log_location, @@ -402,6 +21,14 @@ def handle_daemon(name, action, args): is_forground=args.foreground, pid_path=args.pid_path ) as loop: + from apscheduler.jobstores.memory import MemoryJobStore + from apscheduler.executors.asyncio import AsyncIOExecutor + from apscheduler.schedulers.asyncio import AsyncIOScheduler + from apscheduler.util import undefined + from signal import SIGTERM, SIGABRT, SIGINT, SIGQUIT + from handler import Session + import ujson + jobstores = { 'default': MemoryJobStore() } @@ -418,7 +45,7 @@ def handle_daemon(name, action, args): job_defaults=job_defaults, timezone=KSTTZ, event_loop=loop) - + with open(args.config) as cfgFile: config = ujson.load(cfgFile) sess = Session(loop=loop, **config) @@ -431,20 +58,21 @@ def handle_daemon(name, action, args): minute=args.standard_time.minute, second=args.standard_time.second, args=(args.standard_time,), - next_run_time=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ) if args.immediately else undefined + next_run_time=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone( + KSTTZ) if args.immediately else undefined ) scheduler.start() - + if not args.immediately: log.info("job(%s) may be start in %s", job.name, job.next_run_time) - + def stopme(*_): scheduler.shutdown() sess.close() log.info("handler closed") loop.stop() log.info("loop closed") - + loop.add_signal_handler(SIGQUIT, stopme) loop.add_signal_handler(SIGTERM, stopme) loop.add_signal_handler(SIGINT, stopme) @@ -452,6 +80,7 @@ def handle_daemon(name, action, args): loop.run_forever() log.info("bye!") elif action == 'stop': + from os import path, kill wrap(name, level=args.log_level, stderr=True) if not path.exists(args.pid_path): log.warning("cannot find pidfile(%s)", args.pid_path) @@ -465,15 +94,18 @@ def handle_daemon(name, action, args): if __name__ == '__main__': + import argparse + from functools import partial + install_except_hook() - + # 실행 플래그 파싱 parser = argparse.ArgumentParser( prog='bill_man', epilog='contact @spi-ca.', description='report aws billing .', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - + parser.add_argument('--pid_path', help='specify pidPath', default='bill_man.pid') @@ -482,11 +114,11 @@ if __name__ == '__main__': type=lambda level: logging._nameToLevel[level.upper()], choices=logging._nameToLevel.keys(), default='INFO') - + parser.set_defaults(func=lambda *_: parser.print_help()) - + sp = parser.add_subparsers() - + sp_start = sp.add_parser('start', help='Starts %(prog)s daemon', formatter_class=argparse.ArgumentDefaultsHelpFormatter) sp_start.add_argument('--config', @@ -512,12 +144,11 @@ if __name__ == '__main__': tzinfo=KSTTZ ), default='120000') - - sp_start.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'start')) + + sp_start.set_defaults(func=partial(handle_daemon, parser.prog, 'start')) sp_stop = sp.add_parser('stop', help='Stop %(prog)s daemon', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - sp_stop.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'stop')) - + sp_stop.set_defaults(func=partial(handle_daemon, parser.prog, 'stop')) + args = parser.parse_args() args.func(args) - diff --git a/handler.py b/handler.py new file mode 100644 index 0000000..ae7b3cb --- /dev/null +++ b/handler.py @@ -0,0 +1,274 @@ +import csv +import ujson +from collections import defaultdict +from datetime import date, datetime +from decimal import Decimal +from functools import reduce +from gzip import GzipFile +from io import TextIOWrapper +from itertools import groupby +from os import path + +import requests +from dateutil.relativedelta import relativedelta + +from models import ObjEntry, Cost +from util import UTCTZ, KSTTZ, memoized + +requests.models.json = ujson + +from babel.numbers import format_currency +from botocore.exceptions import ClientError +from botocore.session import get_session +from requests import adapters + +import logging + + +class Session: + def __init__( + self, + loop, + aws_access_key_id, + aws_secret_access_key, + aws_region_name, + s3_bucket_name, + s3_bucket_key, + s3_manifest_name, + slack_webhook_url, + ): + self.log = logging.getLogger('bill_man.session') + self.loop = loop + self.webhook_url = slack_webhook_url + self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name) + self.region_name = aws_region_name + self.bucket_name = s3_bucket_name + self.bucket_path = s3_bucket_key + self.manifest_name = s3_manifest_name + + self.handler = self.request_hander() + + def request_hander(self): + # 커넥션 관리 max_retries? + adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) + # 이것은 알아서 cookie를 관리해준다. + handler = requests.Session() + # handler.stream = True + handler.mount("http://", adapter) + handler.mount("https://", adapter) + + handler.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36' + }) + return handler + + def close(self): + if self.handler is not None: + self.handler.close() + self.handler = None + + def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name): + return get_session().create_client( + 's3', + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=aws_region_name, + ) + + async def send_msg(self, messages): + self.log.info("sending a message") + req_args = ( + self.webhook_url, + { + 'headers': {'Content-Type': 'application/json'}, + 'json': messages + } + ) + response = await self.loop.run_in_executor( + None, + lambda url, kwargs: self.handler.post(url, **kwargs), + *req_args + ) + + if response.status_code != 200: + raise ValueError( + 'Request to slack returned an error %s, the response is:\n%s' + % (response.status_code, response.text) + ) + self.log.info("message sent") + + async def get_resources(self, standard_time): + billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name) + + now = datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ) + today = date.fromordinal(now.toordinal()) + + if datetime.combine(today, standard_time) >= now: + today -= relativedelta(days=1) + report_time = datetime.combine(today, standard_time) + start_month = today.replace(day=1) + + report_file_path = await self.get_current_manifest(standard_date=today) + + costs = filter( + lambda cost: cost.end < report_time, + await self.get_current_cost_csv(report_file_path) + ) + + ''' + { + "fallback": "Required plain-text summary of the attachment.", + "color": "#36a64f", + "pretext": "Optional text that appears above the attachment block", + "author_name": "Bobby Tables", + "author_link": "http://flickr.com/bobby/", + "author_icon": "http://flickr.com/icons/bobby.jpg", + "title": "Slack API Documentation", + "title_link": "https://api.slack.com/", + "text": "Optional text that appears within the attachment", + "fields": [ + { + "title": "Priority", + "value": "High", + "short": False + } + ], + "image_url": "http://my-website.com/path/to/image.jpg", + "thumb_url": "http://example.com/path/to/thumb.png", + "footer": "Slack API", + "footer_icon": "https://platform.slack-edge.com/img/default_application_icon.png", + "ts": 123456789 + }, + ''' + + def service_formetter(summeries): + total_amount = defaultdict(Decimal) + for service_type, type_costs, service_amount in sorted(summeries, key=lambda service: service[2], + reverse=True): + message = '\n'.join( + '%s: %s' % (type, format_currency(cost, service_type.currency, decimal_quantization=False)) + for type, cost in type_costs + ) + service_name = '%s(%s) : %s ' % ( + service_type.service, + service_type.currency, + format_currency(service_amount, service_type.currency, decimal_quantization=False) + ) + yield { + 'fallback': '%s\n%s' % (service_name, message), + "color": "#ffffff", + 'title': service_name, + 'text': message + } + total_amount[service_type.currency] += service_amount + # totals + for currency, total in total_amount.items(): + total_amount_text = format_currency(total, currency, decimal_quantization=False) + message = 'TOT(%s): %s' % (currency, total_amount_text) + yield { + 'fallback': message, + "color": "#ffffff", + 'title': message, + } + + slack_data = { + 'text': "Daily AWS Billing Report :macos: from %s to %s" % ( + start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')), + "attachments": [ + *service_formetter(self.summery_costs(costs)), + { + "fallback": "Check out more at %s" % billing_url, + "color": "#eeeeee", + "title": "Check out more detils from AWS billing site", + "title_link": billing_url, + "footer": "Linus Billing Bot :confused_parrot:", + } + ] + } + await self.send_msg(slack_data) + + @memoized + async def get_current_manifest( + self, + standard_date=date.today(), + modified_entry=None + ): + duration = '%s-%s' % (standard_date.replace(day=1).strftime('%Y%m%d'), + (standard_date.replace(day=1) + relativedelta(months=1)).strftime('%Y%m%d')) + request_path = path.join(self.bucket_path, duration, self.manifest_name) + try: + req_args = ( + self.bucket_name, + request_path, + dict(modified_entry.params(self.bucket_name, request_path)) if modified_entry is not None else {} + ) + response = await self.loop.run_in_executor( + None, + lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs), + *req_args + ) + obj = ujson.load(response['Body']) + return ( + obj['reportKeys'][0], + ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified']) + ) + + except ClientError as e: + if e.response['Error']['Code'] == '304': + return None, modified_entry + raise e + + @memoized + async def get_current_cost_csv(self, report_path, modified_entry=None): + try: + + req_args = ( + self.bucket_name, + report_path, + dict(modified_entry.params(self.bucket_name, report_path)) if modified_entry is not None else {} + ) + response = await self.loop.run_in_executor( + None, + lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs), + *req_args + ) + + unzipped = TextIOWrapper(GzipFile(mode='rb', fileobj=response['Body']), encoding='utf-8') + cost_reader = csv.reader(unzipped) + keys = next(cost_reader) + return ( + [ + item for item in filter( + lambda cost: cost.amount != 0, + map( + lambda usage: Cost(**{k: v for k, v in zip(keys, usage)}), + cost_reader + ) + ) + ], + ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified']) + ) + + except ClientError as e: + if e.response['Error']['Code'] == '304': + return None, modified_entry + raise e + + def summery_costs(self, costs): + def cost_summation(amounts, cost): + amounts[cost.key] += cost.amount + return amounts + + amount_sumation = reduce(cost_summation, costs, defaultdict(Decimal)) + + def subcost_exteact(type_cost_item): + for vk, amt in type_cost_item: + yield vk.type, amt + + for k, v in groupby(sorted(amount_sumation.items(), key=lambda item: item[0]), key=lambda item: item[0].key): + sv = sorted(subcost_exteact(v), key=lambda item: item[1], reverse=True) + yield ( + k, + sv, + sum(map(lambda v: v[1], sv)) + ) diff --git a/models.py b/models.py new file mode 100755 index 0000000..7d7060d --- /dev/null +++ b/models.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +from collections import namedtuple, defaultdict +from datetime import datetime +from decimal import Decimal + +from util import UTCTZ, KSTTZ + + +class ObjEntry(namedtuple('ModifiedEntry', + ('bucket', + 'path', + 'etags', + 'modified', + )) + ): + + def __new__(cls, *args, **kwargs): + item = super(ObjEntry, cls).__new__(cls, *args, **kwargs) + return item + + def params(self, bucket, path): + if (self.bucket, self.path) != (bucket, path): + return () + if self.etags is not None: + yield ('IfNoneMatch', self.etags) + if self.modified is not None: + yield ('IfModifiedSince', self.modified) + + +class Cost(namedtuple('Cost', + ('amount', + 'currency', + 'service', + 'type', + 'start', + 'end' + )) + ): + __slots__ = () + + def __new__(cls, *args, **kwargs): + argdict = defaultdict(lambda: None) + argdict.update(**kwargs) + parse_date = lambda item: datetime.strptime(item, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTCTZ).astimezone(KSTTZ) + args = ( + (Decimal(argdict['lineItem/BlendedCost']) if argdict['lineItem/BlendedCost'] else None), + (argdict['lineItem/CurrencyCode'] if argdict['lineItem/CurrencyCode'] else None), + (argdict['product/ProductName'] if argdict['product/ProductName'] else None), + (argdict['lineItem/UsageType'] if argdict['lineItem/UsageType'] else None), + (parse_date(argdict['lineItem/UsageStartDate']) if argdict['lineItem/UsageStartDate'] else None), + (parse_date(argdict['lineItem/UsageEndDate']) if argdict['lineItem/UsageEndDate'] else None) + ) + item = super(Cost, cls).__new__(cls, *args) + return item + + @property + def key(self): + return CostSummery(self.currency, self.service, self.type) + + +class CostSubItem(namedtuple('CostSubItem', + ('currency', + 'service', + )) + ): + __slots__ = () + + def __new__(cls, *args): + item = super(CostSubItem, cls).__new__(cls, *args) + return item + + +class CostSummery(namedtuple('CostSummery', + ('currency', + 'service', + 'type' + )) + ): + __slots__ = () + + def __new__(cls, *args): + item = super(CostSummery, cls).__new__(cls, *args) + return item + + @property + def key(self): + return CostSubItem(self.currency, self.service) diff --git a/util/__init__.py b/util/__init__.py index 191899f..46837ec 100644 --- a/util/__init__.py +++ b/util/__init__.py @@ -1,7 +1,9 @@ +from .asyn import EnteringLoop from .file import to_module_path, find_executable_of_specify, makedirs from .hook import install_except_hook from .log import wrap as wrap_log -from .asyn import EnteringLoop +from .method import memoized +from .tz import UTCTZ, KSTTZ __all__ = ( 'install_except_hook', @@ -9,5 +11,8 @@ __all__ = ( 'wrap_log', 'EnteringLoop', 'to_module_path', - 'makedirs' + 'makedirs', + 'memoized', + 'UTCTZ', + 'KSTTZ' ) diff --git a/util/daemon/__init__.py b/util/daemon/__init__.py index b32d4e7..1c9e122 100644 --- a/util/daemon/__init__.py +++ b/util/daemon/__init__.py @@ -8,12 +8,11 @@ Copyright (c) 2014, Stephan Schultchen. License: MIT (see LICENSE for details) """ - from .daemon import DaemonContext, DaemonError from .pidfile import PidFile __all__ = ( "DaemonContext", "DaemonError", - "PidFile", -) \ No newline at end of file + "PidFile" +) diff --git a/util/method.py b/util/method.py new file mode 100644 index 0000000..80d6259 --- /dev/null +++ b/util/method.py @@ -0,0 +1,22 @@ +import functools + + +def memoized(func): + cache = None + entry = None + + async def callee(*args, **kwargs): + nonlocal entry, cache, func + kwargs['modified_entry'] = entry + returned, entry = await func(*args, **kwargs) + if returned is not None: + cache = returned + return returned + elif cache is None: + raise RuntimeError('no ret value') + else: + return cache + + # @functools.wraps 대신 + functools.update_wrapper(callee, func) + return callee diff --git a/util/tz.py b/util/tz.py new file mode 100644 index 0000000..65f5887 --- /dev/null +++ b/util/tz.py @@ -0,0 +1,8 @@ +from datetime import datetime + +import pytz + +UTCTZ = pytz.utc +# localize 안하면 timezone이 canonical하게 표현되지 않는다 . +KSTTZ = pytz.timezone('Asia/Seoul').localize(datetime.now()).tzinfo +