Source code for gwrappy.dataproc.utils

from datetime import datetime


[docs]class OperationResponse: def __init__(self, resp): """ Wrapper for Dataproc Operation responses, mainly for calculating/parsing statistics into human readable formats for logging. :param resp: Dictionary representation of an operation resource. :type resp: dictionary """ self.resp = resp self.name = resp['name'] self.type = resp['metadata']['operationType'] self._parse_timing() def _parse_timing(self): try: start_time = datetime.strptime( self.resp['metadata']['statusHistory'][0]['stateStartTime'], '%Y-%m-%dT%H:%M:%S.%fZ' ) end_time = datetime.strptime( self.resp['metadata']['status']['stateStartTime'], '%Y-%m-%dT%H:%M:%S.%fZ' ) time_taken = (end_time - start_time).total_seconds() setattr( self, 'time_taken', dict(zip( ('m', 's'), divmod(time_taken, 60) )) ) except (IndexError, KeyError): pass def __repr__(self): return '[Dataproc] %s Operation (%s)%s' % ( self.type, self.name.split('/')[-1], ' ({m:.0f} Minutes {s:.0f} Seconds)'.format(**getattr(self, 'time_taken')) if hasattr(self, 'time_taken') else '' ) __str__ = __repr__
[docs]class JobResponse: def __init__(self, resp): """ Wrapper for Dataproc Job responses, mainly for calculating/parsing statistics into human readable formats for logging. :param resp: Dictionary representation of an operation resource. :type resp: dictionary """ self.resp = resp self.id = '{projectId}:{jobId}'.format(**resp['reference']) self.state = resp['status']['state'] self.job_type = set(tuple(resp.keys()))\ .intersection({'hadoopJob', 'sparkJob', 'pysparkJob', 'hiveJob', 'pigJob', 'sparkSqlJob'})\ .pop() self._parse_timing() def _parse_timing(self): try: start_time = datetime.strptime( self.resp['statusHistory'][0]['stateStartTime'], '%Y-%m-%dT%H:%M:%S.%fZ' ) end_time = datetime.strptime( self.resp['status']['stateStartTime'], '%Y-%m-%dT%H:%M:%S.%fZ' ) time_taken = (end_time - start_time).total_seconds() setattr( self, 'time_taken', dict(zip( ('m', 's'), divmod(time_taken, 60) )) ) except (IndexError, KeyError): pass def __repr__(self): return '[Dataproc] %s (%s) %s%s' % ( self.job_type[0].upper() + self.job_type[1:], self.id, self.state, ' ({m:.0f} Minutes {s:.0f} Seconds)'.format(**getattr(self, 'time_taken')) if hasattr(self, 'time_taken') else '' ) __str__ = __repr__