from time import sleep
from gwrappy.service import get_service
from gwrappy.utils import iterate_list, datetime_to_timestamp
from gwrappy.errors import HttpError
from gwrappy.bigquery.errors import JobError
from gwrappy.bigquery.utils import JobResponse, TableResponse
[docs]class BigqueryUtility:
def __init__(self, **kwargs):
    """
    Build the object used to talk to the Bigquery API.

    | Application Default Credentials are used by default.
    | If gcloud SDK isn't installed, credential files have to be specified using the kwargs *json_credentials_path* and *client_id*.

    Every keyword argument is forwarded unchanged to ``get_service``.

    :keyword max_retries: Argument specified with each API call to natively handle retryable errors. Defaults to 3.
    :type max_retries: integer
    :keyword client_secret_path: File path for client secret JSON file. Only required if credentials are invalid or unavailable.
    :keyword json_credentials_path: File path for automatically generated credentials.
    :keyword client_id: Credentials are stored as a key-value pair per client_id to facilitate multiple clients using the same credentials file. For simplicity, using one's email address is sufficient.
    """
    retry_count = kwargs.get('max_retries', 3)
    self._service = get_service('bigquery', **kwargs)
    self._max_retries = retry_count
def list_projects(self, max_results=None, filter_exp=None):
    """
    Wrapper around projects().list() that iterates over result pages via ``iterate_list``. [https://cloud.google.com/bigquery/docs/reference/v2/projects/list]

    :param max_results: If None, all results are iterated over and returned.
    :type max_results: integer
    :param filter_exp: Function that filters entries if filter_exp evaluates to True.
    :type filter_exp: function
    :return: List of dictionary objects representing project resources.
    """
    projects_resource = self._service.projects()
    return iterate_list(projects_resource, 'projects', max_results, self._max_retries, filter_exp)
def list_jobs(self, project_id, state_filter=None, show_all=False, projection='full', max_results=None, earliest_date=None, filter_exp=None):
    """
    Wrapper around jobs().list() that iterates over result pages via ``iterate_list``. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/list]

    **Note** - All jobs are stored in BigQuery. Do set *max_results* or *earliest_date* to limit data returned.

    :param project_id: Unique project identifier.
    :type project_id: string
    :param state_filter: Pre-filter API request for job state. Acceptable values are "done", "pending" and "running". [Equivalent API param: stateFilter]
    :type state_filter: string
    :param show_all: Whether to display jobs owned by all users in the project. [Equivalent API param: allUsers]
    :type show_all: boolean
    :param projection: Acceptable values are *'full'*, *'minimal'*. *'full'* includes job configuration.
    :type projection: string
    :param max_results: If None, all results are iterated over and returned.
    :type max_results: integer
    :param earliest_date: Only returns data after this date. Converted to a millisecond timestamp and handed to ``iterate_list`` as its *break_condition*.
    :type earliest_date: datetime object or string representation of datetime in %Y-%m-%d format.
    :param filter_exp: Function that filters entries if filter_exp evaluates to True.
    :type filter_exp: function
    :return: List of dictionary objects representing job resources.
    """
    older_than_cutoff = None
    if earliest_date is not None:
        # bigquery timestamp is in milliseconds
        cutoff_ms = datetime_to_timestamp(earliest_date) * 1000

        def older_than_cutoff(job):
            return int(job['statistics']['creationTime']) < cutoff_ms

    request_params = {
        'projectId': project_id,
        'allUsers': show_all,
        'projection': projection,
        'stateFilter': state_filter,
    }
    return iterate_list(
        self._service.jobs(),
        'jobs',
        max_results,
        self._max_retries,
        filter_exp,
        break_condition=older_than_cutoff,
        **request_params
    )
def list_datasets(self, project_id, show_all=False, max_results=None, filter_exp=None):
    """
    Wrapper around datasets().list() that iterates over result pages via ``iterate_list``. [https://cloud.google.com/bigquery/docs/reference/v2/datasets/list]

    :param project_id: Unique project identifier.
    :type project_id: string
    :param show_all: Include hidden datasets generated when running queries on the UI. [Sent as the API param: all]
    :type show_all: boolean
    :param max_results: If None, all results are iterated over and returned.
    :type max_results: integer
    :param filter_exp: Function that filters entries if filter_exp evaluates to True.
    :type filter_exp: function
    :return: List of dictionary objects representing dataset resources.
    """
    request_params = {'projectId': project_id, 'all': show_all}
    datasets_resource = self._service.datasets()
    return iterate_list(
        datasets_resource, 'datasets', max_results, self._max_retries, filter_exp, **request_params
    )
def list_tables(self, project_id, dataset_id, max_results=None, filter_exp=None):
    """
    Wrapper around tables().list() that iterates over result pages via ``iterate_list``. [https://cloud.google.com/bigquery/docs/reference/v2/tables/list]

    :param project_id: Unique project identifier.
    :type project_id: string
    :param dataset_id: Unique dataset identifier.
    :type dataset_id: string
    :param max_results: If None, all results are iterated over and returned.
    :type max_results: integer
    :param filter_exp: Function that filters entries if filter_exp evaluates to True.
    :type filter_exp: function
    :return: List of dictionary objects representing table resources.
    """
    request_params = {'projectId': project_id, 'datasetId': dataset_id}
    tables_resource = self._service.tables()
    return iterate_list(
        tables_resource, 'tables', max_results, self._max_retries, filter_exp, **request_params
    )
def get_job(self, project_id, job_id):
    """
    Wrapper around the jobs().get() method. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/get]

    :param project_id: Unique project identifier.
    :type project_id: string
    :param job_id: Unique job identifier.
    :type job_id: string
    :return: Dictionary object representing job resource.
    """
    request = self._service.jobs().get(projectId=project_id, jobId=job_id)
    return request.execute(num_retries=self._max_retries)
def get_table_info(self, project_id, dataset_id, table_id):
    """
    Wrapper around the tables().get() method. [https://cloud.google.com/bigquery/docs/reference/v2/tables/get]

    :param project_id: Unique project identifier.
    :type project_id: string
    :param dataset_id: Unique dataset identifier.
    :type dataset_id: string
    :param table_id: Unique table identifier.
    :type table_id: string
    :return: Dictionary object representing table resource.
    """
    table_reference = {
        'projectId': project_id,
        'datasetId': dataset_id,
        'tableId': table_id,
    }
    request = self._service.tables().get(**table_reference)
    table_resource = request.execute(num_retries=self._max_retries)
    return table_resource
def delete_table(self, project_id, dataset_id, table_id):
    """
    Abstraction of tables().delete() method. [https://cloud.google.com/bigquery/docs/reference/v2/tables/delete]

    :param project_id: Unique project identifier.
    :type project_id: string
    :param dataset_id: Unique dataset identifier.
    :type dataset_id: string
    :param table_id: Unique table identifier.
    :type table_id: string
    :raises: AssertionError (carrying the response) if the API returns a non-empty response. Any empty response counts as success.
    """
    delete_resp = self._service.tables().delete(
        projectId=project_id,
        datasetId=dataset_id,
        tableId=table_id
    ).execute(num_retries=self._max_retries)

    # A successful delete yields an empty response. Checking truthiness instead of
    # len() accepts '', {} and None alike, so a None response no longer fails with
    # a TypeError before the real check is made.
    if delete_resp:
        raise AssertionError(delete_resp)
def _get_query_results(self, job_resp, page_token=None, max_results=None):
    """
    Fetch one page of results for a query job through jobs().getQueryResults().

    The request is sent with ``timeoutMs=0``.

    :param job_resp: Representation of job resource; its *jobReference* supplies the project and job ids.
    :type job_resp: dictionary
    :param page_token: Token of the page to fetch, None for the first page.
    :param max_results: Passed through as the API param maxResults.
    :return: Dictionary holding the getQueryResults response.
    """
    job_reference = job_resp['jobReference']
    request = self._service.jobs().getQueryResults(
        projectId=job_reference['projectId'],
        jobId=job_reference['jobId'],
        maxResults=max_results,
        pageToken=page_token,
        timeoutMs=0
    )
    return request.execute(num_retries=self._max_retries)
def poll_job_status(self, job_resp, sleep_time=1):
    """
    Check status of job until status is "DONE".

    The job is always fetched at least once. The pause is applied only between
    polls, so a job that is already finished returns without any delay.

    :param job_resp: Representation of job resource.
    :type job_resp: dictionary
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :return: Dictionary object representing job resource's final state.
    :raises: JobError object if an error is discovered after job finishes running.
    """
    while True:
        job_reference = job_resp['jobReference']
        job_resp = self.get_job(
            project_id=job_reference['projectId'],
            job_id=job_reference['jobId']
        )
        if job_resp['status']['state'] == 'DONE':
            break
        # only wait when another poll is actually going to happen
        sleep(sleep_time)

    if 'errorResult' in job_resp['status']:
        raise JobError(job_resp)

    return job_resp
def _iterate_job_results(self, job_resp, return_type, sleep_time):
    """
    Wait for a query job to finish, page through its results and convert them to the requested type.

    :param job_resp: Representation of job resource.
    :type job_resp: dictionary
    :param return_type: Format for result to be returned. Accepted types are "list", "dataframe", and "json".
    :type return_type: string
    :param sleep_time: Time to pause (seconds) between polls and between result pages.
    :type sleep_time: integer
    :return: Tuple of (results, job_resp). For "list", results is a list of rows whose first row holds the column names; for "dataframe", a pandas DataFrame; for "json", a list of dictionaries with missing values as None. job_resp is the polled job resource with an added *totalRows* key.
    :raises: AssertionError if return_type is not one of the accepted types. Errors raised by poll_job_status propagate.
    """
    assert return_type in ('list', 'dataframe', 'json')

    job_resp = self.poll_job_status(job_resp, sleep_time)

    results = []
    page_token = None
    while True:
        query_resp = self._get_query_results(
            job_resp,
            page_token=page_token
        )

        # add column names as first row
        if page_token is None:
            results.append([item['name'] for item in query_resp['schema']['fields']])

        # iterate through rows; pop so the page does not stay referenced twice
        for row in query_resp.pop('rows', []):
            results.append([item['v'] for item in row['f']])

        if 'pageToken' not in query_resp:
            break
        page_token = query_resp.get('pageToken', None)
        # pause only when another page is about to be requested
        sleep(sleep_time)

    # only job_resp from getQueryResults has totalRows field, combined with return_resp
    job_resp[u'totalRows'] = query_resp.get('totalRows', u'0')

    if return_type == 'list':
        return results, job_resp

    # for dataframe and json
    # json will be first converted to dataframe for proper typing
    import pandas as pd
    pd.set_option('display.expand_frame_repr', False)
    from datetime import datetime
    from io import BytesIO
    import unicodecsv as csv

    query_schema = query_resp['schema']['fields']

    def _convert_timestamp(input_value):
        # pd.datetime / pd.np were plain aliases of datetime.datetime / numpy and were
        # removed in pandas 2.0; the stdlib equivalents give the same values everywhere
        try:
            return datetime.utcfromtimestamp(float(input_value))
        except (TypeError, ValueError):
            return float('nan')

    def _convert_dtypes(schema):
        dtype_dict = {
            'STRING': object,
            # 'INTEGER': long,
            'FLOAT': float,
            'BOOLEAN': bool
        }
        # pandas 0.19.0 would wrongly convert int columns to float if column: None in dtype dict
        return {
            x['name']: dtype_dict[x['type']] for x in schema if x['type'] in dtype_dict
        }

    # round-trip through an in-memory CSV so pandas applies dtypes and date parsing
    with BytesIO() as file_buffer:
        csv_writer = csv.writer(file_buffer, lineterminator='\n')
        csv_writer.writerows(results)
        file_buffer.seek(0)

        timestamp_cols = [x['name'] for x in query_schema if x['type'] == 'TIMESTAMP']
        results = pd.read_csv(
            file_buffer,
            parse_dates=timestamp_cols,
            date_parser=_convert_timestamp,
            keep_default_na=False,
            na_values=['NULL', 'null', ''],
            dtype=_convert_dtypes(query_schema)
        )

    if return_type == 'dataframe':
        return results, job_resp
    elif return_type == 'json':
        results = results.to_dict('records')
        # pandas will return NaN, convert to native python None
        results = [{k: v if pd.notnull(v) else None for k, v in x.items()} for x in results]
        return results, job_resp
def sync_query(self, project_id, query, return_type='list', sleep_time=1, dry_run=False, **kwargs):
    """
    Wrapper around the jobs().query() method that iterates over and parses the query results. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/query]

    :param project_id: Unique project identifier.
    :type project_id: string
    :param query: SQL query
    :type query: string
    :param return_type: Format for result to be returned. Accepted types are "list", "dataframe", and "json".
    :type return_type: string
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :param dry_run: Basic statistics about the query, without actually running it. Mainly for testing or estimating amount of data processed.
    :type dry_run: boolean
    :keyword useLegacySql: Toggle between Legacy and Standard SQL.
    :return: If not dry_run: result in specified type, JobResponse object. If dry_run: Dictionary object representing expected query statistics.
    :raises: JobError object if an error is discovered after job finishes running.
    """
    body = {
        'query': query,
        'timeoutMs': 0,
        'dryRun': dry_run,
        'useLegacySql': kwargs.get('useLegacySql', None),
    }
    request = self._service.jobs().query(projectId=project_id, body=body)
    job_resp = request.execute(num_retries=self._max_retries)

    # a dry run has no rows to collect, hand back the raw API response
    if dry_run:
        return job_resp

    result, job_resp = self._iterate_job_results(job_resp, return_type, sleep_time)
    return result, JobResponse(job_resp, 'sync')
def async_query(self, project_id, query, dest_project_id, dest_dataset_id, dest_table_id, udf=None,
                return_type='list', sleep_time=1, **kwargs):
    """
    Wrapper around the jobs().insert() method for a **query** job that iterates over and parses the query results. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert]

    Asynchronous queries always write to an intermediate (destination) table.

    | This query method is preferable over sync_query if:
    | 1. Large results are returned.
    | 2. UDF functions are required.
    | 3. Results returned also need to be stored in a table.

    :param project_id: Unique project identifier.
    :type project_id: string
    :param query: SQL query
    :type query: string
    :param dest_project_id: Unique project identifier of destination table.
    :type dest_project_id: string
    :param dest_dataset_id: Unique dataset identifier of destination table.
    :type dest_dataset_id: string
    :param dest_table_id: Unique table identifier of destination table.
    :type dest_table_id: string
    :param udf: One or more UDF functions if required by the query.
    :type udf: string or list
    :param return_type: Format for result to be returned. Accepted types are "list", "dataframe", and "json".
    :type return_type: string
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :keyword useLegacySql: Toggle between Legacy and Standard SQL.
    :keyword writeDisposition: (Optional) Config kwarg that determines table writing behaviour. Defaults to WRITE_TRUNCATE.
    :keyword flattenResults: (Optional) Passed through to the query configuration.
    :return: result in specified type, JobResponse object.
    :raises: JobError object if an error is discovered after job finishes running.
    """
    destination_table = {
        'projectId': dest_project_id,
        'datasetId': dest_dataset_id,
        'tableId': dest_table_id,
    }
    query_config = {
        'userDefinedFunctionResources': udf,
        'query': query,
        'allowLargeResults': 'true',
        'destinationTable': destination_table,
        'writeDisposition': kwargs.get('writeDisposition', 'WRITE_TRUNCATE'),
        'flattenResults': kwargs.get('flattenResults', None),
        'useLegacySql': kwargs.get('useLegacySql', None),
    }
    request_body = {
        'jobReference': {'projectId': project_id},
        'configuration': {'query': query_config},
    }

    insert_request = self._service.jobs().insert(projectId=project_id, body=request_body)
    submitted_job = insert_request.execute(num_retries=self._max_retries)

    result, job_resp = self._iterate_job_results(submitted_job, return_type, sleep_time)
    return result, JobResponse(job_resp, 'async')
def write_table(self, project_id, query, dest_project_id, dest_dataset_id, dest_table_id, udf=None,
                wait_finish=True, sleep_time=1, **kwargs):
    """
    Wrapper around the jobs().insert() method for a **query** job, without returning results. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert]

    If the destination currently exists as a view it is deleted before the job is submitted.

    :param project_id: Unique project identifier.
    :type project_id: string
    :param query: SQL query
    :type query: string
    :param dest_project_id: Unique project identifier of destination table.
    :type dest_project_id: string
    :param dest_dataset_id: Unique dataset identifier of destination table.
    :type dest_dataset_id: string
    :param dest_table_id: Unique table identifier of destination table.
    :type dest_table_id: string
    :param udf: One or more UDF functions if required by the query.
    :type udf: string or list
    :param wait_finish: Flag whether to poll job till completion. If set to false, multiple jobs can be submitted, responses stored, iterated over and polled till completion afterwards.
    :type wait_finish: boolean
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :keyword useLegacySql: Toggle between Legacy and Standard SQL.
    :keyword writeDisposition: (Optional) Config kwarg that determines table writing behaviour. Defaults to WRITE_TRUNCATE.
    :keyword flattenResults: (Optional) Passed through to the query configuration.
    :return: JobResponse object. If wait_finish, it wraps the finished job with an added *totalRows* key; otherwise it wraps the response from submitting the job.
    :raises: If wait_finish: JobError object if an error is discovered after job finishes running.
    """
    destination_table = {
        'projectId': dest_project_id,
        'datasetId': dest_dataset_id,
        'tableId': dest_table_id,
    }
    query_config = {
        'userDefinedFunctionResources': udf,
        'query': query,
        'allowLargeResults': 'true',
        'destinationTable': destination_table,
        'writeDisposition': kwargs.get('writeDisposition', 'WRITE_TRUNCATE'),
        'flattenResults': kwargs.get('flattenResults', None),
        'useLegacySql': kwargs.get('useLegacySql', None),
    }
    request_body = {
        'jobReference': {'projectId': project_id},
        'configuration': {'query': query_config},
    }

    # error would be raised if overwriting a view, check if exists and delete first
    try:
        table_info = self.get_table_info(dest_project_id, dest_dataset_id, dest_table_id)
        if table_info['type'] == 'VIEW':
            self.delete_table(dest_project_id, dest_dataset_id, dest_table_id)
    except HttpError as err:
        # a 404 only means the table does not exist, anything else is a real failure
        if err.resp.status != 404:
            raise

    insert_request = self._service.jobs().insert(projectId=project_id, body=request_body)
    job_resp = insert_request.execute(num_retries=self._max_retries)

    if wait_finish:
        job_resp = self.poll_job_status(job_resp, sleep_time)
        # additional check to get total_rows
        row_count_resp = self._get_query_results(job_resp, max_results=0)
        job_resp[u'totalRows'] = row_count_resp.get('totalRows', u'0')

    return JobResponse(job_resp, 'write table')
def write_view(self, query, dest_project_id, dest_dataset_id, dest_table_id, udf=None, overwrite_existing=True, **kwargs):
    """
    Create a view through the tables().insert() method.

    Views are analogous to a virtual table, functioning as a table but only returning results from the underlying query when called.

    :param query: SQL query
    :type query: string
    :param dest_project_id: Unique project identifier of destination table.
    :type dest_project_id: string
    :param dest_dataset_id: Unique dataset identifier of destination table.
    :type dest_dataset_id: string
    :param dest_table_id: Unique table identifier of destination table.
    :type dest_table_id: string
    :param udf: One or more UDF functions if required by the query.
    :type udf: string or list
    :param overwrite_existing: Safety flag, would raise HttpError if table exists and overwrite_existing=False
    :keyword useLegacySql: Toggle between Legacy and Standard SQL.
    :return: TableResponse object for the newly inserted table
    """
    view_definition = {
        'userDefinedFunctionResources': udf,
        'query': query,
        'useLegacySql': kwargs.get('useLegacySql', None),
    }
    request_body = {
        'tableReference': {
            'projectId': dest_project_id,
            'datasetId': dest_dataset_id,
            'tableId': dest_table_id,
        },
        'view': view_definition,
    }

    def insert_view():
        request = self._service.tables().insert(
            projectId=dest_project_id,
            datasetId=dest_dataset_id,
            body=request_body
        )
        return request.execute(num_retries=self._max_retries)

    # error would be raised if table/view already exists, delete first before reinserting
    try:
        table_resp = insert_view()
    except HttpError as err:
        conflict = err.resp.status == 409
        if not (conflict and overwrite_existing):
            raise
        self.delete_table(dest_project_id, dest_dataset_id, dest_table_id)
        table_resp = insert_view()

    return TableResponse(table_resp, 'write')
def load_from_gcs(self, dest_project_id, dest_dataset_id, dest_table_id, schema, source_uris,
                  wait_finish=True, sleep_time=1, **kwargs):
    """
    | For loading data from Google Cloud Storage.
    | Wrapper around the jobs().insert() method for a **load** job. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert]

    :param dest_project_id: Unique project identifier of destination table.
    :type dest_project_id: string
    :param dest_dataset_id: Unique dataset identifier of destination table.
    :type dest_dataset_id: string
    :param dest_table_id: Unique table identifier of destination table.
    :type dest_table_id: string
    :param schema: Schema of input data (schema.fields[]) [https://cloud.google.com/bigquery/docs/reference/v2/tables]
    :type schema: list of dictionaries
    :param source_uris: One or more uris referencing GCS objects
    :type source_uris: string or list
    :param wait_finish: Flag whether to poll job till completion. If set to false, multiple jobs can be submitted, responses stored, iterated over and polled till completion afterwards.
    :type wait_finish: boolean
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :keyword writeDisposition: Determines table writing behaviour. Defaults to WRITE_TRUNCATE.
    :keyword sourceFormat: Indicates format of input data.
    :keyword skipLeadingRows: Leading rows to skip. Defaults to 1 to account for headers if sourceFormat is CSV or not given; sent as None for any other sourceFormat.
    :keyword fieldDelimiter: Indicates field delimiter.
    :keyword allowQuotedNewlines: Indicates presence of quoted newlines in fields.
    :keyword allowJaggedRows: Accept rows that are missing trailing optional columns. (Only CSV)
    :keyword ignoreUnknownValues: Allow extra values that are not represented in the table schema.
    :keyword maxBadRecords: Maximum number of bad records that BigQuery can ignore when running the job.
    :return: JobResponse object
    """
    source_format = kwargs.get('sourceFormat', None)
    # header skipping only applies to CSV input (the default when no format is given)
    if source_format in (None, 'CSV'):
        skip_leading_rows = kwargs.get('skipLeadingRows', 1)
    else:
        skip_leading_rows = None

    if isinstance(source_uris, list):
        uri_list = source_uris
    else:
        uri_list = [source_uris]

    load_config = {
        'destinationTable': {
            'projectId': dest_project_id,
            'datasetId': dest_dataset_id,
            'tableId': dest_table_id,
        },
        'writeDisposition': kwargs.get('writeDisposition', 'WRITE_TRUNCATE'),
        'sourceFormat': source_format,
        'skipLeadingRows': skip_leading_rows,
        'fieldDelimiter': kwargs.get('fieldDelimiter', None),
        'schema': {'fields': schema},
        'sourceUris': uri_list,
        'allowQuotedNewlines': kwargs.get('allowQuotedNewlines', None),
        'allowJaggedRows': kwargs.get('allowJaggedRows', None),
        'ignoreUnknownValues': kwargs.get('ignoreUnknownValues', None),
        'maxBadRecords': kwargs.get('maxBadRecords', None),
    }
    request_body = {
        'jobReference': {'projectId': dest_project_id},
        'configuration': {'load': load_config},
    }

    insert_request = self._service.jobs().insert(projectId=dest_project_id, body=request_body)
    job_resp = insert_request.execute(num_retries=self._max_retries)

    if wait_finish:
        job_resp = self.poll_job_status(job_resp, sleep_time)

    return JobResponse(job_resp, 'gcs')
def export_to_gcs(self, source_project_id, source_dataset_id, source_table_id, dest_uris,
                  wait_finish=True, sleep_time=1, **kwargs):
    """
    | For exporting data into Google Cloud Storage.
    | Wrapper around the jobs().insert() method for an **extract** job. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert]

    :param source_project_id: Unique project identifier of source table.
    :type source_project_id: string
    :param source_dataset_id: Unique dataset identifier of source table.
    :type source_dataset_id: string
    :param source_table_id: Unique table identifier of source table.
    :type source_table_id: string
    :param dest_uris: One or more uris referencing GCS objects
    :type dest_uris: string or list
    :param wait_finish: Flag whether to poll job till completion. If set to false, multiple jobs can be submitted, responses stored, iterated over and polled till completion afterwards.
    :type wait_finish: boolean
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :keyword destinationFormat: (Optional) Config kwarg that indicates format of output data.
    :keyword compression: (Optional) Config kwarg for type of compression applied.
    :keyword fieldDelimiter: (Optional) Config kwarg that indicates field delimiter.
    :keyword printHeader: (Optional) Config kwarg indicating if table headers should be written.
    :return: JobResponse object
    """
    if isinstance(dest_uris, list):
        uri_list = dest_uris
    else:
        uri_list = [dest_uris]

    extract_config = {
        'sourceTable': {
            'projectId': source_project_id,
            'datasetId': source_dataset_id,
            'tableId': source_table_id,
        },
        'destinationUris': uri_list,
    }
    # optional settings are forwarded as given, None when not supplied
    for option in ('destinationFormat', 'compression', 'fieldDelimiter', 'printHeader'):
        extract_config[option] = kwargs.get(option, None)

    request_body = {
        'jobReference': {'projectId': source_project_id},
        'configuration': {'extract': extract_config},
    }

    insert_request = self._service.jobs().insert(projectId=source_project_id, body=request_body)
    job_resp = insert_request.execute(num_retries=self._max_retries)

    if wait_finish:
        job_resp = self.poll_job_status(job_resp, sleep_time)

    return JobResponse(job_resp)
def copy_table(self, source_data, dest_project_id, dest_dataset_id, dest_table_id,
               wait_finish=True, sleep_time=1, **kwargs):
    """
    | For copying existing table(s) to a new or existing table.
    | Wrapper around the jobs().insert() method for a **copy** job. [https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert]

    :param source_data: Representations of single or multiple existing tables to copy from. Each representation must have exactly the keys *datasetId*, *projectId* and *tableId*.
    :type source_data: dictionary or list of dictionaries
    :param dest_project_id: Unique project identifier of destination table.
    :type dest_project_id: string
    :param dest_dataset_id: Unique dataset identifier of destination table.
    :type dest_dataset_id: string
    :param dest_table_id: Unique table identifier of destination table.
    :type dest_table_id: string
    :param wait_finish: Flag whether to poll job till completion. If set to false, multiple jobs can be submitted, responses stored, iterated over and polled till completion afterwards.
    :type wait_finish: boolean
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :keyword writeDisposition: (Optional) Config kwarg that determines table writing behaviour. Defaults to WRITE_TRUNCATE.
    :return: JobResponse object
    :raises: AssertionError if source_data does not have the expected shape.
    """
    expected_keys = ['datasetId', 'projectId', 'tableId']

    if isinstance(source_data, dict):
        assert sorted(source_data.keys()) == expected_keys
        # normalise the single-table form to the list form the API expects
        source_tables = [source_data]
    else:
        assert isinstance(source_data, list)
        assert all([isinstance(table, dict) for table in source_data])
        assert all([sorted(table.keys()) == expected_keys for table in source_data])
        source_tables = source_data

    copy_config = {
        'destinationTable': {
            'projectId': dest_project_id,
            'datasetId': dest_dataset_id,
            'tableId': dest_table_id,
        },
        'sourceTables': source_tables,
        'writeDisposition': kwargs.get('writeDisposition', 'WRITE_TRUNCATE'),
    }
    request_body = {
        'jobReference': {'projectId': dest_project_id},
        'configuration': {'copy': copy_config},
    }

    insert_request = self._service.jobs().insert(projectId=dest_project_id, body=request_body)
    job_resp = insert_request.execute(num_retries=self._max_retries)

    if wait_finish:
        job_resp = self.poll_job_status(job_resp, sleep_time)

    return JobResponse(job_resp)
def load_from_string(self, dest_project_id, dest_dataset_id, dest_table_id, schema, load_string,
                     wait_finish=True, sleep_time=1, **kwargs):
    """
    | For loading data from string representation of a file/object.
    | Can be used in conjunction with gwrappy.bigquery.utils.file_to_string()

    The string is uploaded in memory as the media body of a jobs().insert() **load** job.

    :param dest_project_id: Unique project identifier of destination table.
    :type dest_project_id: string
    :param dest_dataset_id: Unique dataset identifier of destination table.
    :type dest_dataset_id: string
    :param dest_table_id: Unique table identifier of destination table.
    :type dest_table_id: string
    :param schema: Schema of input data (schema.fields[]) [https://cloud.google.com/bigquery/docs/reference/v2/tables]
    :type schema: list of dictionaries
    :param load_string: String representation of an object.
    :type load_string: string
    :param wait_finish: Flag whether to poll job till completion. If set to false, multiple jobs can be submitted, responses stored, iterated over and polled till completion afterwards.
    :type wait_finish: boolean
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :keyword writeDisposition: (Optional) Config kwarg that determines table writing behaviour. Defaults to WRITE_TRUNCATE.
    :keyword sourceFormat: (Optional) Config kwarg that indicates format of input data.
    :keyword skipLeadingRows: (Optional) Config kwarg for leading rows to skip. Defaults to 1 to account for headers if sourceFormat is CSV or not given; sent as None for any other sourceFormat.
    :keyword fieldDelimiter: (Optional) Config kwarg that indicates field delimiter.
    :keyword allowQuotedNewlines: (Optional) Config kwarg indicating presence of quoted newlines in fields.
    :keyword allowJaggedRows: Accept rows that are missing trailing optional columns. (Only CSV)
    :keyword ignoreUnknownValues: Allow extra values that are not represented in the table schema.
    :keyword maxBadRecords: Maximum number of bad records that BigQuery can ignore when running the job.
    :return: JobResponse object
    """
    from googleapiclient.http import MediaInMemoryUpload

    source_format = kwargs.get('sourceFormat', None)
    # header skipping only applies to CSV input (the default when no format is given)
    if source_format in (None, 'CSV'):
        skip_leading_rows = kwargs.get('skipLeadingRows', 1)
    else:
        skip_leading_rows = None

    load_config = {
        'destinationTable': {
            'projectId': dest_project_id,
            'datasetId': dest_dataset_id,
            'tableId': dest_table_id,
        },
        'writeDisposition': kwargs.get('writeDisposition', 'WRITE_TRUNCATE'),
        'sourceFormat': source_format,
        'skipLeadingRows': skip_leading_rows,
        'fieldDelimiter': kwargs.get('fieldDelimiter', None),
        'schema': {'fields': schema},
        'allowQuotedNewlines': kwargs.get('allowQuotedNewlines', None),
        'allowJaggedRows': kwargs.get('allowJaggedRows', None),
        'ignoreUnknownValues': kwargs.get('ignoreUnknownValues', None),
        'maxBadRecords': kwargs.get('maxBadRecords', None),
    }
    request_body = {
        'jobReference': {'projectId': dest_project_id},
        'configuration': {'load': load_config},
    }

    upload = MediaInMemoryUpload(load_string, mimetype='application/octet-stream')
    insert_request = self._service.jobs().insert(
        projectId=dest_project_id,
        body=request_body,
        media_body=upload
    )
    job_resp = insert_request.execute(num_retries=self._max_retries)

    if wait_finish:
        job_resp = self.poll_job_status(job_resp, sleep_time)

    return JobResponse(job_resp, 'string')
def write_federated_table(self, dest_project_id, dest_dataset_id, dest_table_id, schema, source_uris,
                          overwrite_existing=True, **kwargs):
    """
    | Imagine a View for Google Cloud Storage object(s).
    | Abstraction of tables().insert() method with an *externalDataConfiguration*. [https://cloud.google.com/bigquery/docs/reference/v2/tables/insert]

    :param dest_project_id: Unique project identifier of destination table.
    :type dest_project_id: string
    :param dest_dataset_id: Unique dataset identifier of destination table.
    :type dest_dataset_id: string
    :param dest_table_id: Unique table identifier of destination table.
    :type dest_table_id: string
    :param schema: Schema of input data (schema.fields[]) [https://cloud.google.com/bigquery/docs/reference/v2/tables]
    :type schema: list of dictionaries
    :param source_uris: One or more uris referencing GCS objects
    :type source_uris: string or list
    :param overwrite_existing: Safety flag, would raise HttpError if table exists and overwrite_existing=False
    :keyword sourceFormat: (Optional) Config kwarg that indicates format of input data.
    :keyword skipLeadingRows: (Optional) Config kwarg for leading rows to skip. Defaults to 1 to account for headers if sourceFormat is CSV or not given; sent as None for any other sourceFormat.
    :keyword fieldDelimiter: (Optional) Config kwarg that indicates field delimiter.
    :keyword compression: (Optional) Config kwarg for type of compression applied.
    :keyword allowQuotedNewlines: (Optional) Config kwarg indicating presence of quoted newlines in fields.
    :return: TableResponse object
    """
    source_format = kwargs.get('sourceFormat', None)
    # header skipping only applies to CSV input (the default when no format is given)
    if source_format in (None, 'CSV'):
        skip_leading_rows = kwargs.get('skipLeadingRows', 1)
    else:
        skip_leading_rows = None

    request_body = {
        'tableReference': {
            'projectId': dest_project_id,
            'datasetId': dest_dataset_id,
            'tableId': dest_table_id
        },
        'externalDataConfiguration': {
            'sourceUris': source_uris if isinstance(source_uris, list) else [source_uris],
            'schema': {
                'fields': schema
            },
            'sourceFormat': source_format,
            # read the compression kwarg itself; this used to send the value of
            # sourceFormat, so the documented keyword was silently ignored
            'compression': kwargs.get('compression', None),
            'csvOptions': {
                'skipLeadingRows': skip_leading_rows,
                'fieldDelimiter': kwargs.get('fieldDelimiter', None),
                'allowQuotedNewlines': kwargs.get('allowQuotedNewlines', None)
            }
        }
    }

    # error would be raised if table/view already exists, delete first before reinserting
    try:
        job_resp = self._service.tables().insert(
            projectId=dest_project_id,
            datasetId=dest_dataset_id,
            body=request_body
        ).execute(num_retries=self._max_retries)
    except HttpError as e:
        if e.resp.status == 409 and overwrite_existing:
            self.delete_table(
                dest_project_id,
                dest_dataset_id,
                dest_table_id
            )
            job_resp = self._service.tables().insert(
                projectId=dest_project_id,
                datasetId=dest_dataset_id,
                body=request_body
            ).execute(num_retries=self._max_retries)
        else:
            # not a conflict, or overwriting was not allowed: surface the original error
            raise

    return TableResponse(job_resp, 'write federated')
def update_table_info(self, project_id, dataset_id, table_id, table_description=None, schema=None):
    """
    Wrapper around the tables().patch() method. [https://cloud.google.com/bigquery/docs/reference/v2/tables/patch]

    :param project_id: Unique project identifier.
    :type project_id: string
    :param dataset_id: Unique dataset identifier.
    :type dataset_id: string
    :param table_id: Unique table identifier.
    :type table_id: string
    :param table_description: Optional description for table. If None, would not overwrite existing description.
    :type table_description: string
    :param schema: Schema fields to change (schema.fields[]) [https://cloud.google.com/bigquery/docs/reference/v2/tables]. Every supplied field must already exist in the table; new fields cannot be added through this method.
    :type schema: list of dictionaries
    :return: TableResponse
    :raises: AssertionError if schema is not a list of dictionaries or names a field the table does not have.
    """
    request_body = {
        'tableReference': {
            'projectId': project_id,
            'datasetId': dataset_id,
            'tableId': table_id,
        }
    }

    if table_description is not None:
        request_body['description'] = table_description

    if schema is not None:
        assert isinstance(schema, list)
        assert all([isinstance(change, dict) for change in schema])

        table_info = self.get_table_info(project_id, dataset_id, table_id)
        current_fields = table_info['schema']['fields']

        # all fields have to be supplied even with table patch, this method checks and updates original fields
        # this method won't support adding new fields to prevent potentially accidentally adding etc
        # checks that all supplied schema fields are already existing fields
        assert all([change['name'] in [field['name'] for field in current_fields] for change in schema])

        for change in schema:
            # merge the change into the first existing field carrying the same name
            target = next((field for field in current_fields if field['name'] == change['name']), None)
            if target is not None:
                target.update(change)

        request_body['schema'] = {'fields': current_fields}

    patch_request = self._service.tables().patch(
        projectId=project_id,
        datasetId=dataset_id,
        tableId=table_id,
        body=request_body
    )
    table_resp = patch_request.execute(num_retries=self._max_retries)

    return TableResponse(table_resp)
def poll_resp_list(self, response_list, sleep_time=1):
    """
    Convenience function for iterating and polling list of responses collected with jobs wait_finish=False.

    If any job fails, its respective Error object is returned to ensure errors would not break polling subsequent responses.

    :param response_list: List of response objects
    :type response_list: list of dicts or JobResponse objects
    :param sleep_time: Time to pause (seconds) between polls.
    :type sleep_time: integer
    :return: List of JobResponse or Error (JobError/HttpError) objects representing job resource's final state.
    :raises: AssertionError if response_list is not a list, tuple or set, or if an entry is neither a JobResponse nor a dictionary.
    """
    # to use when setting wait_finish = False to insert jobs without waiting
    # store initial responses in list and use this method to iterate and poll responses
    assert isinstance(response_list, (list, tuple, set))

    outcomes = []
    for pending in response_list:
        try:
            if isinstance(pending, JobResponse):
                finished = self.poll_job_status(pending.resp, sleep_time)
                # carry the original description over to the refreshed response
                outcome = JobResponse(finished, getattr(pending, 'description', None))
            else:
                assert isinstance(pending, dict)
                finished = self.poll_job_status(pending, sleep_time)
                outcome = JobResponse(finished)
        except (JobError, HttpError) as err:
            # keep the error in place of the response so later jobs still get polled
            outcome = err
        outcomes.append(outcome)

    return outcomes