There are many ways to load data into BigQuery (BQ), but using Python gives us the most control. Below I demonstrate loading files into BQ in parallel.
My test scenario is 191 CSV files, ranging from 10 MB to 1 GB each, with a total size of 30 GB; this data translates to 200 million records in BQ.
Loading them sequentially would obviously take a long time, so I considered multithreading first.
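All of the snippets below share a job_config and a table_ref that describe the CSV load and the target table. As a reference, here is a minimal sketch of that setup, assuming application-default credentials and placeholder dataset/table names (the original scripts build the client and credentials from their own auth config):

from google.cloud import bigquery

# minimal sketch: client, target table reference and CSV load configuration
# 'my_dataset' and 'my_table' are placeholders, not the names used in this post
bq_client = bigquery.Client()
table_ref = bq_client.dataset('my_dataset').table('my_table')
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1   # skip the CSV header row
job_config.autodetect = True       # let BQ infer the schema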
Multithreading
... in Main....
try:
    # identify all eligible files based on file_mask
    if run_type.upper() == 'LOCAL':
        files = [f for f in os.listdir(folder_work) if re.match(file_mask, f)]
        os.chdir(folder_work)
    else:
        all_files = bucket.list_blobs(prefix=bucket_folder_base)
        files = [f.name for f in all_files if re.match(file_mask, f.name)]
    # randomise the files
    random.shuffle(files)
    # now split the files across the workers
    file_arrays = np.array_split(files, number_of_processes)
    workers = []
    for i in range(number_of_processes):
        if run_type.upper() == 'LOCAL':
            wp = threading.Thread(target=bq_load_local, name='worker%d' % (i + 1),
                                  args=(file_arrays[i], job_config, credentials, table_ref, logger))
        else:
            wp = threading.Thread(target=bq_load_bucket, name='worker%d' % (i + 1),
                                  args=(file_arrays[i], job_config, credentials, table_ref, logger))
        workers.append(wp)
        wp.start()
    # At this point, the main thread could do some useful work of its own.
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # get how many rows are now in the BQ table
    destination_table = bq_client.get_table(table_ref)
    logger.info("{}.{} : Loaded {} rows.".format(dataset_name, table_name, destination_table.num_rows))
except Exception:
    logger.error("Unexpected error: %s", sys.exc_info()[0])
    raise
finally:
    # change back to the original folder
    logger.debug('Changed back to original folder: %s', saved_path)
    os.chdir(saved_path)
    # last log entry
    logger.info(__file__ + ' - Completed')
# function that will be run in multiple threads
def bq_load_local(files, job_config, credentials, table_ref, logger):
    folder_processed = './processed_test/'
    # get the current thread's name
    process_name = threading.current_thread().name
    logger.info('{}: ----- started ----'.format(process_name))
    if len(files) > 0:  # only create a BQ client if there is a file to be loaded
        # each thread gets its own client
        bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
        logger.info('{} : GCP authentication & connection successful....'.format(process_name))
        for file in files:
            with open(file, "rb") as source_file:
                logger.info("{} : start reading file {}".format(process_name, file))
                # real work to load the csv
                load_job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
                logger.info("{} : started job {}".format(process_name, load_job.job_id))
                load_job.result()  # waits for the table load to complete
                logger.info("{} : finished job {}".format(process_name, load_job.job_id))
                # for performance, leave these lines commented out
                # (they would fetch the row count of the BQ table after every load)
                # destination_table = bq_client.get_table(table_ref)
                # logger.info("{} : Loaded {} rows.".format(process_name, destination_table.num_rows))
            # finished processing the csv file, move it to the processed folder with a datetime appended
            file_base = os.path.splitext(os.path.basename(file))[0]
            file_ext = os.path.splitext(os.path.basename(file))[1]
            file_name_new = file_base + '.' + datetime.now().strftime("%Y%m%d_%H%M%S") + file_ext
            os.rename(file, folder_processed + file_name_new)
            logger.info("{} : {} has been moved to processed folder successfully!".format(process_name, file))
    logger.info('{}: ----- finished ----'.format(process_name))
threading.Thread is used to spawn number_of_processes threads; wp.start() starts each worker and wp.join() waits for them all to finish. Note that the logging module is thread-safe, so it can be used directly inside bq_load_local.
In bq_load_local, load_table_from_file is used to load each CSV file into BQ.
As you can see, multithreading is quite easy to implement. Since my test scenario is more IO-bound than CPU-bound, this is probably the best solution; however, if you want to experiment with multiprocessing, below is the modified script.
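As an aside, the same fan-out can be written more compactly with the standard library's concurrent.futures; this is only a sketch of the idea (reusing the names from the script above), not the code I timed:

from concurrent.futures import ThreadPoolExecutor

# sketch: one worker thread per chunk of files, reusing bq_load_local from above
with ThreadPoolExecutor(max_workers=number_of_processes) as executor:
    futures = [executor.submit(bq_load_local, chunk, job_config, credentials, table_ref, logger)
               for chunk in file_arrays]
    for f in futures:
        f.result()  # re-raises any exception from the worker thread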
Multiprocessing
try:
    # set up a queue
    q = Queue()
    # use a Lock to respect "Maximum rate of table metadata update operations — 5 operations every 10 seconds per table"
    lock = Lock()
    # for the logging module
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # identify all eligible files based on file_mask
    if run_type.upper() == 'LOCAL':
        files = [f for f in os.listdir(folder_work) if re.match(file_mask, f)]
        os.chdir(folder_work)
    else:
        all_files = bucket.list_blobs(prefix=bucket_folder_base)
        files = [f.name for f in all_files if re.match(file_mask, f.name)]
    # randomise the files
    random.shuffle(files)
    # now split the files across the processes
    file_arrays = np.array_split(files, number_of_processes)
    workers = []
    for i in range(number_of_processes):
        if run_type.upper() == 'LOCAL':
            wp = Process(target=bq_load_local, name='worker%d' % (i + 1),
                         args=(lock, q, file_arrays[i], job_config, credentials, table_ref, logger))
        else:
            wp = Process(target=bq_load_bucket, name='worker%d' % (i + 1),
                         args=(lock, q, file_arrays[i], job_config, credentials, table_ref, logger))
        workers.append(wp)
        wp.start()
    # At this point, the main process could do some useful work of its own.
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
    # get how many rows are now in the BQ table
    destination_table = bq_client.get_table(table_ref)
    logger.info("{}.{} : Loaded {} rows.".format(dataset_name, table_name, destination_table.num_rows))
except Exception:
    logger.error("Unexpected error: %s", sys.exc_info()[0])
    raise
finally:
    # change back to the original folder
    logger.debug('Changed back to original folder: %s', saved_path)
    os.chdir(saved_path)
    # last log entry
    logger.info(__file__ + ' - Completed')
# logging runs in the main process but in a separate thread:
# refer to https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes for detail
def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)
# function that will be run in multiple processes
def bq_load_local(lock, q, files, job_config, credentials, table_ref, logger):
    # enable logging: forward this process's log records to the queue
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    folder_processed = './processed_test/'
    # get the current process's name
    process_name = current_process().name
    logger.info('{}: ----- started ----'.format(process_name))
    if len(files) > 0:  # only create a BQ client if there is a file to be loaded
        # each process gets its own client
        bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
        logger.info('{} : GCP authentication & connection successful....'.format(process_name))
        # lock the BQ resource
        lock.acquire(True, 2)
        for file in files:
            with open(file, "rb") as source_file:
                logger.info("{} : start reading file {}".format(process_name, file))
                # real work to load the csv
                load_job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
                logger.info("{} : started job {}".format(process_name, load_job.job_id))
                load_job.result()  # waits for the table load to complete
                logger.info("{} : finished job {}".format(process_name, load_job.job_id))
                # for performance, leave these lines commented out
                # destination_table = bq_client.get_table(table_ref)
                # logger.info("{} : Loaded {} rows.".format(process_name, destination_table.num_rows))
            # finished processing the csv file, move it to the processed folder with a datetime appended
            file_base = os.path.splitext(os.path.basename(file))[0]
            file_ext = os.path.splitext(os.path.basename(file))[1]
            file_name_new = file_base + '.' + datetime.now().strftime("%Y%m%d_%H%M%S") + file_ext
            os.rename(file, folder_processed + file_name_new)
            logger.info("{} : {} has been moved to processed folder successfully!".format(process_name, file))
        # acquire first then release, so there is no RuntimeError for "semaphore or lock released too many times"
        lock.acquire(True, 0)
        # release the lock for this session
        lock.release()
    logger.info('{}: ----- finished ----'.format(process_name))
In order to use logging in multiprocessing mode, the script has to be modified to route log records through a queue. A lock is also used because, with many processes running, it is easy to hit a BQ resource limit (in my case, 5 table metadata update operations every 10 seconds per table); refer to BigQuery's quota documentation for detail.
In general, multiprocessing is very similar to multithreading. Multiple processes obviously consume more memory, and at startup my machine hit 100% CPU for a short period, but during execution there was no bottleneck (my machine has a 4-core CPU and 16 GB of memory). I did notice a performance gain compared to multithreading; I think that is probably because my task was more IO-intensive, so memory and CPU were not the constraint.
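Incidentally, the standard library also provides logging.handlers.QueueListener, which could replace the hand-written logger_thread above; a minimal sketch:

import logging
import logging.handlers
from multiprocessing import Queue

# sketch: a QueueListener drains the queue in a background thread and
# forwards every record to the handler(s) it was given
q = Queue()
listener = logging.handlers.QueueListener(q, logging.StreamHandler())
listener.start()
# ... start the worker processes, each attaching a QueueHandler(q) as bq_load_local does ...
listener.stop()  # flushes the queue and stops the background thread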
A variant of the above uses the Pool class:
try:
    # identify all eligible files based on file_mask
    files = [f for f in os.listdir(folder_work) if re.match(file_mask, f)]
    os.chdir(folder_work)
    # randomise the file list
    random.shuffle(files)
    # set up a queue
    q = Queue()
    # size the pool based on the number of cores, e.g.
    # pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    pool = multiprocessing.Pool(processes=20)
    # use partial to fix job_config and table_ref, so bq_load_p only takes the file argument
    bq_load_p = partial(bq_load, job_config=job_config, table_ref=table_ref)
    pool.map(bq_load_p, files)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own.
    # Once it's done that, it can wait for the workers to terminate...
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
except Exception:
    logger.error("Unexpected error: %s", sys.exc_info()[0])
    raise
finally:
    # change back to the original folder
    logger.debug('Changed back to original folder: %s', saved_path)
    os.chdir(saved_path)
    # last log entry
    logger.info(__file__ + ' - Completed')
# function that will be run in the pool's worker processes
def bq_load(file, job_config, table_ref):
    folder_processed = './processed_test/'
    # get the current process's name
    process_name = current_process().name
    print('------ {} started ----'.format(process_name))
    # the logging queue handler is not wired up in this variant; plain print is used instead
    # each process gets its own credentials and client
    try:
        if auth_cfg.auth['use_oauth'] == 'Y':
            auth_client = auth.Client(service=False)
        else:
            auth_client = auth.Client()
        credentials = auth_client.credentials
        bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
    except Exception:
        print('Issue with GCP authentication or connection....')
        print('SCRIPT TERMINATED!')
        sys.exit()
    print('{} : GCP authentication & connection successful....'.format(process_name))
    with open(file, "rb") as source_file:
        print('{} : loading file {}'.format(process_name, file))
        # BQ restriction: maximum rate of table metadata update operations — 5 operations every 10 seconds per table
        time.sleep(3)
        # real work to load the csv
        load_job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
        print("{} : BQ started job {} for {}".format(process_name, load_job.job_id, file))
        load_job.result()  # waits for the table load to complete
        print("{} : BQ finished job {} for {}".format(process_name, load_job.job_id, file))
    # finished processing the csv file, move it to the processed folder with a datetime appended
    file_base = os.path.splitext(os.path.basename(file))[0]
    file_ext = os.path.splitext(os.path.basename(file))[1]
    file_name_new = file_base + '.' + datetime.now().strftime("%Y%m%d_%H%M%S") + file_ext
    os.rename(file, folder_processed + file_name_new)
    print("{} : {} has been moved to processed folder successfully!".format(process_name, file))
Use GCP bucket
A different approach is to upload all the files to GCP Cloud Storage first and load them into BQ from the bucket. This turns out to be quicker, as the disk IO now happens on the GCP platform. Below is the demo code (based on the multiprocessing code base).
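For completeness, the upload itself can be done with the storage client; a minimal sketch, assuming the bucket_name, bucket_folder_base, credentials and files variables used elsewhere in this post:

from google.cloud import storage

# sketch: upload every eligible local CSV into the work bucket under the base folder
storage_client = storage.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
bucket = storage_client.get_bucket(bucket_name)
for file in files:
    blob = bucket.blob(bucket_folder_base + file)  # object name = folder prefix + file name
    blob.upload_from_filename(file)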
def bq_load_bucket(lock, q, files, job_config, credentials, table_ref, logger):
    # enable logging: forward this process's log records to the queue
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    # get variables from the app config
    # note: the bucket and the bucket folder are two different objects!
    bucket_name = app_cfg.bucket['bucket_name']
    bucket_name_processed = app_cfg.bucket['bucket_name_processed']
    bucket_folder_base = app_cfg.bucket['bucket_folder_base']
    # bucket_folder_processed = app_cfg.bucket['bucket_folder_processed']
    storage_client = storage.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
    bucket = storage_client.get_bucket(bucket_name)
    # best practice: keep the processed files in another bucket, otherwise
    # a. a bucket listing will show all files under the bucket; b. you cannot apply different cost/retention policies
    bucket_processed = storage_client.get_bucket(bucket_name_processed)
    # get the current process's name
    process_name = current_process().name
    logger.info('{}: ----- started ----'.format(process_name))
    if len(files) > 0:  # only create a BQ client if there is a file to be loaded
        # each process gets its own client
        bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
        logger.info('{} : GCP authentication & connection successful....'.format(process_name))
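The rest of the function is omitted here; the loop is analogous to bq_load_local, except that the load takes a gs:// URI instead of a local file handle, and the "move to processed" step becomes a copy to the processed bucket followed by a delete. A sketch of what that loop could look like (not the original code), reusing the variables defined above:

        for file in files:
            uri = 'gs://{}/{}'.format(bucket_name, file)
            logger.info("{} : start loading {}".format(process_name, uri))
            load_job = bq_client.load_table_from_uri(uri, table_ref, job_config=job_config)
            logger.info("{} : started job {}".format(process_name, load_job.job_id))
            load_job.result()  # waits for the table load to complete
            logger.info("{} : finished job {}".format(process_name, load_job.job_id))
            # move the blob to the processed bucket: copy, then delete the original
            blob = bucket.get_blob(file)
            bucket.copy_blob(blob, bucket_processed, file)
            blob.delete()
            logger.info("{} : {} has been moved to the processed bucket".format(process_name, file))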
Further enhancement
For my 30 GB of data (191 CSV files), multiprocessing took 30 minutes, multithreading took 40 minutes and bucket loading took 20 minutes (excluding the time to upload the files to the bucket).
Further performance improvement could come from scaling out to multiple nodes (machines), for example with a task queue such as RQ or Celery; this article might be helpful.
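For example, with RQ the per-file load could be pushed onto a Redis-backed queue and picked up by workers on any number of machines; a minimal sketch, assuming a reachable Redis instance and that the bq_load function above is importable by the workers:

from redis import Redis
from rq import Queue

# sketch: enqueue one bq_load job per file; the enqueued arguments must be picklable
rq_queue = Queue(connection=Redis(host='localhost'))
for file in files:
    rq_queue.enqueue(bq_load, file, job_config=job_config, table_ref=table_ref)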