#!/usr/bin/env python3
"""bill_man: daemon that reads AWS billing reports from S3 and posts a summary to Slack."""
import argparse
import csv
import functools
import logging
from collections import defaultdict, namedtuple
from datetime import date, datetime, time
from decimal import Decimal
from functools import reduce
from gzip import GzipFile
from io import TextIOWrapper
from itertools import groupby
from os import kill, path
from signal import SIGABRT, SIGINT, SIGQUIT, SIGTERM

import pytz
import requests
import ujson
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.util import undefined
from babel.numbers import format_currency
from botocore.exceptions import ClientError
from botocore.session import get_session
from dateutil.relativedelta import relativedelta
from requests import adapters

from util import EnteringLoop, install_except_hook
from util.log import wrap

# Monkeypatch: rebind the `json` name inside requests.models to ujson.
# NOTE(review): confirm the installed requests version still resolves JSON
# through `requests.models.json`; otherwise this assignment has no effect.
requests.models.json = ujson

UTCTZ = pytz.utc
# Without localize() the timezone is not represented canonically.
# The bare pytz zone object is not represented canonically, so take the tzinfo
# from a localize()d datetime instead.
KSTTZ = pytz.timezone('Asia/Seoul').localize(datetime.now()).tzinfo


def _now_kst():
    """Return the current time as an aware datetime in KSTTZ."""
    return datetime.now(UTCTZ).astimezone(KSTTZ)


class ObjEntry(namedtuple('ModifiedEntry', ('bucket', 'path', 'etags', 'modified'))):
    """ETag / Last-Modified validators remembered for one S3 object."""
    __slots__ = ()

    def params(self, bucket, path):
        """Yield ``(name, value)`` conditional ``get_object`` arguments.

        Nothing is yielded when this entry belongs to a different bucket/key,
        so the caller ends up making an unconditional request.
        """
        if (self.bucket, self.path) != (bucket, path):
            return
        if self.etags is not None:
            yield 'IfNoneMatch', self.etags
        if self.modified is not None:
            yield 'IfModifiedSince', self.modified


def _parse_report_time(value):
    """Parse a ``%Y-%m-%dT%H:%M:%SZ`` UTC timestamp into a KSTTZ-aware datetime."""
    return datetime.strptime(value, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTCTZ).astimezone(KSTTZ)


def _report_cell(row, column, convert=None):
    """Return ``convert(row[column])``, or None when the cell is missing or empty."""
    value = row.get(column)
    if not value:
        return None
    return value if convert is None else convert(value)


class Cost(namedtuple('Cost', ('amount', 'currency', 'service', 'type', 'start', 'end'))):
    """One billed line item built from a report CSV row.

    The row is passed as keyword arguments keyed by CSV header (for example
    ``lineItem/BlendedCost``); positional arguments are ignored. Missing or
    empty cells become ``None``.
    """
    __slots__ = ()

    def __new__(cls, *args, **kwargs):
        return super(Cost, cls).__new__(
            cls,
            _report_cell(kwargs, 'lineItem/BlendedCost', Decimal),
            _report_cell(kwargs, 'lineItem/CurrencyCode'),
            _report_cell(kwargs, 'product/ProductName'),
            _report_cell(kwargs, 'lineItem/UsageType'),
            _report_cell(kwargs, 'lineItem/UsageStartDate', _parse_report_time),
            _report_cell(kwargs, 'lineItem/UsageEndDate', _parse_report_time),
        )

    @property
    def key(self):
        """Aggregation key: (currency, service, usage type)."""
        return CostSummery(self.currency, self.service, self.type)


class CostSubItem(namedtuple('CostSubItem', ('currency', 'service'))):
    """Grouping key for one service billed in one currency."""
    __slots__ = ()


class CostSummery(namedtuple('CostSummery', ('currency', 'service', 'type'))):
    """Grouping key for one usage type of a service billed in one currency."""
    __slots__ = ()

    @property
    def key(self):
        """The enclosing (currency, service) key."""
        return CostSubItem(self.currency, self.service)


def memoized(func):
    """Cache the last non-None result of a conditional-fetch coroutine.

    *func* must accept a ``modified_entry`` keyword and return a
    ``(result, entry)`` pair, with ``result`` None meaning "not modified".
    The entry returned by the previous call is passed back in on the next
    call; whenever *func* reports "not modified" the cached result is
    returned instead. One cache slot is shared by all calls, whatever their
    arguments.

    Raises:
        RuntimeError: if *func* reports "not modified" before anything was cached.
    """
    cache = None
    entry = None

    @functools.wraps(func)
    async def callee(*args, **kwargs):
        nonlocal entry, cache
        kwargs['modified_entry'] = entry
        returned, entry = await func(*args, **kwargs)
        if returned is not None:
            cache = returned
        elif cache is None:
            raise RuntimeError('no ret value')
        return cache

    return callee


class Session:
    """Fetches AWS billing reports from S3 and posts a summary to a Slack webhook."""

    # Seconds to wait for the Slack webhook; requests never times out by default.
    request_timeout = 30

    def __init__(
            self,
            loop,
            aws_access_key_id,
            aws_secret_access_key,
            aws_region_name,
            s3_bucket_name,
            s3_bucket_key,
            s3_manifest_name,
            slack_webhook_url,
    ):
        self.log = logging.getLogger('bill_man.session')
        self.loop = loop
        self.webhook_url = slack_webhook_url
        self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name)
        self.region_name = aws_region_name
        self.bucket_name = s3_bucket_name
        self.bucket_path = s3_bucket_key
        self.manifest_name = s3_manifest_name
        self.handler = self.request_hander()

    def request_hander(self):
        """Build the pooled ``requests.Session`` used for webhook calls."""
        # connection pooling; TODO: consider setting max_retries
        adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
        # requests.Session keeps cookies across requests by itself.
        handler = requests.Session()
        handler.mount("http://", adapter)
        handler.mount("https://", adapter)
        handler.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36'
        })
        return handler

    def close(self):
        """Close the HTTP session; safe to call more than once."""
        if self.handler is not None:
            self.handler.close()
            self.handler = None

    def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name):
        """Create a botocore S3 client with explicit credentials and region."""
        return get_session().create_client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region_name,
        )

    async def send_msg(self, messages):
        """POST *messages* as JSON to the Slack webhook.

        The blocking requests call runs in the loop's default executor.

        Raises:
            ValueError: if the webhook answers with a status other than 200.
        """
        self.log.info("sending a message")
        response = await self.loop.run_in_executor(
            None,
            functools.partial(
                self.handler.post,
                self.webhook_url,
                headers={'Content-Type': 'application/json'},
                json=messages,
                timeout=self.request_timeout,
            ),
        )
        if response.status_code != 200:
            raise ValueError(
                'Request to slack returned an error %s, the response is:\n%s'
                % (response.status_code, response.text)
            )
        self.log.info("message sent")

    async def get_resources(self, standard_time):
        """Build and send the month-to-date billing report.

        *standard_time* is the daily cut-off (a ``datetime.time``; the CLI
        builds it KSTTZ-aware). If today's cut-off has not passed yet, the
        report runs up to yesterday's cut-off instead.
        """
        billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name)
        now = _now_kst()
        today = now.date()
        if datetime.combine(today, standard_time) >= now:
            today -= relativedelta(days=1)
        report_time = datetime.combine(today, standard_time)
        start_month = today.replace(day=1)

        report_file_path = await self.get_current_manifest(standard_date=today)
        report_costs = await self.get_current_cost_csv(report_file_path)
        # Rows without an end date cannot be placed before the cut-off; skip them.
        costs = (
            cost for cost in report_costs
            if cost.end is not None and cost.end < report_time
        )

        # Each attachment uses Slack's (legacy) message-attachment fields:
        # fallback, color, title, title_link, text, footer.
        def service_attachments(summaries):
            """Yield one attachment per service, then one total per currency."""
            totals = defaultdict(Decimal)
            ranked = sorted(summaries, key=lambda summary: summary[2], reverse=True)
            for service_key, type_costs, service_amount in ranked:
                currency = service_key.currency
                message = '\n'.join(
                    '%s: %s' % (usage_type, format_currency(amount, currency, decimal_quantization=False))
                    for usage_type, amount in type_costs
                )
                service_name = '%s(%s) : %s ' % (
                    service_key.service,
                    currency,
                    format_currency(service_amount, currency, decimal_quantization=False),
                )
                yield {
                    'fallback': '%s\n%s' % (service_name, message),
                    "color": "#ffffff",
                    'title': service_name,
                    'text': message,
                }
                totals[currency] += service_amount
            # totals
            for currency, total in totals.items():
                total_amount_text = format_currency(total, currency, decimal_quantization=False)
                message = 'TOT(%s): %s' % (currency, total_amount_text)
                yield {
                    'fallback': message,
                    "color": "#ffffff",
                    'title': message,
                }

        slack_data = {
            'text': "Daily AWS Billing Report :macos: from %s to %s" % (
                start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')),
            "attachments": [
                *service_attachments(self.summery_costs(costs)),
                {
                    "fallback": "Check out more at %s" % billing_url,
                    "color": "#eeeeee",
                    "title": "Check out more details from AWS billing site",
                    "title_link": billing_url,
                    "footer": "Linus Billing Bot :confused_parrot:",
                },
            ],
        }
        await self.send_msg(slack_data)

    async def _get_object(self, key, modified_entry):
        """Run ``s3.get_object`` for *key* in the executor.

        When *modified_entry* matches this bucket/key, the request is made
        conditional (IfNoneMatch / IfModifiedSince).
        """
        conditions = (
            dict(modified_entry.params(self.bucket_name, key))
            if modified_entry is not None else {}
        )
        return await self.loop.run_in_executor(
            None,
            functools.partial(self.s3.get_object, Bucket=self.bucket_name, Key=key, **conditions),
        )

    @memoized
    async def get_current_manifest(self, standard_date=None, modified_entry=None):
        """Return the first ``reportKeys`` entry of the manifest for *standard_date*'s month.

        *standard_date* defaults to today, evaluated at call time (the old
        ``date.today()`` default was frozen at import). Through ``memoized``,
        a "304 Not Modified" answer reuses the previously fetched key.
        """
        if standard_date is None:
            standard_date = date.today()
        month_start = standard_date.replace(day=1)
        duration = '%s-%s' % (
            month_start.strftime('%Y%m%d'),
            (month_start + relativedelta(months=1)).strftime('%Y%m%d'),
        )
        request_path = path.join(self.bucket_path, duration, self.manifest_name)
        try:
            response = await self._get_object(request_path, modified_entry)
        except ClientError as e:
            if e.response['Error']['Code'] == '304':
                return None, modified_entry
            raise
        body = response['Body']
        try:
            obj = ujson.load(body)
        finally:
            body.close()
        return (
            obj['reportKeys'][0],
            ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified']),
        )

    @memoized
    async def get_current_cost_csv(self, report_path, modified_entry=None):
        """Download the gzipped report CSV at *report_path* as a list of non-zero ``Cost`` rows.

        Rows whose amount is empty or zero are dropped. Through ``memoized``,
        a "304 Not Modified" answer reuses the previously parsed list.

        Raises:
            ValueError: if the CSV has no header row.
        """
        try:
            response = await self._get_object(report_path, modified_entry)
        except ClientError as e:
            if e.response['Error']['Code'] == '304':
                return None, modified_entry
            raise
        body = response['Body']
        try:
            # newline='' lets the csv module handle line endings inside quoted fields.
            with TextIOWrapper(GzipFile(mode='rb', fileobj=body), encoding='utf-8', newline='') as unzipped:
                cost_reader = csv.reader(unzipped)
                # next() without a default would raise StopIteration, which a
                # coroutine turns into an opaque RuntimeError.
                keys = next(cost_reader, None)
                if keys is None:
                    raise ValueError('empty cost report: %s' % report_path)
                parsed = (Cost(**dict(zip(keys, row))) for row in cost_reader)
                # Truthiness drops both zero and missing (None) amounts.
                costs = [cost for cost in parsed if cost.amount]
        finally:
            # GzipFile does not close a caller-supplied fileobj.
            body.close()
        return (
            costs,
            ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified']),
        )

    def summery_costs(self, costs):
        """Aggregate *costs* per (currency, service, usage type).

        Yields ``(CostSubItem, [(usage_type, amount), ...], service_total)``
        for each (currency, service), with usage types ordered by amount,
        largest first.
        """
        amounts = defaultdict(Decimal)
        for cost in costs:
            amounts[cost.key] += cost.amount

        def sort_key(item):
            # Empty CSV cells become None, which cannot be ordered against str.
            return tuple('' if field is None else field for field in item[0])

        ordered = sorted(amounts.items(), key=sort_key)
        for service_key, group in groupby(ordered, key=lambda item: item[0].key):
            type_amounts = sorted(
                ((summary.type, amount) for summary, amount in group),
                key=lambda pair: pair[1],
                reverse=True,
            )
            yield service_key, type_amounts, sum(amount for _, amount in type_amounts)


def _start_daemon(name, args, log):
    """Schedule the weekday report job and run the event loop until a stop signal."""
    with EnteringLoop(
            name,
            log_dir=args.log_location,
            log_level=args.log_level,
            is_forground=args.foreground,
            pid_path=args.pid_path,
    ) as loop:
        scheduler = AsyncIOScheduler(
            jobstores={'default': MemoryJobStore()},
            executors={'default': AsyncIOExecutor()},
            job_defaults={'coalesce': True, 'max_instances': 2},
            timezone=KSTTZ,
            event_loop=loop,
        )
        # load config
        with open(args.config) as cfg_file:
            config = ujson.load(cfg_file)
        sess = Session(loop=loop, **config)
        del config

        standard_time = args.standard_time
        job = scheduler.add_job(
            sess.get_resources,
            name="charger",
            trigger='cron',
            day_of_week='mon-fri',
            hour=standard_time.hour,
            minute=standard_time.minute,
            second=standard_time.second,
            args=(standard_time,),
            next_run_time=_now_kst() if args.immediately else undefined,
        )
        scheduler.start()
        if not args.immediately:
            log.info("job(%s) may be start in %s", job.name, job.next_run_time)

        def stopme(*_):
            # Several signals can arrive; shutting down a stopped scheduler raises.
            if scheduler.running:
                scheduler.shutdown()
            sess.close()
            log.info("handler closed")
            loop.stop()
            log.info("loop closed")

        for signum in (SIGQUIT, SIGTERM, SIGINT, SIGABRT):
            loop.add_signal_handler(signum, stopme)
        loop.run_forever()
        log.info("bye!")


def _stop_daemon(name, args, log):
    """Send SIGTERM to the pid recorded in ``args.pid_path``."""
    wrap(name, level=args.log_level, stderr=True)
    if not path.exists(args.pid_path):
        log.warning("cannot find pidfile(%s)", args.pid_path)
        return
    try:
        with open(args.pid_path, 'r') as pid_file:
            pid = int(pid_file.readline())
    except ValueError:
        log.warning("malformed pidfile(%s)", args.pid_path)
        return
    try:
        kill(pid, SIGTERM)
    except ProcessLookupError:
        log.warning("pid(%d) is not running; stale pidfile(%s)?", pid, args.pid_path)
        return
    log.warning("pid(%d) sigterm!", pid)


def handle_daemon(name, action, args):
    """Run the CLI *action* ('start' or 'stop') for the daemon called *name*.

    Raises:
        NotImplementedError: for any other action.
    """
    log = logging.getLogger("main_loop")
    if action == 'start':
        _start_daemon(name, args, log)
    elif action == 'stop':
        _stop_daemon(name, args, log)
    else:
        raise NotImplementedError()


def _parse_standard_time(text):
    """argparse type: parse ``HHMMSS`` into a KSTTZ-aware ``datetime.time``."""
    if len(text) != 6 or not text.isdigit():
        raise argparse.ArgumentTypeError('expected HHMMSS, got %r' % text)
    return time(hour=int(text[0:2]), minute=int(text[2:4]), second=int(text[4:6]), tzinfo=KSTTZ)


if __name__ == '__main__':
    install_except_hook()

    # parse command-line flags
    parser = argparse.ArgumentParser(
        prog='bill_man',
        epilog='contact @spi-ca.',
        description='report aws billing .',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--pid_path', help='specify pidPath', default='bill_man.pid')
    # argparse checks `choices` AFTER `type` conversion, so validate the level
    # by name here and convert it to its numeric value after parsing.
    parser.add_argument('--log_level', help='set logging level',
                        type=str.upper,
                        choices=list(logging._nameToLevel),
                        default='INFO')
    parser.set_defaults(func=lambda *_: parser.print_help())

    sp = parser.add_subparsers()

    sp_start = sp.add_parser('start', help='Starts %(prog)s daemon',
                             formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    sp_start.add_argument('--config', help='specify config file path', default='bill_man.json')
    sp_start.add_argument('--foreground', help='Don\'t daemonize!', default=False, action='store_true')
    sp_start.add_argument('--immediately', help='run batch now!', default=False, action='store_true')
    sp_start.add_argument('--log_location', help='specify location of logs!', default='log')
    sp_start.add_argument('--standard_time', help='set standard time/HHMMSS',
                          type=_parse_standard_time, default='120000')
    sp_start.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'start'))

    sp_stop = sp.add_parser('stop', help='Stop %(prog)s daemon',
                            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    sp_stop.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'stop'))

    args = parser.parse_args()
    args.log_level = logging._nameToLevel[args.log_level]
    args.func(args)