
GCP BigQuery tables have the following format when you query them in the GCP BigQuery (BQ) console:


project_id.dataset_id.table_id


where project_id is the project under which you create the BigQuery dataset and then the table.
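For example, the fully qualified reference used throughout this post is just the three IDs joined with dots (the values below are placeholders, not names used later in the code):

# Placeholder values; substitute your own project, dataset and table names.
project_id = 'my-project'
dataset_id = 'my_dataset'
table_id = 'my_table'
print('{}.{}.{}'.format(project_id, dataset_id, table_id))  # my-project.my_dataset.my_table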


Here are the steps and the Python code to connect to GCP BigQuery, create the dataset and the table, and then load the data.


First, we will import the required packages.


import json
from google.cloud import bigquery

file_to_load = r'C:\File.csv' # file to load into BQ

The following code creates the GCBQ class, where we define the respective methods to execute the different steps.


Use the GOOGLE_APPLICATION_CREDENTIALS environment variable to configure the key file from GCP to connect to BigQuery. You can refer to my previous post for more information.
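If you would rather set it from the script itself instead of the IDE or OS, a minimal sketch looks like this (the key path below is hypothetical; point it at the key you downloaded, and do it before creating the client):

import os

# Hypothetical path to the service account key downloaded from GCP; use your own location.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'C:\keys\my-service-account.json'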


Create class GCBQ


class GCBQ:
    def __init__(self, client):
        self.client = client

Method to get the project id from the JSON key file (mostly not needed, but keeping it here).

    def getProjectId(self, jsonfile):
        with open(jsonfile) as jsonob:
            getdata = json.load(jsonob)
        print(getdata['project_id'])
        return getdata['project_id']

Method to get the dataset id

    def getBQDsetId(self, required_dataset_id_name, clientBQ):
        try:
            dataset = clientBQ.get_dataset(required_dataset_id_name)
        except Exception as e:
            return None
        return dataset

Method to get the table id

    def getBQTableId(self, required_table_id_name, clientBQ):
        try:
            table = clientBQ.get_table(required_table_id_name)
        except Exception as e:
            return None
        return table
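As a side note, catching a bare Exception in the two lookup methods above also swallows real errors such as bad credentials. A stricter sketch, assuming the NotFound exception from google.cloud.exceptions that ships with these client libraries (the method name getBQTableIdStrict is my own, it is not used by main() below):

    # Stricter variant (hypothetical); the import belongs at the top of the file:
    # from google.cloud.exceptions import NotFound
    def getBQTableIdStrict(self, required_table_id_name, clientBQ):
        # Only a missing table maps to None; other API errors (permissions, wrong project) still raise.
        try:
            return clientBQ.get_table(required_table_id_name)
        except NotFound:
            return None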


Method to create the dataset id

    def createDatasetBQ(self, required_dataset_id_name, client):
        dataset = bigquery.Dataset(required_dataset_id_name)
        dataset.location = "us-west2"
        dataset = client.create_dataset(dataset)
        print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
        return dataset

Method to create the table id with the required schema of the file that is being loaded into BigQuery

    def createTableBQ(self, required_table_id_name, required_dataset_id_name, client):
        schema = [


Considering that the file we are loading has ID and FirstName as column names, we create the BQ table with the same names. If your column names have spaces, keep the spaces when creating the columns for the BQ table.

We need to decide whether the mode is REQUIRED or NULLABLE depending on the data in the file we are loading. I suggest keeping it NULLABLE (see the variant sketched after this method).

bigquery.SchemaField("ID", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("FirstName", "STRING", mode="REQUIRED"), ]


# below is the format for the BQ table

        table = bigquery.Table(
            "{}.{}.{}".format(client.project, required_dataset_id_name, required_table_id_name),
            schema=schema)
        table = client.create_table(table)
        # print(table)
        print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
        return table
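If your CSV may contain empty cells, a NULLABLE version of the same two fields (my sketch, not the schema used above) would replace the schema list inside createTableBQ:

        # NULLABLE variant of the same schema: empty cells in the CSV load as NULL
        # instead of failing the load job.
        schema = [
            bigquery.SchemaField("ID", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("FirstName", "STRING", mode="NULLABLE"),
        ]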

# Method to load the file into the table that we created above

    def loaddataIntoBQ(self, client, file_to_load, tableId, dataset_id):
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,  # skip the header row
            autodetect=True,
        )
        with open(file_to_load, 'rb') as fload:
            job = client.load_table_from_file(
                fload,
                "{}.{}.{}".format(client.project, dataset_id, tableId),
                job_config=job_config)
        job.result()  # Waits for the job to complete.
        table = client.get_table("{}.{}.{}".format(client.project, dataset_id, tableId))
        print(table)
        print("Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), tableId))
        return True


# Main method to align the workflow.


def main():
    connBQ = GCBQ(bigquery.Client())
    datasetId = 'universal110211'
    tableId = 'UniversalBank'

    dataset_id = connBQ.getBQDsetId(datasetId, connBQ.client)
    # print(dataset_id)
    if dataset_id is None:
        dataset_id = connBQ.createDatasetBQ("{}.{}".format(connBQ.client.project, datasetId), connBQ.client)
    table_id = connBQ.getBQTableId("{}.{}.{}".format(connBQ.client.project, datasetId, tableId), connBQ.client)
    # print(table_id)
    if table_id is None:
        table_id = connBQ.createTableBQ(tableId, datasetId, connBQ.client)
    fileLoadedToBQ = connBQ.loaddataIntoBQ(connBQ.client, file_to_load, tableId, datasetId)


if __name__ == "__main__":
    main()

After the code exits with 0, you can log in to GCP and check your dataset and then the table with the data that we have loaded.
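If you prefer to verify from Python instead of the console, a small hedged check (it repeats the dataset and table names from main() above; swap in your own) is to run a COUNT query against the freshly loaded table:

from google.cloud import bigquery

client = bigquery.Client()
# Count the rows that the load job wrote into the table.
query = "SELECT COUNT(*) AS n FROM `{}.{}.{}`".format(client.project, 'universal110211', 'UniversalBank')
for row in client.query(query).result():
    print("Rows loaded:", row.n)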


Updated: Dec 24, 2020


We are going to see how we can use Python to load a file into a GCP Storage bucket.


First, create the GCP Storage client that you need to connect to the GCP Storage API.


I have created a class and respective functions to do the needful.


Mind the indentation of the code for it to work correctly in an IDE or .py file


The environment variable GOOGLE_APPLICATION_CREDENTIALS should be set at the IDE level or at the OS level, pointing to the JSON key file created in the Google Cloud console under service account creation, so that your system can authenticate to GCP for tasks such as creating a bucket.
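As a quick sanity check before creating the client (my addition, not part of the original workflow), you can confirm the variable is actually visible to your Python process:

import os

# Fails fast if the credentials variable was not picked up by the IDE or shell.
key_path = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
if key_path is None:
    raise RuntimeError('GOOGLE_APPLICATION_CREDENTIALS is not set')
print('Using service account key:', key_path)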


Important:

Please download and store the JSON security key file somewhere safe that your IDE can access; if the key leaks, it could lead to unauthorized access to your Google Cloud Platform project and its resources.


Import the respective packages if already installed, or use pip install to install them.
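For reference, the storage client imported below now installs with pip install google-cloud-storage; the older gcloud package on PyPI was renamed to the google-cloud family of libraries.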


import os
from google.cloud import storage


Local file assigned to a variable to be used later.


file_to_load_final = r'C:\File.csv'

Observe the letter r in front of the single quote above in r'C:\File.csv'. It marks a raw string, so backslashes are not treated as escape characters: you can write a Windows path with single backslashes instead of doubling each one.
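A tiny illustration of the same point (the path here is just an example):

# Both spell the same Windows path; the raw string avoids doubling the backslash.
path_raw = r'C:\File.csv'
path_escaped = 'C:\\File.csv'
print(path_raw == path_escaped)  # prints True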

Create the GCStorage class


class GCStorage:
    def __init__(self, client):
        self.client = client

Create a method to create the GC bucket

    def createGCBucket(self, clientCS, bucketname):
        bucket = clientCS.bucket(bucketname)
        bucket.location = 'Location of the bucket'  # replace with a real region, e.g. 'us-west2'
        bucket.create()
        return bucket

Create a method to get the GC bucket if it already exists else return None.

    def getGCBucket(self, clientCS, bucketname):
        try:
            bucket = clientCS.get_bucket(bucketname)
        except Exception as e:
            return None
        return bucket

Create a method to copy the file from local drive to GCP bucket.

    def copylocaltoGCPBucket(self, local_file_path, mybucket):
        blob = mybucket.blob(os.path.basename(local_file_path))
        with open(local_file_path, 'rb') as docc:
            blob.upload_from_file(docc)
        return True


Main method to integrate everything we have done before into a single workflow.


def main():
    connGCS = GCStorage(storage.Client())
    bucketName = 'Name of bucket'  # replace with your bucket name
    mybucket = connGCS.getGCBucket(connGCS.client, bucketName)
    if mybucket is None:
        mybucket = connGCS.createGCBucket(connGCS.client, bucketName)
    fileCopied = connGCS.copylocaltoGCPBucket(file_to_load_final, mybucket)
    if fileCopied:
        print('File Copied to GCP Bucket {}'.format(mybucket))



if __name__ == "__main__":
    main()


Add print statements wherever necessary.


After everything is coded in an orderly manner and executed, you can log in to GCP to see your file in the bucket that you have created.
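Alternatively, a small hedged check from Python (reusing the placeholder bucket name from main() above) lists the objects now sitting in the bucket:

from google.cloud import storage

client = storage.Client()
# Print the name of every object currently stored in the bucket.
for blob in client.get_bucket('Name of bucket').list_blobs():
    print(blob.name)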
