코드 정리
This commit is contained in:
parent
a15616b4a5
commit
4f99ba5f7d
413
bill_man.py
413
bill_man.py
|
@ -1,400 +1,19 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import csv
|
|
||||||
import functools
|
|
||||||
import ujson
|
|
||||||
from collections import namedtuple, defaultdict
|
|
||||||
from datetime import date, datetime, time
|
|
||||||
from decimal import Decimal
|
|
||||||
from functools import reduce
|
|
||||||
from gzip import GzipFile
|
|
||||||
from io import TextIOWrapper
|
|
||||||
from itertools import groupby
|
|
||||||
from os import path, kill
|
|
||||||
|
|
||||||
import pytz
|
|
||||||
import requests
|
|
||||||
from dateutil.relativedelta import relativedelta
|
|
||||||
|
|
||||||
from util import EnteringLoop
|
|
||||||
from util.log import wrap
|
|
||||||
|
|
||||||
requests.models.json = ujson
|
|
||||||
|
|
||||||
from babel.numbers import format_currency
|
|
||||||
from botocore.exceptions import ClientError
|
|
||||||
from botocore.session import get_session
|
|
||||||
from requests import adapters
|
|
||||||
from apscheduler.util import undefined
|
|
||||||
from apscheduler.executors.asyncio import AsyncIOExecutor
|
|
||||||
from apscheduler.jobstores.memory import MemoryJobStore
|
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import logging
|
import logging
|
||||||
from util import install_except_hook
|
from datetime import time, datetime
|
||||||
from signal import SIGQUIT, SIGINT, SIGABRT, SIGTERM
|
|
||||||
|
|
||||||
UTCTZ = pytz.utc
|
from util import install_except_hook, KSTTZ, UTCTZ
|
||||||
# localize 안하면 timezone이 canonical하게 표현되지 않는다 .
|
|
||||||
KSTTZ = pytz.timezone('Asia/Seoul').localize(datetime.now()).tzinfo
|
|
||||||
|
|
||||||
|
|
||||||
class ObjEntry(namedtuple('ModifiedEntry',
|
|
||||||
('bucket',
|
|
||||||
'path',
|
|
||||||
'etags',
|
|
||||||
'modified',
|
|
||||||
))
|
|
||||||
):
|
|
||||||
|
|
||||||
def __new__(cls, *args, **kwargs):
|
|
||||||
item = super(ObjEntry, cls).__new__(cls, *args, **kwargs)
|
|
||||||
return item
|
|
||||||
|
|
||||||
def params(self, bucket, path):
|
|
||||||
if (self.bucket, self.path) != (bucket, path):
|
|
||||||
return ()
|
|
||||||
if self.etags is not None:
|
|
||||||
yield ('IfNoneMatch', self.etags)
|
|
||||||
if self.modified is not None:
|
|
||||||
yield ('IfModifiedSince', self.modified)
|
|
||||||
|
|
||||||
|
|
||||||
class Cost(namedtuple('Cost',
|
|
||||||
('amount',
|
|
||||||
'currency',
|
|
||||||
'service',
|
|
||||||
'type',
|
|
||||||
'start',
|
|
||||||
'end'
|
|
||||||
))
|
|
||||||
):
|
|
||||||
__slots__ = ()
|
|
||||||
|
|
||||||
def __new__(cls, *args, **kwargs):
|
|
||||||
argdict = defaultdict(lambda: None)
|
|
||||||
argdict.update(**kwargs)
|
|
||||||
parse_date = lambda item: datetime.strptime(item, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTCTZ).astimezone(KSTTZ)
|
|
||||||
args = (
|
|
||||||
(Decimal(argdict['lineItem/BlendedCost']) if argdict['lineItem/BlendedCost'] else None),
|
|
||||||
(argdict['lineItem/CurrencyCode'] if argdict['lineItem/CurrencyCode'] else None),
|
|
||||||
(argdict['product/ProductName'] if argdict['product/ProductName'] else None),
|
|
||||||
(argdict['lineItem/UsageType'] if argdict['lineItem/UsageType'] else None),
|
|
||||||
(parse_date(argdict['lineItem/UsageStartDate']) if argdict['lineItem/UsageStartDate'] else None),
|
|
||||||
(parse_date(argdict['lineItem/UsageEndDate']) if argdict['lineItem/UsageEndDate'] else None)
|
|
||||||
)
|
|
||||||
item = super(Cost, cls).__new__(cls, *args)
|
|
||||||
return item
|
|
||||||
|
|
||||||
@property
|
|
||||||
def key(self):
|
|
||||||
return CostSummery(self.currency, self.service, self.type)
|
|
||||||
|
|
||||||
|
|
||||||
class CostSubItem(namedtuple('CostSubItem',
|
|
||||||
('currency',
|
|
||||||
'service',
|
|
||||||
))
|
|
||||||
):
|
|
||||||
__slots__ = ()
|
|
||||||
|
|
||||||
def __new__(cls, *args):
|
|
||||||
item = super(CostSubItem, cls).__new__(cls, *args)
|
|
||||||
return item
|
|
||||||
|
|
||||||
|
|
||||||
class CostSummery(namedtuple('CostSummery',
|
|
||||||
('currency',
|
|
||||||
'service',
|
|
||||||
'type'
|
|
||||||
))
|
|
||||||
):
|
|
||||||
__slots__ = ()
|
|
||||||
|
|
||||||
def __new__(cls, *args):
|
|
||||||
item = super(CostSummery, cls).__new__(cls, *args)
|
|
||||||
return item
|
|
||||||
|
|
||||||
@property
|
|
||||||
def key(self):
|
|
||||||
return CostSubItem(self.currency, self.service)
|
|
||||||
|
|
||||||
|
|
||||||
def memoized(func):
|
|
||||||
cache = None
|
|
||||||
entry = None
|
|
||||||
|
|
||||||
async def callee(*args, **kwargs):
|
|
||||||
nonlocal entry, cache, func
|
|
||||||
kwargs['modified_entry'] = entry
|
|
||||||
returned, entry = await func(*args, **kwargs)
|
|
||||||
if returned is not None:
|
|
||||||
cache = returned
|
|
||||||
return returned
|
|
||||||
elif cache is None:
|
|
||||||
raise RuntimeError('no ret value')
|
|
||||||
else:
|
|
||||||
return cache
|
|
||||||
|
|
||||||
# @functools.wraps 대신
|
|
||||||
functools.update_wrapper(callee, func)
|
|
||||||
return callee
|
|
||||||
|
|
||||||
|
|
||||||
class Session:
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
loop,
|
|
||||||
aws_access_key_id,
|
|
||||||
aws_secret_access_key,
|
|
||||||
aws_region_name,
|
|
||||||
s3_bucket_name,
|
|
||||||
s3_bucket_key,
|
|
||||||
s3_manifest_name,
|
|
||||||
slack_webhook_url,
|
|
||||||
):
|
|
||||||
self.log = logging.getLogger('bill_man.session')
|
|
||||||
self.loop = loop
|
|
||||||
self.webhook_url = slack_webhook_url
|
|
||||||
self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name)
|
|
||||||
self.region_name = aws_region_name
|
|
||||||
self.bucket_name = s3_bucket_name
|
|
||||||
self.bucket_path = s3_bucket_key
|
|
||||||
self.manifest_name = s3_manifest_name
|
|
||||||
|
|
||||||
self.handler = self.request_hander()
|
|
||||||
|
|
||||||
def request_hander(self):
|
|
||||||
# 커넥션 관리 max_retries?
|
|
||||||
adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
|
|
||||||
# 이것은 알아서 cookie를 관리해준다.
|
|
||||||
handler = requests.Session()
|
|
||||||
# handler.stream = True
|
|
||||||
handler.mount("http://", adapter)
|
|
||||||
handler.mount("https://", adapter)
|
|
||||||
|
|
||||||
handler.headers.update({
|
|
||||||
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36'
|
|
||||||
})
|
|
||||||
return handler
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
if self.handler is not None:
|
|
||||||
self.handler.close()
|
|
||||||
self.handler = None
|
|
||||||
|
|
||||||
def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name):
|
|
||||||
return get_session().create_client(
|
|
||||||
's3',
|
|
||||||
aws_access_key_id=aws_access_key_id,
|
|
||||||
aws_secret_access_key=aws_secret_access_key,
|
|
||||||
region_name=aws_region_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def send_msg(self, messages):
|
|
||||||
self.log.info("sending a message")
|
|
||||||
req_args = (
|
|
||||||
self.webhook_url,
|
|
||||||
{
|
|
||||||
'headers': {'Content-Type': 'application/json'},
|
|
||||||
'json': messages
|
|
||||||
}
|
|
||||||
)
|
|
||||||
response = await self.loop.run_in_executor(
|
|
||||||
None,
|
|
||||||
lambda url, kwargs: self.handler.post(url, **kwargs),
|
|
||||||
*req_args
|
|
||||||
)
|
|
||||||
|
|
||||||
if response.status_code != 200:
|
|
||||||
raise ValueError(
|
|
||||||
'Request to slack returned an error %s, the response is:\n%s'
|
|
||||||
% (response.status_code, response.text)
|
|
||||||
)
|
|
||||||
self.log.info("message sent")
|
|
||||||
|
|
||||||
async def get_resources(self, standard_time):
|
|
||||||
billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name)
|
|
||||||
|
|
||||||
now=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ)
|
|
||||||
today = date.fromordinal(now.toordinal())
|
|
||||||
|
|
||||||
if datetime.combine(today, standard_time) >= now:
|
|
||||||
today -= relativedelta(days=1)
|
|
||||||
report_time = datetime.combine(today, standard_time)
|
|
||||||
start_month = today.replace(day=1)
|
|
||||||
|
|
||||||
report_file_path = await self.get_current_manifest(standard_date=today)
|
|
||||||
|
|
||||||
costs = filter(
|
|
||||||
lambda cost: cost.end < report_time,
|
|
||||||
await self.get_current_cost_csv(report_file_path)
|
|
||||||
)
|
|
||||||
|
|
||||||
'''
|
|
||||||
{
|
|
||||||
"fallback": "Required plain-text summary of the attachment.",
|
|
||||||
"color": "#36a64f",
|
|
||||||
"pretext": "Optional text that appears above the attachment block",
|
|
||||||
"author_name": "Bobby Tables",
|
|
||||||
"author_link": "http://flickr.com/bobby/",
|
|
||||||
"author_icon": "http://flickr.com/icons/bobby.jpg",
|
|
||||||
"title": "Slack API Documentation",
|
|
||||||
"title_link": "https://api.slack.com/",
|
|
||||||
"text": "Optional text that appears within the attachment",
|
|
||||||
"fields": [
|
|
||||||
{
|
|
||||||
"title": "Priority",
|
|
||||||
"value": "High",
|
|
||||||
"short": False
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"image_url": "http://my-website.com/path/to/image.jpg",
|
|
||||||
"thumb_url": "http://example.com/path/to/thumb.png",
|
|
||||||
"footer": "Slack API",
|
|
||||||
"footer_icon": "https://platform.slack-edge.com/img/default_application_icon.png",
|
|
||||||
"ts": 123456789
|
|
||||||
},
|
|
||||||
'''
|
|
||||||
|
|
||||||
def service_formetter(summeries):
|
|
||||||
total_amount = defaultdict(Decimal)
|
|
||||||
for service_type, type_costs, service_amount in sorted(summeries, key=lambda service: service[2],
|
|
||||||
reverse=True):
|
|
||||||
message = '\n'.join(
|
|
||||||
'%s: %s' % (type, format_currency(cost, service_type.currency, decimal_quantization=False))
|
|
||||||
for type, cost in type_costs
|
|
||||||
)
|
|
||||||
service_name = '%s(%s) : %s ' % (
|
|
||||||
service_type.service,
|
|
||||||
service_type.currency,
|
|
||||||
format_currency(service_amount, service_type.currency, decimal_quantization=False)
|
|
||||||
)
|
|
||||||
yield {
|
|
||||||
'fallback': '%s\n%s' % (service_name, message),
|
|
||||||
"color": "#ffffff",
|
|
||||||
'title': service_name,
|
|
||||||
'text': message
|
|
||||||
}
|
|
||||||
total_amount[service_type.currency] += service_amount
|
|
||||||
# totals
|
|
||||||
for currency, total in total_amount.items():
|
|
||||||
total_amount_text = format_currency(total, currency, decimal_quantization=False)
|
|
||||||
message = 'TOT(%s): %s' % (currency, total_amount_text)
|
|
||||||
yield {
|
|
||||||
'fallback': message,
|
|
||||||
"color": "#ffffff",
|
|
||||||
'title': message,
|
|
||||||
}
|
|
||||||
|
|
||||||
slack_data = {
|
|
||||||
'text': "Daily AWS Billing Report :macos: from %s to %s" % (
|
|
||||||
start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')),
|
|
||||||
"attachments": [
|
|
||||||
*service_formetter(self.summery_costs(costs)),
|
|
||||||
{
|
|
||||||
"fallback": "Check out more at %s" % billing_url,
|
|
||||||
"color": "#eeeeee",
|
|
||||||
"title": "Check out more detils from AWS billing site",
|
|
||||||
"title_link": billing_url,
|
|
||||||
"footer": "Linus Billing Bot :confused_parrot:",
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
await self.send_msg(slack_data)
|
|
||||||
|
|
||||||
@memoized
|
|
||||||
async def get_current_manifest(
|
|
||||||
self,
|
|
||||||
standard_date=date.today(),
|
|
||||||
modified_entry=None
|
|
||||||
):
|
|
||||||
duration = '%s-%s' % (standard_date.replace(day=1).strftime('%Y%m%d'),
|
|
||||||
(standard_date.replace(day=1) + relativedelta(months=1)).strftime('%Y%m%d'))
|
|
||||||
request_path = path.join(self.bucket_path, duration, self.manifest_name)
|
|
||||||
try:
|
|
||||||
req_args = (
|
|
||||||
self.bucket_name,
|
|
||||||
request_path,
|
|
||||||
dict(modified_entry.params(self.bucket_name, request_path)) if modified_entry is not None else {}
|
|
||||||
)
|
|
||||||
response = await self.loop.run_in_executor(
|
|
||||||
None,
|
|
||||||
lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs),
|
|
||||||
*req_args
|
|
||||||
)
|
|
||||||
obj = ujson.load(response['Body'])
|
|
||||||
return (
|
|
||||||
obj['reportKeys'][0],
|
|
||||||
ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified'])
|
|
||||||
)
|
|
||||||
|
|
||||||
except ClientError as e:
|
|
||||||
if e.response['Error']['Code'] == '304':
|
|
||||||
return None, modified_entry
|
|
||||||
raise e
|
|
||||||
|
|
||||||
@memoized
|
|
||||||
async def get_current_cost_csv(self, report_path, modified_entry=None):
|
|
||||||
try:
|
|
||||||
|
|
||||||
req_args = (
|
|
||||||
self.bucket_name,
|
|
||||||
report_path,
|
|
||||||
dict(modified_entry.params(self.bucket_name, report_path)) if modified_entry is not None else {}
|
|
||||||
)
|
|
||||||
response = await self.loop.run_in_executor(
|
|
||||||
None,
|
|
||||||
lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs),
|
|
||||||
*req_args
|
|
||||||
)
|
|
||||||
|
|
||||||
unzipped = TextIOWrapper(GzipFile(mode='rb', fileobj=response['Body']), encoding='utf-8')
|
|
||||||
cost_reader = csv.reader(unzipped)
|
|
||||||
keys = next(cost_reader)
|
|
||||||
return (
|
|
||||||
[
|
|
||||||
item for item in filter(
|
|
||||||
lambda cost: cost.amount != 0,
|
|
||||||
map(
|
|
||||||
lambda usage: Cost(**{k: v for k, v in zip(keys, usage)}),
|
|
||||||
cost_reader
|
|
||||||
)
|
|
||||||
)
|
|
||||||
],
|
|
||||||
ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified'])
|
|
||||||
)
|
|
||||||
|
|
||||||
except ClientError as e:
|
|
||||||
if e.response['Error']['Code'] == '304':
|
|
||||||
return None, modified_entry
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def summery_costs(self, costs):
|
|
||||||
def cost_summation(amounts, cost):
|
|
||||||
amounts[cost.key] += cost.amount
|
|
||||||
return amounts
|
|
||||||
|
|
||||||
amount_sumation = reduce(cost_summation, costs, defaultdict(Decimal))
|
|
||||||
|
|
||||||
def subcost_exteact(type_cost_item):
|
|
||||||
for vk, amt in type_cost_item:
|
|
||||||
yield vk.type, amt
|
|
||||||
|
|
||||||
for k, v in groupby(sorted(amount_sumation.items(), key=lambda item: item[0]), key=lambda item: item[0].key):
|
|
||||||
sv = sorted(subcost_exteact(v), key=lambda item: item[1], reverse=True)
|
|
||||||
yield (
|
|
||||||
k,
|
|
||||||
sv,
|
|
||||||
sum(map(lambda v: v[1], sv))
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_daemon(name, action, args):
|
def handle_daemon(name, action, args):
|
||||||
log = logging.getLogger("main_loop")
|
log = logging.getLogger("main_loop")
|
||||||
|
from signal import SIGTERM
|
||||||
|
from util.log import wrap
|
||||||
|
|
||||||
if action == 'start':
|
if action == 'start':
|
||||||
# load config
|
# load config
|
||||||
|
from util import EnteringLoop
|
||||||
with EnteringLoop(
|
with EnteringLoop(
|
||||||
name,
|
name,
|
||||||
log_dir=args.log_location,
|
log_dir=args.log_location,
|
||||||
|
@ -402,6 +21,14 @@ def handle_daemon(name, action, args):
|
||||||
is_forground=args.foreground,
|
is_forground=args.foreground,
|
||||||
pid_path=args.pid_path
|
pid_path=args.pid_path
|
||||||
) as loop:
|
) as loop:
|
||||||
|
from apscheduler.jobstores.memory import MemoryJobStore
|
||||||
|
from apscheduler.executors.asyncio import AsyncIOExecutor
|
||||||
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
|
from apscheduler.util import undefined
|
||||||
|
from signal import SIGTERM, SIGABRT, SIGINT, SIGQUIT
|
||||||
|
from handler import Session
|
||||||
|
import ujson
|
||||||
|
|
||||||
jobstores = {
|
jobstores = {
|
||||||
'default': MemoryJobStore()
|
'default': MemoryJobStore()
|
||||||
}
|
}
|
||||||
|
@ -431,7 +58,8 @@ def handle_daemon(name, action, args):
|
||||||
minute=args.standard_time.minute,
|
minute=args.standard_time.minute,
|
||||||
second=args.standard_time.second,
|
second=args.standard_time.second,
|
||||||
args=(args.standard_time,),
|
args=(args.standard_time,),
|
||||||
next_run_time=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ) if args.immediately else undefined
|
next_run_time=datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(
|
||||||
|
KSTTZ) if args.immediately else undefined
|
||||||
)
|
)
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
|
@ -452,6 +80,7 @@ def handle_daemon(name, action, args):
|
||||||
loop.run_forever()
|
loop.run_forever()
|
||||||
log.info("bye!")
|
log.info("bye!")
|
||||||
elif action == 'stop':
|
elif action == 'stop':
|
||||||
|
from os import path, kill
|
||||||
wrap(name, level=args.log_level, stderr=True)
|
wrap(name, level=args.log_level, stderr=True)
|
||||||
if not path.exists(args.pid_path):
|
if not path.exists(args.pid_path):
|
||||||
log.warning("cannot find pidfile(%s)", args.pid_path)
|
log.warning("cannot find pidfile(%s)", args.pid_path)
|
||||||
|
@ -465,6 +94,9 @@ def handle_daemon(name, action, args):
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
import argparse
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
install_except_hook()
|
install_except_hook()
|
||||||
|
|
||||||
# 실행 플래그 파싱
|
# 실행 플래그 파싱
|
||||||
|
@ -513,11 +145,10 @@ if __name__ == '__main__':
|
||||||
),
|
),
|
||||||
default='120000')
|
default='120000')
|
||||||
|
|
||||||
sp_start.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'start'))
|
sp_start.set_defaults(func=partial(handle_daemon, parser.prog, 'start'))
|
||||||
sp_stop = sp.add_parser('stop', help='Stop %(prog)s daemon',
|
sp_stop = sp.add_parser('stop', help='Stop %(prog)s daemon',
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||||
sp_stop.set_defaults(func=functools.partial(handle_daemon, parser.prog, 'stop'))
|
sp_stop.set_defaults(func=partial(handle_daemon, parser.prog, 'stop'))
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
args.func(args)
|
args.func(args)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,274 @@
|
||||||
|
import csv
|
||||||
|
import ujson
|
||||||
|
from collections import defaultdict
|
||||||
|
from datetime import date, datetime
|
||||||
|
from decimal import Decimal
|
||||||
|
from functools import reduce
|
||||||
|
from gzip import GzipFile
|
||||||
|
from io import TextIOWrapper
|
||||||
|
from itertools import groupby
|
||||||
|
from os import path
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from dateutil.relativedelta import relativedelta
|
||||||
|
|
||||||
|
from models import ObjEntry, Cost
|
||||||
|
from util import UTCTZ, KSTTZ, memoized
|
||||||
|
|
||||||
|
requests.models.json = ujson
|
||||||
|
|
||||||
|
from babel.numbers import format_currency
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
from botocore.session import get_session
|
||||||
|
from requests import adapters
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
class Session:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
loop,
|
||||||
|
aws_access_key_id,
|
||||||
|
aws_secret_access_key,
|
||||||
|
aws_region_name,
|
||||||
|
s3_bucket_name,
|
||||||
|
s3_bucket_key,
|
||||||
|
s3_manifest_name,
|
||||||
|
slack_webhook_url,
|
||||||
|
):
|
||||||
|
self.log = logging.getLogger('bill_man.session')
|
||||||
|
self.loop = loop
|
||||||
|
self.webhook_url = slack_webhook_url
|
||||||
|
self.s3 = self.s3_client(aws_access_key_id, aws_secret_access_key, aws_region_name)
|
||||||
|
self.region_name = aws_region_name
|
||||||
|
self.bucket_name = s3_bucket_name
|
||||||
|
self.bucket_path = s3_bucket_key
|
||||||
|
self.manifest_name = s3_manifest_name
|
||||||
|
|
||||||
|
self.handler = self.request_hander()
|
||||||
|
|
||||||
|
def request_hander(self):
|
||||||
|
# 커넥션 관리 max_retries?
|
||||||
|
adapter = adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
|
||||||
|
# 이것은 알아서 cookie를 관리해준다.
|
||||||
|
handler = requests.Session()
|
||||||
|
# handler.stream = True
|
||||||
|
handler.mount("http://", adapter)
|
||||||
|
handler.mount("https://", adapter)
|
||||||
|
|
||||||
|
handler.headers.update({
|
||||||
|
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3514.2 Safari/537.36'
|
||||||
|
})
|
||||||
|
return handler
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.handler is not None:
|
||||||
|
self.handler.close()
|
||||||
|
self.handler = None
|
||||||
|
|
||||||
|
def s3_client(self, aws_access_key_id, aws_secret_access_key, aws_region_name):
|
||||||
|
return get_session().create_client(
|
||||||
|
's3',
|
||||||
|
aws_access_key_id=aws_access_key_id,
|
||||||
|
aws_secret_access_key=aws_secret_access_key,
|
||||||
|
region_name=aws_region_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def send_msg(self, messages):
|
||||||
|
self.log.info("sending a message")
|
||||||
|
req_args = (
|
||||||
|
self.webhook_url,
|
||||||
|
{
|
||||||
|
'headers': {'Content-Type': 'application/json'},
|
||||||
|
'json': messages
|
||||||
|
}
|
||||||
|
)
|
||||||
|
response = await self.loop.run_in_executor(
|
||||||
|
None,
|
||||||
|
lambda url, kwargs: self.handler.post(url, **kwargs),
|
||||||
|
*req_args
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise ValueError(
|
||||||
|
'Request to slack returned an error %s, the response is:\n%s'
|
||||||
|
% (response.status_code, response.text)
|
||||||
|
)
|
||||||
|
self.log.info("message sent")
|
||||||
|
|
||||||
|
async def get_resources(self, standard_time):
|
||||||
|
billing_url = 'https://console.aws.amazon.com/billing/home?region=%s#/' % (self.region_name)
|
||||||
|
|
||||||
|
now = datetime.utcnow().replace(tzinfo=UTCTZ).astimezone(KSTTZ)
|
||||||
|
today = date.fromordinal(now.toordinal())
|
||||||
|
|
||||||
|
if datetime.combine(today, standard_time) >= now:
|
||||||
|
today -= relativedelta(days=1)
|
||||||
|
report_time = datetime.combine(today, standard_time)
|
||||||
|
start_month = today.replace(day=1)
|
||||||
|
|
||||||
|
report_file_path = await self.get_current_manifest(standard_date=today)
|
||||||
|
|
||||||
|
costs = filter(
|
||||||
|
lambda cost: cost.end < report_time,
|
||||||
|
await self.get_current_cost_csv(report_file_path)
|
||||||
|
)
|
||||||
|
|
||||||
|
'''
|
||||||
|
{
|
||||||
|
"fallback": "Required plain-text summary of the attachment.",
|
||||||
|
"color": "#36a64f",
|
||||||
|
"pretext": "Optional text that appears above the attachment block",
|
||||||
|
"author_name": "Bobby Tables",
|
||||||
|
"author_link": "http://flickr.com/bobby/",
|
||||||
|
"author_icon": "http://flickr.com/icons/bobby.jpg",
|
||||||
|
"title": "Slack API Documentation",
|
||||||
|
"title_link": "https://api.slack.com/",
|
||||||
|
"text": "Optional text that appears within the attachment",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"title": "Priority",
|
||||||
|
"value": "High",
|
||||||
|
"short": False
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"image_url": "http://my-website.com/path/to/image.jpg",
|
||||||
|
"thumb_url": "http://example.com/path/to/thumb.png",
|
||||||
|
"footer": "Slack API",
|
||||||
|
"footer_icon": "https://platform.slack-edge.com/img/default_application_icon.png",
|
||||||
|
"ts": 123456789
|
||||||
|
},
|
||||||
|
'''
|
||||||
|
|
||||||
|
def service_formetter(summeries):
|
||||||
|
total_amount = defaultdict(Decimal)
|
||||||
|
for service_type, type_costs, service_amount in sorted(summeries, key=lambda service: service[2],
|
||||||
|
reverse=True):
|
||||||
|
message = '\n'.join(
|
||||||
|
'%s: %s' % (type, format_currency(cost, service_type.currency, decimal_quantization=False))
|
||||||
|
for type, cost in type_costs
|
||||||
|
)
|
||||||
|
service_name = '%s(%s) : %s ' % (
|
||||||
|
service_type.service,
|
||||||
|
service_type.currency,
|
||||||
|
format_currency(service_amount, service_type.currency, decimal_quantization=False)
|
||||||
|
)
|
||||||
|
yield {
|
||||||
|
'fallback': '%s\n%s' % (service_name, message),
|
||||||
|
"color": "#ffffff",
|
||||||
|
'title': service_name,
|
||||||
|
'text': message
|
||||||
|
}
|
||||||
|
total_amount[service_type.currency] += service_amount
|
||||||
|
# totals
|
||||||
|
for currency, total in total_amount.items():
|
||||||
|
total_amount_text = format_currency(total, currency, decimal_quantization=False)
|
||||||
|
message = 'TOT(%s): %s' % (currency, total_amount_text)
|
||||||
|
yield {
|
||||||
|
'fallback': message,
|
||||||
|
"color": "#ffffff",
|
||||||
|
'title': message,
|
||||||
|
}
|
||||||
|
|
||||||
|
slack_data = {
|
||||||
|
'text': "Daily AWS Billing Report :macos: from %s to %s" % (
|
||||||
|
start_month.strftime('%y-%m-%d'), today.strftime('%y-%m-%d')),
|
||||||
|
"attachments": [
|
||||||
|
*service_formetter(self.summery_costs(costs)),
|
||||||
|
{
|
||||||
|
"fallback": "Check out more at %s" % billing_url,
|
||||||
|
"color": "#eeeeee",
|
||||||
|
"title": "Check out more detils from AWS billing site",
|
||||||
|
"title_link": billing_url,
|
||||||
|
"footer": "Linus Billing Bot :confused_parrot:",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await self.send_msg(slack_data)
|
||||||
|
|
||||||
|
@memoized
|
||||||
|
async def get_current_manifest(
|
||||||
|
self,
|
||||||
|
standard_date=date.today(),
|
||||||
|
modified_entry=None
|
||||||
|
):
|
||||||
|
duration = '%s-%s' % (standard_date.replace(day=1).strftime('%Y%m%d'),
|
||||||
|
(standard_date.replace(day=1) + relativedelta(months=1)).strftime('%Y%m%d'))
|
||||||
|
request_path = path.join(self.bucket_path, duration, self.manifest_name)
|
||||||
|
try:
|
||||||
|
req_args = (
|
||||||
|
self.bucket_name,
|
||||||
|
request_path,
|
||||||
|
dict(modified_entry.params(self.bucket_name, request_path)) if modified_entry is not None else {}
|
||||||
|
)
|
||||||
|
response = await self.loop.run_in_executor(
|
||||||
|
None,
|
||||||
|
lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs),
|
||||||
|
*req_args
|
||||||
|
)
|
||||||
|
obj = ujson.load(response['Body'])
|
||||||
|
return (
|
||||||
|
obj['reportKeys'][0],
|
||||||
|
ObjEntry(self.bucket_name, request_path, response['ETag'], response['LastModified'])
|
||||||
|
)
|
||||||
|
|
||||||
|
except ClientError as e:
|
||||||
|
if e.response['Error']['Code'] == '304':
|
||||||
|
return None, modified_entry
|
||||||
|
raise e
|
||||||
|
|
||||||
|
@memoized
|
||||||
|
async def get_current_cost_csv(self, report_path, modified_entry=None):
|
||||||
|
try:
|
||||||
|
|
||||||
|
req_args = (
|
||||||
|
self.bucket_name,
|
||||||
|
report_path,
|
||||||
|
dict(modified_entry.params(self.bucket_name, report_path)) if modified_entry is not None else {}
|
||||||
|
)
|
||||||
|
response = await self.loop.run_in_executor(
|
||||||
|
None,
|
||||||
|
lambda name, key, kwargs: self.s3.get_object(Bucket=name, Key=key, **kwargs),
|
||||||
|
*req_args
|
||||||
|
)
|
||||||
|
|
||||||
|
unzipped = TextIOWrapper(GzipFile(mode='rb', fileobj=response['Body']), encoding='utf-8')
|
||||||
|
cost_reader = csv.reader(unzipped)
|
||||||
|
keys = next(cost_reader)
|
||||||
|
return (
|
||||||
|
[
|
||||||
|
item for item in filter(
|
||||||
|
lambda cost: cost.amount != 0,
|
||||||
|
map(
|
||||||
|
lambda usage: Cost(**{k: v for k, v in zip(keys, usage)}),
|
||||||
|
cost_reader
|
||||||
|
)
|
||||||
|
)
|
||||||
|
],
|
||||||
|
ObjEntry(self.bucket_name, report_path, response['ETag'], response['LastModified'])
|
||||||
|
)
|
||||||
|
|
||||||
|
except ClientError as e:
|
||||||
|
if e.response['Error']['Code'] == '304':
|
||||||
|
return None, modified_entry
|
||||||
|
raise e
|
||||||
|
|
||||||
|
def summery_costs(self, costs):
|
||||||
|
def cost_summation(amounts, cost):
|
||||||
|
amounts[cost.key] += cost.amount
|
||||||
|
return amounts
|
||||||
|
|
||||||
|
amount_sumation = reduce(cost_summation, costs, defaultdict(Decimal))
|
||||||
|
|
||||||
|
def subcost_exteact(type_cost_item):
|
||||||
|
for vk, amt in type_cost_item:
|
||||||
|
yield vk.type, amt
|
||||||
|
|
||||||
|
for k, v in groupby(sorted(amount_sumation.items(), key=lambda item: item[0]), key=lambda item: item[0].key):
|
||||||
|
sv = sorted(subcost_exteact(v), key=lambda item: item[1], reverse=True)
|
||||||
|
yield (
|
||||||
|
k,
|
||||||
|
sv,
|
||||||
|
sum(map(lambda v: v[1], sv))
|
||||||
|
)
|
|
@ -0,0 +1,88 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from collections import namedtuple, defaultdict
|
||||||
|
from datetime import datetime
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
|
from util import UTCTZ, KSTTZ
|
||||||
|
|
||||||
|
|
||||||
|
class ObjEntry(namedtuple('ModifiedEntry',
|
||||||
|
('bucket',
|
||||||
|
'path',
|
||||||
|
'etags',
|
||||||
|
'modified',
|
||||||
|
))
|
||||||
|
):
|
||||||
|
|
||||||
|
def __new__(cls, *args, **kwargs):
|
||||||
|
item = super(ObjEntry, cls).__new__(cls, *args, **kwargs)
|
||||||
|
return item
|
||||||
|
|
||||||
|
def params(self, bucket, path):
|
||||||
|
if (self.bucket, self.path) != (bucket, path):
|
||||||
|
return ()
|
||||||
|
if self.etags is not None:
|
||||||
|
yield ('IfNoneMatch', self.etags)
|
||||||
|
if self.modified is not None:
|
||||||
|
yield ('IfModifiedSince', self.modified)
|
||||||
|
|
||||||
|
|
||||||
|
class Cost(namedtuple('Cost',
|
||||||
|
('amount',
|
||||||
|
'currency',
|
||||||
|
'service',
|
||||||
|
'type',
|
||||||
|
'start',
|
||||||
|
'end'
|
||||||
|
))
|
||||||
|
):
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
def __new__(cls, *args, **kwargs):
|
||||||
|
argdict = defaultdict(lambda: None)
|
||||||
|
argdict.update(**kwargs)
|
||||||
|
parse_date = lambda item: datetime.strptime(item, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTCTZ).astimezone(KSTTZ)
|
||||||
|
args = (
|
||||||
|
(Decimal(argdict['lineItem/BlendedCost']) if argdict['lineItem/BlendedCost'] else None),
|
||||||
|
(argdict['lineItem/CurrencyCode'] if argdict['lineItem/CurrencyCode'] else None),
|
||||||
|
(argdict['product/ProductName'] if argdict['product/ProductName'] else None),
|
||||||
|
(argdict['lineItem/UsageType'] if argdict['lineItem/UsageType'] else None),
|
||||||
|
(parse_date(argdict['lineItem/UsageStartDate']) if argdict['lineItem/UsageStartDate'] else None),
|
||||||
|
(parse_date(argdict['lineItem/UsageEndDate']) if argdict['lineItem/UsageEndDate'] else None)
|
||||||
|
)
|
||||||
|
item = super(Cost, cls).__new__(cls, *args)
|
||||||
|
return item
|
||||||
|
|
||||||
|
@property
|
||||||
|
def key(self):
|
||||||
|
return CostSummery(self.currency, self.service, self.type)
|
||||||
|
|
||||||
|
|
||||||
|
class CostSubItem(namedtuple('CostSubItem',
|
||||||
|
('currency',
|
||||||
|
'service',
|
||||||
|
))
|
||||||
|
):
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
def __new__(cls, *args):
|
||||||
|
item = super(CostSubItem, cls).__new__(cls, *args)
|
||||||
|
return item
|
||||||
|
|
||||||
|
|
||||||
|
class CostSummery(namedtuple('CostSummery',
|
||||||
|
('currency',
|
||||||
|
'service',
|
||||||
|
'type'
|
||||||
|
))
|
||||||
|
):
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
def __new__(cls, *args):
|
||||||
|
item = super(CostSummery, cls).__new__(cls, *args)
|
||||||
|
return item
|
||||||
|
|
||||||
|
@property
|
||||||
|
def key(self):
|
||||||
|
return CostSubItem(self.currency, self.service)
|
|
@ -1,7 +1,9 @@
|
||||||
|
from .asyn import EnteringLoop
|
||||||
from .file import to_module_path, find_executable_of_specify, makedirs
|
from .file import to_module_path, find_executable_of_specify, makedirs
|
||||||
from .hook import install_except_hook
|
from .hook import install_except_hook
|
||||||
from .log import wrap as wrap_log
|
from .log import wrap as wrap_log
|
||||||
from .asyn import EnteringLoop
|
from .method import memoized
|
||||||
|
from .tz import UTCTZ, KSTTZ
|
||||||
|
|
||||||
__all__ = (
|
__all__ = (
|
||||||
'install_except_hook',
|
'install_except_hook',
|
||||||
|
@ -9,5 +11,8 @@ __all__ = (
|
||||||
'wrap_log',
|
'wrap_log',
|
||||||
'EnteringLoop',
|
'EnteringLoop',
|
||||||
'to_module_path',
|
'to_module_path',
|
||||||
'makedirs'
|
'makedirs',
|
||||||
|
'memoized',
|
||||||
|
'UTCTZ',
|
||||||
|
'KSTTZ'
|
||||||
)
|
)
|
||||||
|
|
|
@ -8,12 +8,11 @@ Copyright (c) 2014, Stephan Schultchen.
|
||||||
License: MIT (see LICENSE for details)
|
License: MIT (see LICENSE for details)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
from .daemon import DaemonContext, DaemonError
|
from .daemon import DaemonContext, DaemonError
|
||||||
from .pidfile import PidFile
|
from .pidfile import PidFile
|
||||||
|
|
||||||
__all__ = (
|
__all__ = (
|
||||||
"DaemonContext",
|
"DaemonContext",
|
||||||
"DaemonError",
|
"DaemonError",
|
||||||
"PidFile",
|
"PidFile"
|
||||||
)
|
)
|
|
@ -0,0 +1,22 @@
|
||||||
|
import functools
|
||||||
|
|
||||||
|
|
||||||
|
def memoized(func):
|
||||||
|
cache = None
|
||||||
|
entry = None
|
||||||
|
|
||||||
|
async def callee(*args, **kwargs):
|
||||||
|
nonlocal entry, cache, func
|
||||||
|
kwargs['modified_entry'] = entry
|
||||||
|
returned, entry = await func(*args, **kwargs)
|
||||||
|
if returned is not None:
|
||||||
|
cache = returned
|
||||||
|
return returned
|
||||||
|
elif cache is None:
|
||||||
|
raise RuntimeError('no ret value')
|
||||||
|
else:
|
||||||
|
return cache
|
||||||
|
|
||||||
|
# @functools.wraps 대신
|
||||||
|
functools.update_wrapper(callee, func)
|
||||||
|
return callee
|
|
@ -0,0 +1,8 @@
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import pytz
|
||||||
|
|
||||||
|
UTCTZ = pytz.utc
|
||||||
|
# localize 안하면 timezone이 canonical하게 표현되지 않는다 .
|
||||||
|
KSTTZ = pytz.timezone('Asia/Seoul').localize(datetime.now()).tzinfo
|
||||||
|
|
Loading…
Reference in New Issue