from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from pytz import timezone
from tzlocal import get_localzone
import logging
# python 2/3 compatibility
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
def iterate_list(service, object_name, max_results=None, max_retries=3, filter_exp=None, break_condition=None, **kwargs):
object_count = 0
resp = service.list(
**kwargs if kwargs is not None else {}
).execute(num_retries=max_retries)
if object_name in resp:
for x in resp[object_name]:
if filter_exp is not None and not filter_exp(x):
continue
# if max_results is None, get all results
if max_results is not None and object_count >= max_results:
resp.pop('nextPageToken', None)
break
# break condition mainly used to limit jobs dates which are sorted in reverse chronological order
if break_condition is not None and break_condition(x):
resp.pop('nextPageToken', None)
break
object_count += 1
yield x
while 'nextPageToken' in resp:
page_token = resp.get('nextPageToken', None)
resp = service.list(
pageToken=page_token,
**kwargs if kwargs is not None else {}
).execute(num_retries=max_retries)
if object_name in resp:
for x in resp[object_name]:
if filter_exp is not None and not filter_exp(x):
continue
if max_results is not None and object_count >= max_results:
resp.pop('nextPageToken', None)
break
if break_condition is not None and break_condition(x):
resp.pop('nextPageToken', None)
break
object_count += 1
yield x
[docs]def timestamp_to_datetime(input_timestamp, tz=None):
"""
Converts epoch timestamp into datetime object.
:param input_timestamp: Epoch timestamp. Microsecond or millisecond inputs accepted.
:type input_timestamp: long
:param tz: String representation of timezone accepted by pytz. eg. 'Asia/Hong_Kong'. If param is unfilled, system timezone is used.
:return: timezone aware datetime object
"""
input_timestamp = long(input_timestamp)
if tz is None:
tz = get_localzone()
else:
tz = timezone(tz)
# if timestamp granularity is microseconds
try:
return_value = datetime.fromtimestamp(input_timestamp, tz=tz)
except ValueError:
input_timestamp = float(input_timestamp)/1000
return_value = datetime.fromtimestamp(input_timestamp, tz=tz)
return return_value
[docs]def datetime_to_timestamp(input_datetime, date_format='%Y-%m-%d %H:%M:%S', tz=None):
"""
Converts datetime to epoch timestamp.
**Note** - If input_datetime is timestamp aware, it would first be localized according to the tz parameter if filled, or the system timezone if unfilled.
:param input_datetime: Date to convert.
:type input_datetime: datetime object or string representation of datetime.
:param date_format: If input is string, denotes string datetime format to convert from.
:param tz: String representation of timezone accepted by pytz. eg. 'Asia/Hong_Kong'. If param is unfilled, system timezone is used.
:return: timezone aware datetime object
"""
epoch = timezone('UTC').localize(datetime.utcfromtimestamp(0))
if isinstance(input_datetime, str):
input_datetime = datetime.strptime(input_datetime, date_format)
assert isinstance(input_datetime, datetime)
if tz is None:
tz = get_localzone()
else:
tz = timezone(tz)
if input_datetime.tzinfo is None:
input_value = tz.localize(input_datetime)
else:
input_value = input_datetime.astimezone(tz)
return_value = long((input_value - epoch).total_seconds())
return return_value
[docs]def date_range(start, end, ascending=True, date_format='%Y-%m-%d'):
"""
Simple datetime generator for dates between start and end (inclusive).
:param start: Date to start at.
:type start: datetime object or string representation of datetime.
:param end: Date to stop at.
:type end: datetime object or string representation of datetime.
:param ascending: Toggle sorting of output.
:type ascending: boolean
:param date_format: If input is string, denotes string datetime format to convert from.
:return: generator object for naive datetime objects
"""
if isinstance(start, str):
start_date = datetime.strptime(start, date_format)
else:
start_date = start.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
if isinstance(end, str):
end_date = datetime.strptime(end, date_format)
else:
end_date = end.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
assert end_date >= start_date
days_apart = (end_date - start_date).days + 1
for i in (range(0, days_apart) if ascending else range(0, days_apart)[::-1]):
yield start_date + timedelta(i)
[docs]def month_range(start, end, full_months=False, month_format='%Y-%m', ascending=True, date_format='%Y-%m-%d'):
"""
Simple utility to chunk date range into months.
:param start: Date to start at.
:param end: Date to end at.
:param full_months: If true, only data up till the last complete month would be included.
:param month_format: Format for month key.
:param date_format: If start and end is string, denotes string datetime format to convert from.
:param ascending: Sort date list ascending
:return: dictionary keyed by month (in the format specified by month_format) with values being the list of dates within that month.
"""
def is_last_day(input_date):
return input_date == (input_date.replace(day=1) + relativedelta(months=1) - timedelta(days=1))
date_dict = {}
for temp_date in date_range(start, end, date_format=date_format):
month_key = temp_date.strftime(month_format)
date_dict.setdefault(month_key, [])
date_dict[month_key].append(temp_date)
if full_months:
return {k: sorted(v, reverse=not ascending) for k, v in date_dict.items() if is_last_day(max(v))}
else:
return {k: sorted(v, reverse=not ascending) for k, v in date_dict.items()}
[docs]def simple_mail(send_to, subject, text, send_from=None, username=None, password=None, server='smtp.gmail.com', port=587):
"""
Simple utility mail function - only text messages without attachments.
*Note* - In Gmail you'd have to allow 'less secure apps to access your account'. Not recommended for secure information/accounts.
:param send_to: Email recipients
:type send_to: list or string
:param subject: Email Subject
:param text: Email Body
:param send_from: Name of sender
:param username: Login username
:param password: Login password
:param server: Mail server
:param port: Connection port
"""
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import COMMASPACE, formatdate
assert username is not None and password is not None
if send_from is None:
send_from = username
if not isinstance(send_to, list):
send_to = [send_to]
message = MIMEMultipart()
message['From'] = send_from
message['To'] = COMMASPACE.join(send_to)
message['Date'] = formatdate(localtime=True)
message['Subject'] = subject
message.attach(MIMEText(text))
smtp = smtplib.SMTP(server, port)
smtp.starttls()
smtp.login(username, password)
smtp.sendmail(send_from, send_to, msg=message.as_string())
smtp.quit()
[docs]class StringLogger:
def __init__(self, name=None, level=logging.INFO, formatter=None, ignore_modules=None):
"""
Simple logging wrapper with a string buffer handler to easily write and retrieve logs as strings.
:param name: Name of logger
:param level: Logging level
:param formatter: logging.Formatter() object
:param ignore_modules: list of module names to ignore from logging process
"""
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
if formatter is None:
self._formatter = logging.Formatter(
fmt='%(asctime)s [%(levelname)s] (%(name)s): %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
else:
assert isinstance(formatter, logging.Formatter)
self._formatter = formatter
self._log_capture_string = StringIO()
sh = logging.StreamHandler(self._log_capture_string)
sh.setLevel(level)
sh.setFormatter(self._formatter)
self.logger.addHandler(sh)
# filters logging for modules in ignore_modules
if ignore_modules is not None and isinstance(ignore_modules, (list, tuple)):
class _LoggingFilter(logging.Filter):
def filter(self, record):
# still lets WARNING, ERROR and CRITICAL through
return record.name not in ignore_modules or \
record.levelno in (logging.WARNING, logging.ERROR, logging.CRITICAL)
for handler in self.logger.handlers:
handler.addFilter(_LoggingFilter())
[docs] def get_logger(self):
"""
Return instantiated Logger.
:return: logging.Logger object
"""
return self.logger
[docs] def get_log_string(self):
"""
Return logs as string.
:return: logged data as string
"""
return self._log_capture_string.getvalue()
def close(self):
self._log_capture_string.close()