# bill_man/handler.py

import csv
import ujson
from collections import defaultdict
from datetime import date, datetime
from decimal import Decimal
from functools import reduce
from gzip import GzipFile
from io import TextIOWrapper
from itertools import groupby
from os import path
import requests
from dateutil.relativedelta import relativedelta
from models import ObjEntry, Cost
from util import UTCTZ, KSTTZ, memoized
requests.models.json = ujson
from babel.numbers import format_currency
from botocore.exceptions import ClientError
from botocore.session import get_session
from requests import adapters
import logging
class Session:
    """Fetch the AWS cost report from S3 and post a daily summary to Slack.

    The instance owns a botocore S3 client and a pooled ``requests`` session.
    Blocking calls on either are pushed to the event loop's default executor
    so the public coroutines do not block the loop.

    NOTE(review): ``memoized`` (from the project ``util`` module) is not
    visible here. ``get_resources`` uses the awaited results of the decorated
    coroutines as a plain report key / cost list even though the coroutines
    return ``(value, ObjEntry)`` tuples, so the decorator presumably unwraps
    the tuple and feeds ``modified_entry`` back on later calls -- confirm.
    """

    def __init__(
        self,
        loop,
        aws_access_key_id,
        aws_secret_access_key,
        aws_region_name,
        s3_bucket_name,
        s3_bucket_key,
        s3_manifest_name,
        slack_webhook_url,
    ):
        """Store the configuration and create the S3 client and HTTP session.

        :param loop: event loop whose default executor runs blocking calls
        :param aws_access_key_id: credential passed to the S3 client
        :param aws_secret_access_key: credential passed to the S3 client
        :param aws_region_name: region for the S3 client and the billing URL
        :param s3_bucket_name: bucket holding the cost report
        :param s3_bucket_key: key prefix under which the report is stored
        :param s3_manifest_name: file name of the report manifest
        :param slack_webhook_url: Slack incoming-webhook URL to post to
        """
        self.log = logging.getLogger('bill_man.session')
        self.loop = loop
        self.webhook_url = slack_webhook_url
        self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name)
        self.region_name = aws_region_name
        self.bucket_name = s3_bucket_name
        self.bucket_path = s3_bucket_key
        self.manifest_name = s3_manifest_name
        self.handler = self.request_hander()

    def request_hander(self):
        """Build the pooled ``requests`` session used for Slack calls."""
        # Connection management -- max_retries?
        adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
        # The requests session manages cookies on its own.
        handler = requests.Session()
        # handler.stream = True
        handler.mount("http://", adapter)
        handler.mount("https://", adapter)
        handler.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36'
        })
        return handler

    def close(self):
        """Close the HTTP session; safe to call more than once."""
        if self.handler is not None:
            self.handler.close()
            self.handler = None

    def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name):
        """Create a botocore S3 client for the given credentials and region."""
        return get_session().create_client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region_name,
        )

    async def send_msg(self, messages):
        """POST ``messages`` as JSON to the Slack webhook.

        :param messages: JSON-serialisable Slack payload
        :raises ValueError: when Slack answers with a status other than 200
        """
        self.log.info("sending a message")
        # requests is blocking, so the POST runs in the default executor.
        response = await self.loop.run_in_executor(
            None,
            lambda: self.handler.post(
                self.webhook_url,
                headers={'Content-Type': 'application/json'},
                json=messages,
            ),
        )
        if response.status_code != 200:
            raise ValueError(
                'Request to slack returned an error %s, the response is:\n%s'
                % (response.status_code, response.text)
            )
        self.log.info("message sent")

    async def get_resources(self, standard_time):
        """Build the month-to-date billing report and send it to Slack.

        The report covers costs whose ``end`` is earlier than the most recent
        occurrence of ``standard_time`` (in the ``KSTTZ`` zone).

        :param standard_time: ``datetime.time`` cut-off for the daily report.
            NOTE(review): it is combined with a date and compared against a
            timezone-aware "now", so it has to carry a tzinfo -- confirm the
            caller passes an aware time.
        """
        billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name)
        # Aware "now" taken directly in the target zone (utcnow() is deprecated).
        now = datetime.now(KSTTZ)
        today = now.date()
        # Today's cut-off has not passed yet: report up to yesterday's cut-off.
        if datetime.combine(today, standard_time) >= now:
            today -= relativedelta(days=1)
        report_time = datetime.combine(today, standard_time)
        start_month = today.replace(day=1)

        report_file_path = await self.get_current_manifest(standard_date=today)
        all_costs = await self.get_current_cost_csv(report_file_path)
        costs = (cost for cost in all_costs if cost.end < report_time)

        # Every attachment below is a Slack attachment dict; the fields used
        # are fallback, color, title, text, title_link and footer.
        slack_data = {
            'text': "Daily AWS Billing Report :macos: from %s to %s" % (
                start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')),
            "attachments": [
                *self._service_attachments(self.summery_costs(costs)),
                {
                    "fallback": "Check out more at %s" % billing_url,
                    "color": "#eeeeee",
                    "title": "Check out more detils from AWS billing site",
                    "title_link": billing_url,
                    "footer": "Linus Billing Bot :confused_parrot:",
                }
            ]
        }
        await self.send_msg(slack_data)

    @staticmethod
    def _service_attachments(summeries):
        """Yield one Slack attachment per service, then one total per currency.

        :param summeries: iterable of ``(service_type, type_costs, amount)``
            tuples as produced by ``summery_costs``
        """
        total_amount = defaultdict(Decimal)
        # Most expensive service first.
        ranked = sorted(summeries, key=lambda service: service[2], reverse=True)
        for service_type, type_costs, service_amount in ranked:
            message = '\n'.join(
                '%s: %s' % (
                    usage_type,
                    format_currency(cost, service_type.currency, decimal_quantization=False),
                )
                for usage_type, cost in type_costs
            )
            service_name = '%s(%s) : %s ' % (
                service_type.service,
                service_type.currency,
                format_currency(service_amount, service_type.currency, decimal_quantization=False)
            )
            yield {
                'fallback': '%s\n%s' % (service_name, message),
                "color": "#ffffff",
                'title': service_name,
                'text': message
            }
            total_amount[service_type.currency] += service_amount
        # totals
        for currency, total in total_amount.items():
            total_amount_text = format_currency(total, currency, decimal_quantization=False)
            message = 'TOT(%s): %s' % (currency, total_amount_text)
            yield {
                'fallback': message,
                "color": "#ffffff",
                'title': message,
            }

    async def _fetch_object(self, key, modified_entry=None):
        """Download ``key`` from the bucket, or return ``None`` on a 304.

        :param key: S3 object key
        :param modified_entry: optional entry whose ``params(bucket, key)``
            supplies extra ``get_object`` keyword arguments
        :raises ClientError: for any S3 error other than code ``'304'``
        """
        extra = {}
        if modified_entry is not None:
            extra = dict(modified_entry.params(self.bucket_name, key))
        try:
            # botocore is blocking, so the call runs in the default executor.
            return await self.loop.run_in_executor(
                None,
                lambda: self.s3.get_object(Bucket=self.bucket_name, Key=key, **extra),
            )
        except ClientError as e:
            if e.response['Error']['Code'] == '304':
                return None
            raise

    @memoized
    async def get_current_manifest(
        self,
        standard_date=None,
        modified_entry=None
    ):
        """Read the month's report manifest and return its first report key.

        :param standard_date: any date inside the billing month; defaults to
            today, resolved at call time rather than at import time
        :param modified_entry: entry from an earlier fetch, if any
        :returns: ``(report_key, ObjEntry)``, or ``(None, modified_entry)``
            when S3 reports the object as not modified
        :raises ClientError: for any S3 error other than code ``'304'``
        """
        # S3 keys always use '/', whatever the host OS separator is.
        import posixpath

        if standard_date is None:
            standard_date = date.today()
        month_start = standard_date.replace(day=1)
        next_month_start = month_start + relativedelta(months=1)
        duration = '%s-%s' % (month_start.strftime('%Y%m%d'), next_month_start.strftime('%Y%m%d'))
        request_path = posixpath.join(self.bucket_path, duration, self.manifest_name)

        response = await self._fetch_object(request_path, modified_entry)
        if response is None:
            return None, modified_entry
        obj = ujson.load(response['Body'])
        return (
            obj['reportKeys'][0],
            ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified'])
        )

    @memoized
    async def get_current_cost_csv(self, report_path, modified_entry=None):
        """Download the gzipped cost CSV and parse its non-zero rows.

        :param report_path: S3 key of the gzipped CSV report
        :param modified_entry: entry from an earlier fetch, if any
        :returns: ``(costs, ObjEntry)`` where ``costs`` is a list of ``Cost``
            objects with a non-zero ``amount``, or ``(None, modified_entry)``
            when S3 reports the object as not modified
        :raises ClientError: for any S3 error other than code ``'304'``
        """
        response = await self._fetch_object(report_path, modified_entry)
        if response is None:
            return None, modified_entry

        gzip_stream = GzipFile(mode='rb', fileobj=response['Body'])
        # newline='' lets the csv module handle line endings itself.
        with TextIOWrapper(gzip_stream, encoding='utf-8', newline='') as unzipped:
            cost_reader = csv.reader(unzipped)
            # A bare next() on an empty report would raise StopIteration,
            # which a coroutine turns into RuntimeError.
            keys = next(cost_reader, None)
            costs = []
            if keys is not None:
                for usage in cost_reader:
                    cost = Cost(**dict(zip(keys, usage)))
                    if cost.amount != 0:
                        costs.append(cost)
        return (
            costs,
            ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified'])
        )

    def summery_costs(self, costs):
        """Sum costs per key and yield one summary per service group.

        :param costs: iterable of objects exposing ``key`` and ``amount``;
            each ``key`` must be sortable and expose ``key`` and ``type``
        :returns: generator of ``(group_key, [(type, amount), ...], total)``
            with the per-type list sorted by amount, largest first
        """
        totals = defaultdict(Decimal)
        for cost in costs:
            totals[cost.key] += cost.amount

        # groupby only merges adjacent items, so sort first.
        # NOTE(review): this relies on the ordering of the cost keys keeping
        # equal ``.key`` values adjacent -- confirm in the model definition.
        ordered = sorted(totals.items(), key=lambda item: item[0])
        for group_key, members in groupby(ordered, key=lambda item: item[0].key):
            by_type = sorted(
                ((cost_key.type, amount) for cost_key, amount in members),
                key=lambda item: item[1],
                reverse=True,
            )
            yield (
                group_key,
                by_type,
                sum(amount for _, amount in by_type)
            )