"""Daily AWS billing reporter: reads the cost report from S3 and posts a summary to Slack."""
import csv
import logging
import posixpath
from collections import defaultdict
from datetime import date, datetime
from decimal import Decimal
from functools import reduce
from gzip import GzipFile
from io import TextIOWrapper
from itertools import groupby
from os import path

import requests
import ujson
from babel.numbers import format_currency
from botocore.exceptions import ClientError
from botocore.session import get_session
from dateutil.relativedelta import relativedelta
from requests import adapters

from models import ObjEntry, Cost
from util import UTCTZ, KSTTZ, memoized

# Replace the ``json`` attribute of ``requests.models`` with ujson.
# NOTE(review): whether requests actually reads this attribute depends on the
# installed requests version -- confirm.
requests.models.json = ujson


class Session:
    """Bundle an S3 client and an HTTP session used to build and send the billing report.

    The S3 client downloads the billing manifest and the gzipped cost CSV; the
    ``requests`` session posts the formatted report to a Slack webhook URL.
    Blocking S3/HTTP calls are dispatched through ``loop.run_in_executor``.
    """

    def __init__(
            self,
            loop,
            aws_access_key_id,
            aws_secret_access_key,
            aws_region_name,
            s3_bucket_name,
            s3_bucket_key,
            s3_manifest_name,
            slack_webhook_url,
    ):
        """Store configuration and create the S3 client and HTTP session.

        :param loop: event loop whose ``run_in_executor`` runs blocking calls.
        :param aws_access_key_id: AWS credential passed to the S3 client.
        :param aws_secret_access_key: AWS credential passed to the S3 client.
        :param aws_region_name: region for the S3 client and the billing URL.
        :param s3_bucket_name: bucket holding the billing report.
        :param s3_bucket_key: key prefix under which report periods are stored.
        :param s3_manifest_name: file name of the manifest inside a period folder.
        :param slack_webhook_url: URL the report is POSTed to.
        """
        self.log = logging.getLogger('bill_man.session')
        self.loop = loop
        self.webhook_url = slack_webhook_url
        self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name)
        self.region_name = aws_region_name
        self.bucket_name = s3_bucket_name
        self.bucket_path = s3_bucket_key
        self.manifest_name = s3_manifest_name
        self.handler = self.request_hander()

    def request_hander(self):
        """Create the ``requests.Session`` used for outgoing HTTP calls."""
        # Connection management (open question from the author: set max_retries?).
        adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
        # requests.Session takes care of cookies on its own.
        handler = requests.Session()
        handler.mount("http://", adapter)
        handler.mount("https://", adapter)
        handler.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36'
        })
        return handler

    def close(self):
        """Close the HTTP session; safe to call more than once."""
        if self.handler is not None:
            self.handler.close()
            self.handler = None

    def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name):
        """Return a botocore S3 client built from the given credentials and region."""
        return get_session().create_client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region_name,
        )

    async def send_msg(self, messages):
        """POST ``messages`` as JSON to the Slack webhook.

        :param messages: JSON-serialisable payload.
        :raises ValueError: when the webhook answers with a status other than 200.
        """
        self.log.info("sending a message")
        # requests is blocking, so the POST runs in the loop's default executor.
        response = await self.loop.run_in_executor(
            None,
            lambda: self.handler.post(
                self.webhook_url,
                headers={'Content-Type': 'application/json'},
                json=messages,
            ),
        )
        if response.status_code != 200:
            raise ValueError(
                'Request to slack returned an error %s, the response is:\n%s'
                % (response.status_code, response.text)
            )
        self.log.info("message sent")

    @staticmethod
    def _format_attachments(summaries):
        """Yield one Slack attachment per service, then one total per currency.

        ``summaries`` is an iterable of ``(service_type, type_costs, amount)``
        triples as produced by ``summery_costs``; services are emitted in
        descending order of ``amount``.
        """
        totals = defaultdict(Decimal)
        ordered = sorted(summaries, key=lambda summary: summary[2], reverse=True)
        for service_type, type_costs, service_amount in ordered:
            currency = service_type.currency
            message = '\n'.join(
                '%s: %s' % (
                    cost_type,
                    format_currency(cost, currency, decimal_quantization=False),
                )
                for cost_type, cost in type_costs
            )
            service_name = '%s(%s) : %s ' % (
                service_type.service,
                currency,
                format_currency(service_amount, currency, decimal_quantization=False),
            )
            yield {
                'fallback': '%s\n%s' % (service_name, message),
                "color": "#ffffff",
                'title': service_name,
                'text': message
            }
            totals[currency] += service_amount

        # One grand-total attachment per currency.
        for currency, total in totals.items():
            total_text = format_currency(total, currency, decimal_quantization=False)
            message = 'TOT(%s): %s' % (currency, total_text)
            yield {
                'fallback': message,
                "color": "#ffffff",
                'title': message,
            }

    async def get_resources(self, standard_time):
        """Build the month-to-date cost report and send it to Slack.

        :param standard_time: time of day used as the report cut-off. It is
            combined with a date and compared against a timezone-aware "now",
            so it presumably carries a tzinfo -- TODO confirm against callers.
        """
        billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name)

        now = datetime.now(UTCTZ).astimezone(KSTTZ)
        today = now.date()
        # If today's cut-off has not passed yet, report up to yesterday's.
        if datetime.combine(today, standard_time) >= now:
            today -= relativedelta(days=1)
        report_time = datetime.combine(today, standard_time)
        start_month = today.replace(day=1)

        # NOTE(review): both fetchers return ``(value, ObjEntry)`` but the result
        # is used directly as the value here; presumably ``memoized`` unwraps the
        # pair -- confirm in util.memoized.
        report_file_path = await self.get_current_manifest(standard_date=today)
        all_costs = await self.get_current_cost_csv(report_file_path)
        costs = (cost for cost in all_costs if cost.end < report_time)

        # Attachments below use these Slack attachment fields: fallback, color,
        # title, title_link, text and footer.
        slack_data = {
            'text': "Daily AWS Billing Report :macos: from %s to %s" % (
                start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')),
            "attachments": [
                *self._format_attachments(self.summery_costs(costs)),
                {
                    "fallback": "Check out more at %s" % billing_url,
                    "color": "#eeeeee",
                    "title": "Check out more detils from AWS billing site",
                    "title_link": billing_url,
                    "footer": "Linus Billing Bot :confused_parrot:",
                }
            ]
        }
        await self.send_msg(slack_data)

    async def _fetch_object(self, key, modified_entry=None):
        """Run ``get_object`` for ``key`` in the executor.

        When ``modified_entry`` is given, ``modified_entry.params(bucket, key)``
        supplies extra keyword arguments for the request.

        :returns: the ``get_object`` response, or ``None`` when the call fails
            with a ``ClientError`` whose error code is ``'304'``.
        :raises ClientError: for any other S3 client error.
        """
        extra = (
            dict(modified_entry.params(self.bucket_name, key))
            if modified_entry is not None else {}
        )
        try:
            return await self.loop.run_in_executor(
                None,
                lambda: self.s3.get_object(Bucket=self.bucket_name, Key=key, **extra),
            )
        except ClientError as e:
            if e.response['Error']['Code'] == '304':
                return None
            raise

    @memoized
    async def get_current_manifest(
            self,
            standard_date=None,
            modified_entry=None
    ):
        """Fetch the manifest for the billing month containing ``standard_date``.

        :param standard_date: any date inside the wanted month; defaults to the
            current date, resolved at call time.
        :param modified_entry: optional ``ObjEntry`` of a previous fetch.
        :returns: ``(first report key, ObjEntry)``, or ``(None, modified_entry)``
            when S3 reports the object as unchanged (error code ``'304'``).
        """
        if standard_date is None:
            # Resolved per call: a default of ``date.today()`` in the signature
            # would be frozen at import time.
            standard_date = date.today()

        period_start = standard_date.replace(day=1)
        period_end = period_start + relativedelta(months=1)
        duration = '%s-%s' % (period_start.strftime('%Y%m%d'), period_end.strftime('%Y%m%d'))
        # S3 keys always use '/', regardless of the host OS.
        request_path = posixpath.join(self.bucket_path, duration, self.manifest_name)

        response = await self._fetch_object(request_path, modified_entry)
        if response is None:
            return None, modified_entry

        body = response['Body']
        try:
            obj = ujson.load(body)
        finally:
            body.close()
        return (
            obj['reportKeys'][0],
            ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified'])
        )

    @memoized
    async def get_current_cost_csv(self, report_path, modified_entry=None):
        """Download and parse the gzipped cost CSV stored at ``report_path``.

        Each CSV row is turned into ``Cost(**{header: value})``; rows whose
        ``amount`` equals 0 are dropped. A report without a header row yields
        an empty list.

        :param report_path: S3 key of the gzipped CSV report.
        :param modified_entry: optional ``ObjEntry`` of a previous fetch.
        :returns: ``(list of Cost, ObjEntry)``, or ``(None, modified_entry)``
            when S3 reports the object as unchanged (error code ``'304'``).
        """
        response = await self._fetch_object(report_path, modified_entry)
        if response is None:
            return None, modified_entry

        body = response['Body']
        try:
            # newline='' lets the csv module handle line endings itself.
            with TextIOWrapper(
                    GzipFile(mode='rb', fileobj=body), encoding='utf-8', newline=''
            ) as unzipped:
                reader = csv.reader(unzipped)
                # A bare next() would leak StopIteration out of this coroutine
                # (turned into RuntimeError) on an empty report.
                header = next(reader, None)
                costs = []
                if header is not None:
                    costs = [
                        cost
                        for cost in (Cost(**dict(zip(header, row))) for row in reader)
                        if cost.amount != 0
                    ]
        finally:
            # GzipFile does not close a file object that was passed in.
            body.close()

        return (
            costs,
            ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified'])
        )

    def summery_costs(self, costs):
        """Aggregate costs per ``cost.key`` and group them by ``cost.key.key``.

        :param costs: iterable of objects exposing ``key`` and ``amount``;
            each ``key`` must be sortable and expose ``key`` and ``type``.
        :returns: generator of ``(group key, [(type, amount), ...], total)``
            where the inner list is sorted by amount, largest first.
        """
        amounts = defaultdict(Decimal)
        for cost in costs:
            amounts[cost.key] += cost.amount

        # groupby needs its input sorted so equal group keys are adjacent.
        ordered = sorted(amounts.items(), key=lambda item: item[0])
        for group_key, entries in groupby(ordered, key=lambda item: item[0].key):
            breakdown = sorted(
                ((cost_key.type, amount) for cost_key, amount in entries),
                key=lambda item: item[1],
                reverse=True,
            )
            yield group_key, breakdown, sum(amount for _, amount in breakdown)