# Source code for gwrappy.dataproc.dataproc

from gwrappy.service import get_service
from gwrappy.utils import iterate_list
from gwrappy.dataproc.utils import OperationResponse, JobResponse

from time import sleep


class DataprocUtility:
    """Thin wrapper around the Dataproc v1 REST API (clusters, operations, jobs)."""

    # Every request in this class targets the 'global' Dataproc region.
    _REGION = 'global'
    # Job states after which polling stops (see poll_job_status).
    _JOB_TERMINAL_STATES = ('DONE', 'ERROR', 'CANCELLED')

    def __init__(self, project_id, **kwargs):
        """
        Initializes object for interacting with Dataproc API.

        | By default, Application Default Credentials are used.
        | If gcloud SDK isn't installed, credential files have to be specified using the kwargs *json_credentials_path* and *client_id*.

        :param project_id: Project ID linked to Dataproc.
        :keyword client_secret_path: File path for client secret JSON file. Only required if credentials are invalid or unavailable.
        :keyword json_credentials_path: File path for automatically generated credentials.
        :keyword client_id: Credentials are stored as a key-value pair per client_id to facilitate multiple clients using the same credentials file. For simplicity, using one's email address is sufficient.
        :keyword max_retries: Passed as ``num_retries`` to every request's ``execute()``. Defaults to 3.

        All keyword arguments (including *max_retries*) are also forwarded to ``get_service``.
        """
        self.project_id = project_id
        self._service = get_service('dataproc', **kwargs)
        self._max_retries = kwargs.get('max_retries', 3)

    def list_clusters(self, max_results=None, filter=None):
        """
        Abstraction of projects().regions().clusters().list() method with inbuilt iteration functionality.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/list]

        :param max_results: If None, all results are iterated over and returned.
        :type max_results: integer
        :param filter: Query param [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/list#query-parameters]
        :type filter: String
        :return: List of dictionary objects representing cluster resources.
        """
        return iterate_list(
            self._service.projects().regions().clusters(),
            'clusters',
            max_results,
            self._max_retries,
            projectId=self.project_id,
            region=self._REGION,
            filter=filter
        )

    def get_cluster(self, cluster_name):
        """
        Abstraction of projects().regions().clusters().get() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/get]

        :param cluster_name: Cluster name.
        :return: Dictionary object representing cluster resource.
        """
        return self._service.projects().regions().clusters().get(
            projectId=self.project_id,
            region=self._REGION,
            clusterName=cluster_name
        ).execute(num_retries=self._max_retries)

    def diagnose_cluster(self, cluster_name):
        """
        Abstraction of projects().regions().clusters().diagnose() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/diagnose]

        :param cluster_name: Cluster name.
        :return: Dictionary object representing operation resource.
        """
        return self._service.projects().regions().clusters().diagnose(
            projectId=self.project_id,
            region=self._REGION,
            clusterName=cluster_name,
            body={}
        ).execute(num_retries=self._max_retries)

    def list_operations(self, max_results=None, filter=None):
        """
        Abstraction of projects().regions().operations().list() method with inbuilt iteration functionality.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.operations/list]

        :param max_results: If None, all results are iterated over and returned.
        :type max_results: integer
        :param filter: Query param [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.operations/list#query-parameters]
        :type filter: String
        :return: List of dictionary objects representing operation resources.
        """
        return iterate_list(
            self._service.projects().regions().operations(),
            'operations',
            max_results,
            self._max_retries,
            name='projects/{project_id}/regions/{region}/operations'.format(
                project_id=self.project_id, region=self._REGION
            ),
            filter=filter
        )

    def get_operation(self, operation_name):
        """
        Abstraction of projects().regions().operations().get() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.operations/get]

        :param operation_name: Name of operation resource.
        :return: Dictionary object representing operation resource.
        """
        return self._service.projects().regions().operations().get(
            name=operation_name
        ).execute(num_retries=self._max_retries)

    def poll_operation_status(self, operation_resp, sleep_time=3):
        """
        Repeatedly fetches an operation (via get_operation) until it is complete.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.operations/get]

        An operation counts as complete when its 'done' flag is truthy, or when
        metadata.status.state equals 'DONE'.

        :param operation_resp: Representation of operation resource; only its 'name' key is read.
        :param sleep_time: Seconds to wait between polls.
        :return: OperationResponse wrapping the final operation resource.
        """
        while True:
            operation_resp = self.get_operation(operation_resp['name'])
            if operation_resp.get('done', False):
                break
            # Unfinished operations may not carry metadata yet; don't KeyError on them.
            status = (operation_resp.get('metadata') or {}).get('status') or {}
            if status.get('state') == 'DONE':
                break
            sleep(sleep_time)
        return OperationResponse(operation_resp)

    @staticmethod
    def _instance_group_config(zone_uri, num_instances, machine_type, boot_disk_gb):
        """Build a master/worker instance-group config dict for a cluster body."""
        return {
            'numInstances': num_instances,
            'machineTypeUri': '{zone_uri}/machineTypes/{machine_type}'.format(
                zone_uri=zone_uri, machine_type=machine_type
            ),
            'diskConfig': {
                'bootDiskSizeGb': boot_disk_gb,
                'numLocalSsds': 0
            }
        }

    def create_cluster(self, zone, cluster_name, wait_finish=True, sleep_time=3, **kwargs):
        """
        Abstraction of projects().regions().clusters().create() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/create]

        :param zone: Dataproc zone.
        :param cluster_name: Cluster name.
        :param wait_finish: If set to True, operation will be polled till completion.
        :param sleep_time: If wait_finish is set to True, sets polling wait time.
        :keyword config_bucket: Google Cloud Storage staging bucket used for sharing generated SSH keys and config.
        :keyword network: Google Compute Engine network to be used for machine communications. Defaults to 'default'.
        :keyword master_boot_disk: Size in GB of the master boot disk. Defaults to 500.
        :keyword master_num: The number of master VM instances in the instance group. Defaults to 1.
        :keyword master_machine_type: Google Compute Engine machine type used for master cluster instances. Defaults to 'n1-standard-4'.
        :keyword worker_boot_disk: Size in GB of the worker boot disk. Defaults to 500.
        :keyword worker_num: The number of VM worker instances in the instance group. Defaults to 2.
        :keyword worker_machine_type: Google Compute Engine machine type used for worker cluster instances. Defaults to 'n1-standard-4'.
        :keyword init_actions: Google Cloud Storage URI of executable file(s): a single URI, or a list/tuple of URIs.
        :return: Dictionary object or OperationResponse representing the create operation.
        """
        init_actions = kwargs.get('init_actions')
        if init_actions is None:
            init_actions = []
        elif not isinstance(init_actions, (list, tuple)):
            # A single URI was given; wrap it so it becomes one initialization action.
            init_actions = [init_actions]

        compute_prefix = 'https://www.googleapis.com/compute/v1/projects/{project_id}'.format(
            project_id=self.project_id
        )
        zone_uri = '{prefix}/zones/{zone}'.format(prefix=compute_prefix, zone=zone)

        cluster_body = {
            'clusterName': cluster_name,
            'projectId': self.project_id,
            'config': {
                'configBucket': kwargs.get('config_bucket', ''),
                'gceClusterConfig': {
                    'networkUri': '{prefix}/global/networks/{network}'.format(
                        prefix=compute_prefix, network=kwargs.get('network', 'default')
                    ),
                    'zoneUri': zone_uri
                },
                'masterConfig': self._instance_group_config(
                    zone_uri,
                    kwargs.get('master_num', 1),
                    kwargs.get('master_machine_type', 'n1-standard-4'),
                    kwargs.get('master_boot_disk', 500)
                ),
                'workerConfig': self._instance_group_config(
                    zone_uri,
                    kwargs.get('worker_num', 2),
                    kwargs.get('worker_machine_type', 'n1-standard-4'),
                    kwargs.get('worker_boot_disk', 500)
                ),
                'initializationActions': [{'executableFile': uri} for uri in init_actions]
            }
        }

        cluster_resp = self._service.projects().regions().clusters().create(
            projectId=self.project_id,
            region=self._REGION,
            body=cluster_body
        ).execute(num_retries=self._max_retries)

        if wait_finish:
            return self.poll_operation_status(cluster_resp, sleep_time)
        return cluster_resp

    def delete_cluster(self, cluster_name, wait_finish=True, sleep_time=3):
        """
        Abstraction of projects().regions().clusters().delete() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/delete]

        :param cluster_name: Cluster name.
        :param wait_finish: If set to True, operation will be polled till completion.
        :param sleep_time: If wait_finish is set to True, sets polling wait time.
        :return: Dictionary object or OperationResponse representing the delete operation.
        """
        cluster_resp = self._service.projects().regions().clusters().delete(
            projectId=self.project_id,
            region=self._REGION,
            clusterName=cluster_name
        ).execute(num_retries=self._max_retries)

        if wait_finish:
            return self.poll_operation_status(cluster_resp, sleep_time)
        return cluster_resp

    def list_jobs(self, cluster_name=None, job_state='ACTIVE', max_results=None, filter=None):
        """
        Abstraction of projects().regions().jobs().list() method with inbuilt iteration functionality.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/list]

        :param cluster_name: Cluster name, if unset, will return jobs from all clusters.
        :param job_state: Category of jobs to return. [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/list#JobStateMatcher]
        :param max_results: If None, all results are iterated over and returned.
        :type max_results: integer
        :param filter: Query param [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/list#query-parameters]
        :type filter: String
        :return: List of dictionary objects representing job resources.
        """
        return iterate_list(
            self._service.projects().regions().jobs(),
            'jobs',
            max_results,
            self._max_retries,
            projectId=self.project_id,
            region=self._REGION,
            clusterName=cluster_name,
            jobStateMatcher=job_state,
            filter=filter
        )

    def get_job(self, job_id):
        """
        Abstraction of projects().regions().jobs().get() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/get]

        :param job_id: Job Id.
        :return: Dictionary object representing job resource.
        """
        return self._service.projects().regions().jobs().get(
            projectId=self.project_id,
            region=self._REGION,
            jobId=job_id
        ).execute(num_retries=self._max_retries)

    def poll_job_status(self, job_resp, sleep_time=3):
        """
        Repeatedly fetches a job (via get_job) until it reaches DONE, ERROR or CANCELLED.

        :param job_resp: Representation of job resource; its ['reference']['jobId'] is read.
        :param sleep_time: Seconds to wait between polls.
        :return: JobResponse wrapping the final job resource.
        """
        while True:
            job_resp = self.get_job(job_resp['reference']['jobId'])
            if job_resp['status']['state'] in self._JOB_TERMINAL_STATES:
                return JobResponse(job_resp)
            sleep(sleep_time)

    @staticmethod
    def _validate_job_kwargs(kwargs, list_keys):
        """Check that each key in list_keys (if given) is a list and 'properties' (if given) is a dict."""
        # NOTE: assert-based checks are kept for backward compatibility (callers may
        # expect AssertionError); they are skipped when Python runs with -O.
        for key in list_keys:
            assert isinstance(kwargs.get(key, []), list), '{} must be a list'.format(key)
        assert isinstance(kwargs.get('properties', {}), dict), 'properties must be a dict'

    def _submit_job(self, cluster_name, job_type, job_config, wait_finish, sleep_time):
        """Submit a job of the given type ('sparkJob', 'pysparkJob', ...) to cluster_name."""
        # 'placement' must live inside the Job resource, next to the job-type config.
        submit_body = {
            'job': {
                'placement': {
                    'clusterName': cluster_name
                },
                job_type: job_config
            }
        }

        job_resp = self._service.projects().regions().jobs().submit(
            projectId=self.project_id,
            region=self._REGION,
            body=submit_body
        ).execute(num_retries=self._max_retries)

        if wait_finish:
            return self.poll_job_status(job_resp, sleep_time)
        return job_resp

    def submit_spark_job(self, cluster_name, main_class, wait_finish=True, sleep_time=5, **kwargs):
        """
        Abstraction of projects().regions().jobs().submit() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/submit]

        Body parameters can be found here:
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs#resource-job]

        :param cluster_name: The name of the cluster where the job will be submitted.
        :param main_class: The name of the driver's main class.
        :param wait_finish: If set to True, operation will be polled till completion.
        :param sleep_time: If wait_finish is set to True, sets polling wait time.
        :keyword args: The arguments to pass to the driver.
        :keyword jar_uris: HCFS URIs of jar files to add to the CLASSPATHs of the Spark driver and tasks.
        :keyword file_uris: HCFS URIs of files to be copied to the working directory of Spark drivers and distributed tasks.
        :keyword archive_uris: HCFS URIs of archives to be extracted in the working directory of Spark drivers and tasks.
        :keyword properties: A mapping of property names to values, used to configure Spark.
        :return: Dictionary object or JobResponse representing job resource.
        """
        self._validate_job_kwargs(kwargs, ('args', 'jar_uris', 'file_uris', 'archive_uris'))

        spark_job = {
            'mainClass': main_class,
            'args': kwargs.get('args', []),
            'jarFileUris': kwargs.get('jar_uris', []),
            'fileUris': kwargs.get('file_uris', []),
            'archiveUris': kwargs.get('archive_uris', []),
            'properties': kwargs.get('properties', {})
        }
        return self._submit_job(cluster_name, 'sparkJob', spark_job, wait_finish, sleep_time)

    def submit_pyspark_job(self, cluster_name, main_py_uri, wait_finish=True, sleep_time=5, **kwargs):
        """
        Abstraction of projects().regions().jobs().submit() method.
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/submit]

        Body parameters can be found here:
        [https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs#resource-job]

        :param cluster_name: The name of the cluster where the job will be submitted.
        :param main_py_uri: The HCFS URI of the Python file to use as the driver.
        :param wait_finish: If set to True, operation will be polled till completion.
        :param sleep_time: If wait_finish is set to True, sets polling wait time.
        :keyword args: The arguments to pass to the driver.
        :keyword python_uris: HCFS file URIs of Python files to pass to the PySpark framework.
        :keyword jar_uris: HCFS URIs of jar files to add to the CLASSPATHs of the Python driver and tasks.
        :keyword file_uris: HCFS URIs of files to be copied to the working directory of Python drivers and distributed tasks.
        :keyword archive_uris: HCFS URIs of archives to be extracted in the working directory of Spark drivers and tasks.
        :keyword properties: A mapping of property names to values, used to configure PySpark.
        :return: Dictionary object or JobResponse representing job resource.
        """
        self._validate_job_kwargs(
            kwargs, ('args', 'python_uris', 'jar_uris', 'file_uris', 'archive_uris')
        )

        pyspark_job = {
            'mainPythonFileUri': main_py_uri,
            'args': kwargs.get('args', []),
            'pythonFileUris': kwargs.get('python_uris', []),
            'jarFileUris': kwargs.get('jar_uris', []),
            'fileUris': kwargs.get('file_uris', []),
            'archiveUris': kwargs.get('archive_uris', []),
            'properties': kwargs.get('properties', {})
        }
        return self._submit_job(cluster_name, 'pysparkJob', pyspark_job, wait_finish, sleep_time)