275 lines
10 KiB
Python
275 lines
10 KiB
Python
import csv
|
|
import ujson
|
|
from collections import defaultdict
|
|
from datetime import date, datetime
|
|
from decimal import Decimal
|
|
from functools import reduce
|
|
from gzip import GzipFile
|
|
from io import TextIOWrapper
|
|
from itertools import groupby
|
|
from os import path
|
|
|
|
import requests
|
|
from dateutil.relativedelta import relativedelta
|
|
|
|
from models import ObjEntry, Cost
|
|
from util import UTCTZ, KSTTZ, memoized
|
|
|
|
# Monkey-patch: rebind the `json` name inside `requests.models` to ujson.
# NOTE(review): presumably meant to make requests (de)serialize JSON through
# ujson; whether `requests.models` actually consults this name depends on the
# installed requests version -- confirm before relying on it.
requests.models.json = ujson
|
|
|
|
from babel.numbers import format_currency
|
|
from botocore.exceptions import ClientError
|
|
from botocore.session import get_session
|
|
from requests import adapters
|
|
|
|
import logging
|
|
|
|
|
|
class Session:
    """Read the AWS billing report from S3 and post a cost summary to Slack.

    The instance owns two clients: a botocore S3 client (``self.s3``) and a
    ``requests.Session`` (``self.handler``) used for the Slack webhook.
    Blocking client calls are pushed onto the event loop's default executor.
    """

    def __init__(
        self,
        loop,
        aws_access_key_id,
        aws_secret_access_key,
        aws_region_name,
        s3_bucket_name,
        s3_bucket_key,
        s3_manifest_name,
        slack_webhook_url,
    ):
        """Store the configuration and create the S3 and HTTP clients.

        Args:
            loop: Event loop whose ``run_in_executor`` runs the blocking calls.
            aws_access_key_id: Credential passed to the S3 client.
            aws_secret_access_key: Credential passed to the S3 client.
            aws_region_name: Region for the S3 client and the billing URL.
            s3_bucket_name: Bucket that holds the billing report.
            s3_bucket_key: Key prefix under which the report is stored.
            s3_manifest_name: File name of the report manifest.
            slack_webhook_url: Webhook URL the report is posted to.
        """
        self.log = logging.getLogger('bill_man.session')
        self.loop = loop
        self.webhook_url = slack_webhook_url
        self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name)
        self.region_name = aws_region_name
        self.bucket_name = s3_bucket_name
        self.bucket_path = s3_bucket_key
        self.manifest_name = s3_manifest_name

        self.handler = self.request_hander()

    def request_hander(self):
        """Build the pooled ``requests.Session`` used for outgoing HTTP calls."""
        # Connection management; max_retries is left at its default (open question).
        adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
        # The requests session manages cookies on its own.
        handler = requests.Session()
        # handler.stream = True
        handler.mount("http://", adapter)
        handler.mount("https://", adapter)

        handler.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36'
        })
        return handler

    def close(self):
        """Close the HTTP session; calling it again is a no-op."""
        if self.handler is not None:
            self.handler.close()
            self.handler = None

    def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name):
        """Create a botocore S3 client for the given credentials and region."""
        return get_session().create_client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region_name,
        )

    async def send_msg(self, messages):
        """POST ``messages`` as JSON to the Slack webhook.

        Raises:
            ValueError: If the webhook answers with a status other than 200.
        """
        self.log.info("sending a message")
        url = self.webhook_url

        def post():
            # Runs in an executor thread because requests is blocking.
            return self.handler.post(
                url,
                headers={'Content-Type': 'application/json'},
                json=messages,
            )

        response = await self.loop.run_in_executor(None, post)

        if response.status_code != 200:
            raise ValueError(
                'Request to slack returned an error %s, the response is:\n%s'
                % (response.status_code, response.text)
            )
        self.log.info("message sent")

    async def get_resources(self, standard_time):
        """Build the month-to-date cost report and send it to Slack.

        Args:
            standard_time: Time of day that closes a reporting day. It is
                combined with a date and compared against a timezone-aware
                datetime, so it presumably has to carry tzinfo -- TODO confirm
                with the caller.
        """
        billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name)

        # datetime.now(tz) replaces the deprecated datetime.utcnow().
        now = datetime.now(UTCTZ).astimezone(KSTTZ)
        today = now.date()

        # Before today's cut-off time the report still belongs to yesterday.
        if datetime.combine(today, standard_time) >= now:
            today -= relativedelta(days=1)
        report_time = datetime.combine(today, standard_time)
        start_month = today.replace(day=1)

        # NOTE(review): the undecorated fetchers return (value, ObjEntry)
        # tuples; this code relies on util.memoized handing back only the
        # value -- verify against the decorator.
        report_file_path = await self.get_current_manifest(standard_date=today)
        fetched_costs = await self.get_current_cost_csv(report_file_path)
        costs = (cost for cost in fetched_costs if cost.end < report_time)

        # Each dict below is one Slack message attachment; the fields used are
        # 'fallback', 'color', 'title', 'text', 'title_link' and 'footer'.
        def service_formetter(summeries):
            """Yield one attachment per service, then one total per currency."""
            total_amount = defaultdict(Decimal)
            ranked = sorted(summeries, key=lambda service: service[2], reverse=True)
            for service_type, type_costs, service_amount in ranked:
                message = '\n'.join(
                    '%s: %s' % (
                        usage_type,
                        format_currency(cost, service_type.currency, decimal_quantization=False),
                    )
                    for usage_type, cost in type_costs
                )
                service_name = '%s(%s) : %s ' % (
                    service_type.service,
                    service_type.currency,
                    format_currency(service_amount, service_type.currency, decimal_quantization=False)
                )
                total_amount[service_type.currency] += service_amount
                yield {
                    'fallback': '%s\n%s' % (service_name, message),
                    "color": "#ffffff",
                    'title': service_name,
                    'text': message
                }
            # totals
            for currency, total in total_amount.items():
                total_amount_text = format_currency(total, currency, decimal_quantization=False)
                message = 'TOT(%s): %s' % (currency, total_amount_text)
                yield {
                    'fallback': message,
                    "color": "#ffffff",
                    'title': message,
                }

        slack_data = {
            'text': "Daily AWS Billing Report :macos: from %s to %s" % (
                start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')),
            "attachments": [
                *service_formetter(self.summery_costs(costs)),
                {
                    "fallback": "Check out more at %s" % billing_url,
                    "color": "#eeeeee",
                    "title": "Check out more detils from AWS billing site",
                    "title_link": billing_url,
                    "footer": "Linus Billing Bot :confused_parrot:",
                }
            ]
        }
        await self.send_msg(slack_data)

    @memoized
    async def get_current_manifest(
            self,
            standard_date=None,
            modified_entry=None
    ):
        """Fetch the manifest of the billing period that contains ``standard_date``.

        Args:
            standard_date: Any date inside the wanted month. ``None`` means
                today, resolved at call time rather than once at import time.
            modified_entry: Entry from an earlier fetch; its ``params(...)``
                are forwarded to ``get_object`` (presumably conditional-request
                headers -- confirm in models.ObjEntry).

        Returns:
            ``(first report key, ObjEntry)`` on success, or
            ``(None, modified_entry)`` when S3 reports error code '304'.

        Raises:
            ClientError: For every S3 error other than '304'.
        """
        # S3 keys always use '/', so build them with posixpath, not os.path.
        import posixpath

        if standard_date is None:
            standard_date = date.today()
        month_start = standard_date.replace(day=1)
        duration = '%s-%s' % (
            month_start.strftime('%Y%m%d'),
            (month_start + relativedelta(months=1)).strftime('%Y%m%d'),
        )
        bucket = self.bucket_name
        request_path = posixpath.join(self.bucket_path, duration, self.manifest_name)
        extra = dict(modified_entry.params(bucket, request_path)) if modified_entry is not None else {}

        try:
            response = await self.loop.run_in_executor(
                None,
                lambda: self.s3.get_object(Bucket=bucket, Key=request_path, **extra),
            )
        except ClientError as e:
            if e.response['Error']['Code'] == '304':
                return None, modified_entry
            raise

        body = response['Body']
        try:
            obj = ujson.load(body)
        finally:
            # Close the stream so the underlying connection is released.
            body.close()

        return (
            obj['reportKeys'][0],
            ObjEntry(bucket, request_path, response['ETag'], response['LastModified'])
        )

    @memoized
    async def get_current_cost_csv(self, report_path, modified_entry=None):
        """Download the gzipped CSV report and parse it into ``Cost`` records.

        Args:
            report_path: S3 key of the gzipped CSV report.
            modified_entry: Entry from an earlier fetch; its ``params(...)``
                are forwarded to ``get_object``.

        Returns:
            ``(list of Cost with a non-zero amount, ObjEntry)`` on success, or
            ``(None, modified_entry)`` when S3 reports error code '304'.
            An empty report yields an empty list.

        Raises:
            ClientError: For every S3 error other than '304'.
        """
        bucket = self.bucket_name
        extra = dict(modified_entry.params(bucket, report_path)) if modified_entry is not None else {}

        try:
            response = await self.loop.run_in_executor(
                None,
                lambda: self.s3.get_object(Bucket=bucket, Key=report_path, **extra),
            )
        except ClientError as e:
            if e.response['Error']['Code'] == '304':
                return None, modified_entry
            raise

        body = response['Body']
        try:
            # newline='' lets the csv module handle line endings itself, as its
            # documentation requires for quoted fields with embedded newlines.
            with TextIOWrapper(
                GzipFile(mode='rb', fileobj=body), encoding='utf-8', newline=''
            ) as unzipped:
                cost_reader = csv.reader(unzipped)
                # A default avoids StopIteration escaping the coroutine (which
                # would surface as RuntimeError) when the report is empty.
                keys = next(cost_reader, None)
                parsed = (Cost(**dict(zip(keys, usage))) for usage in cost_reader)
                costs = [cost for cost in parsed if cost.amount != 0]
        finally:
            # GzipFile does not close a file object it did not open itself.
            body.close()

        return (
            costs,
            ObjEntry(bucket, report_path, response['ETag'], response['LastModified'])
        )

    def summery_costs(self, costs):
        """Sum the costs per cost key and yield one summary per group.

        Args:
            costs: Iterable of objects exposing ``key`` and ``amount``.

        Yields:
            ``(group key, [(type, amount), ...] sorted by amount descending,
            total amount of the group)``.
        """
        amounts = defaultdict(Decimal)
        for cost in costs:
            amounts[cost.key] += cost.amount

        # groupby only merges adjacent items; this assumes that sorting the
        # cost keys places equal ``.key`` values next to each other --
        # verify against the key type's ordering.
        ordered = sorted(amounts.items(), key=lambda item: item[0])
        for group_key, members in groupby(ordered, key=lambda item: item[0].key):
            type_costs = sorted(
                ((cost_key.type, amount) for cost_key, amount in members),
                key=lambda pair: pair[1],
                reverse=True,
            )
            yield (
                group_key,
                type_costs,
                sum(amount for _, amount in type_costs)
            )