There are many ways to load data into BigQuery (BQ), but using Python gives us the most control. Below I demonstrate loading files into BQ in parallel.

My test scenario: 191 CSV files, ranging from 10MB to 1GB each, with a total size of 30GB; this data translates to 200 million records in BQ.

Loading them sequentially would obviously take a long time, so I considered multithreading first.

Multithreading

... in Main.... 
try:  
    
    #identify all eligible files based on file_mask
    if run_type.upper() == 'LOCAL':
      files = [f for f in os.listdir(folder_work) if (re.match(file_mask,f) )]        
      os.chdir(folder_work)
    else:        
      all_files = bucket.list_blobs(prefix=bucket_folder_base)
      files = [f.name for f in all_files if (re.match(file_mask,f.name) )]
    
    #randomise the files
    random.shuffle(files)
    #now split the files into multiple processes
    file_arrays = np.array_split(files,number_of_processes)        
    
    workers = []
    for i in range(number_of_processes):
      if run_type.upper() == 'LOCAL':                
        wp = threading.Thread(target=bq_load_local, name='worker%d' % (i + 1), args=(file_arrays[i],job_config, credentials, table_ref, logger,))
      else:        
        wp = threading.Thread(target=bq_load_bucket, name='worker%d' % (i + 1), args=(file_arrays[i],job_config, credentials, table_ref, logger,))
        
      workers.append(wp)
      wp.start()
    
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
      wp.join()
    
    #get how many rows in the BQ table
    destination_table = bq_client.get_table(table_ref)
    logger.info("{}.{} : Loaded {} rows.".format(dataset_name,table_name, destination_table.num_rows))    
      
  except:
    logger.error("Unexpected error:", sys.exc_info()[0])
    raise

  finally:
    #change back to original folder
    logger.debug('Changed back to original folder:%s ',saved_path)
    os.chdir(saved_path)

    #last log entry
    logger.info(__file__ + ' - Completed')        

#function that will be run in multiple threads
def bq_load_local( files, job_config, credentials, table_ref, logger ):
  
  folder_processed = './processed_test/'
  
  #get the current process's name  
  process_name = threading.current_thread().name  
  logger.info('{}: ----- started ----'.format(process_name))
  
  if len(files)> 0 :   #only create bq session if there is a file to be loaded
    #each process gets its own client    
    bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
    logger.info('{} : GCP authentication & connection successful....'.format(process_name))
  
  for file in files:
    with open(file, "rb") as source_file:
      logger.info("{} : start reading file {} ".format(process_name, file))        
      load_job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
      #real work to load the csv
      logger.info("{} : started job {}".format(process_name, load_job.job_id))        
      load_job.result()  # Waits for table load to complete.
      logger.info("{} : finished job {}".format(process_name, load_job.job_id))
        
      #for performance, comment this line out
      #get how many rows in the BQ table
      #destination_table = bq_client.get_table(table_ref)
      #logger.info("{} : Loaded {} rows.".format(process_name, destination_table.num_rows))
        
    #finished processing the csv file, move it to processed folder, with datetime appended
    file_base = os.path.splitext(os.path.basename(file))[0]
    file_ext = os.path.splitext(os.path.basename(file))[1]        
    file_name_new = file_base + '.' + datetime.now().strftime("%Y%m%d_%H%M%S") + file_ext    
    os.rename(file, folder_processed + file_name_new)
    logger.info("{} : {} has been moved to processed folder successfully!".format(process_name, file))
  
  logger.info('{}: ----- finished ----'.format(process_name))

threading.Thread is used to spawn number_of_processes worker threads; wp.start() starts each one, and wp.join() waits for them all to finish. Note that the logging module is thread-safe, so it can be used safely inside bq_load_local.

In bq_load_local, load_table_from_file is used to load each CSV file into BQ.
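
The snippets assume job_config and table_ref were already built in Main; that setup is not shown here, so the sketch below is only an illustration of how they might be created (the header-row, autodetect and append settings are assumptions, not my exact configuration):

from google.cloud import bigquery

#hypothetical setup -- substitute your own dataset/table names and CSV options
table_ref = bigquery.DatasetReference(auth_cfg.auth['project_id'], dataset_name).table(table_name)

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1      #assumes a header row in each file
job_config.autodetect = True          #or set job_config.schema explicitly
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND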

As you can see, multithreading is quite easy to implement. Since my test scenario is more IO-bound than CPU-bound, this is probably the best solution. However, if you want to experiment with multiprocessing, below is the modified script.

Multiprocessing

try:  
    
    #set up a queue
    q = Queue()
    #use Lock to ensure "Maximum rate of table metadata update operations — 5 operations every 10 seconds per table"
    lock = Lock() 
    
    #for logging module
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    
    #identify all eligible files based on file_mask
    if run_type.upper() == 'LOCAL':
      files = [f for f in os.listdir(folder_work) if (re.match(file_mask,f) )]        
      os.chdir(folder_work)
    else:        
      all_files = bucket.list_blobs(prefix=bucket_folder_base)
      files = [f.name for f in all_files if (re.match(file_mask,f.name) )]
    
    #randomise the files
    random.shuffle(files)
    #now split the files into multiple processes
    file_arrays = np.array_split(files,number_of_processes)        
    
    workers = []
    for i in range(number_of_processes):
      if run_type.upper() == 'LOCAL':        
        wp = Process(target=bq_load_local, name='worker%d' % (i + 1), args=(lock, q,file_arrays[i],job_config, credentials, table_ref, logger,))
      else:
        wp = Process(target=bq_load_bucket, name='worker%d' % (i + 1), args=(lock, q,file_arrays[i],job_config, credentials, table_ref, logger,))
        
      workers.append(wp)
      wp.start()
    
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
      wp.join()
        
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
    
    #get how many rows in the BQ table
    destination_table = bq_client.get_table(table_ref)
    logger.info("{}.{} : Loaded {} rows.".format(dataset_name,table_name, destination_table.num_rows))    
      
  except:
    logger.error("Unexpected error:", sys.exc_info()[0])
    raise

  finally:
    #change back to original folder
    logger.debug('Changed back to original folder:%s ',saved_path)
    os.chdir(saved_path)

    #last log entry
    logger.info(__file__ + ' - Completed')        
#logging is in the main process but in a separate thread:
#refer to https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes for detail
def logger_thread(q):
  while True:
    record = q.get()
    if record is None:
      break
    logger = logging.getLogger(record.name)
    logger.handle(record)
        
#function that will be run in multiple processes
def bq_load_local(lock, q, files, job_config, credentials, table_ref, logger ):
  #enable logging
  qh = logging.handlers.QueueHandler(q)
  root = logging.getLogger()
  root.setLevel(logging.DEBUG)
  root.addHandler(qh)
  
  folder_processed = './processed_test/'
  
  #get the current process's name
  process_name = current_process().name    
  logger.info('{}: ----- started ----'.format(process_name))
  
  if len(files)> 0 :   #only create bq session if there is a file to be loaded
    #each process gets its own client    
    bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
    logger.info('{} : GCP authentication & connection successful....'.format(process_name))
  
  #lock bq resource
  lock.acquire(True, 2)
  for file in files:
    with open(file, "rb") as source_file:
      logger.info("{} : start reading file {} ".format(process_name, file))        
      load_job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
      #real work to load the csv
      logger.info("{} : started job {}".format(process_name, load_job.job_id))        
      load_job.result()  # Waits for table load to complete.
      logger.info("{} : finished job {}".format(process_name, load_job.job_id))
        
      #for performance, comment this line out
      #get how many rows in the BQ table
      #destination_table = bq_client.get_table(table_ref)
      #logger.info("{} : Loaded {} rows.".format(process_name, destination_table.num_rows))
        
    #finished processing the csv file, move it to processed folder, with datetime appended
    file_base = os.path.splitext(os.path.basename(file))[0]
    file_ext = os.path.splitext(os.path.basename(file))[1]        
    file_name_new = file_base + '.' + datetime.now().strftime("%Y%m%d_%H%M%S") + file_ext
    #logger.info("{} : folder_processed {}".format(process_name, folder_processed))   
    #logger.info("{} : file_name_new {}".format(process_name, file_name_new))           
    os.rename(file, folder_processed + file_name_new)
    logger.info("{} : {} has been moved to processed folder successfully!".format(process_name, file))
  
  #put a lock first then release so that no RuntimeError for "semaphore or lock released too many times"
  lock.acquire(True,0)
  #release the lock for this session
  lock.release() 
  
  logger.info('{}: ----- finished ----'.format(process_name))

In order to use logging in multiprocessing mode, the script has to be modified to pass log records through a queue. A lock is also used because, with many processes running, the BQ resource limit is likely to be reached: in my case, 5 table metadata update operations every 10 seconds per table (see the BQ quotas documentation).
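
A more explicit (untested) alternative to that blanket lock would be to serialise only the job submissions and enforce a minimum gap between them; submit_load and MIN_GAP below are hypothetical names, and lock is the same multiprocessing.Lock passed to each worker:

import time

MIN_GAP = 2  #seconds between submissions, to stay under 5 operations every 10 seconds per table

def submit_load(lock, bq_client, source_file, table_ref, job_config):
  #hold the lock only while submitting the job, then pace the next submission
  with lock:
    load_job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
    time.sleep(MIN_GAP)
  return load_job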

In general, multiprocessing is very similar to multithreading. Multiple processes obviously consume more memory, and at startup my machine hit 100% CPU for a short period, but during execution there was no bottleneck (my machine has a 4-core CPU with 16GB of memory). I did notice a performance gain compared to multithreading; I think that is probably because my task was more IO-intensive and memory/CPU were not constrained.

A variant of the above uses the Pool class:

 try:
    
    #identify all eligible files based on file_mask
    files = [f for f in os.listdir(folder_work) if (re.match(file_mask,f) )]        
    os.chdir(folder_work)
    #randomise the file list
    random.shuffle(files)
    
    #set up a queue
    q = Queue()
    
    #based on number of cores
    #pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    pool = multiprocessing.Pool(processes=20)
    
    #use partial so that pool.map can call bq_load with a single argument
    bq_load_p = partial(bq_load, job_config=job_config, table_ref=table_ref)  #only 'file' remains free
    
    pool.map(bq_load_p,files)
    #workers = []
    #for i in range(10):
    #  wp = Process(target=bq_load, name='worker%d' % (i + 1), args=(q,file_arrays[i],job_config, credentials, table_ref, logger,))
    #  workers.append(wp)
    #  wp.start()
    
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    #for wp in workers:
    #    wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
       
      
  except:
    logger.error("Unexpected error:", sys.exc_info()[0])
    raise

  finally:
    #change back to original folder
    logger.debug('Changed back to original folder:%s ',saved_path)
    os.chdir(saved_path)

    #last log entry
    logger.info(__file__ + ' - Completed')        

#function that will be run in multiple processes
def bq_load(file, job_config, table_ref ):
  folder_processed = './processed_test/'
  
  #get the current process's name
  process_name = current_process().name    
  
  #logger.info('------ {} started ----'.format(process_name))
  print('------ {} started ----'.format(process_name))
  
  #qh = logging.handlers.QueueHandler(q)
  #root = logging.getLogger()
  #root.setLevel(logging.DEBUG)
  #root.addHandler(qh)
    
  #each process gets its own client
  try:
    if auth_cfg.auth['use_oauth']=='Y':
        auth_client  = auth.Client(service=False)
    else:
        auth_client = auth.Client() 
    
    credentials = auth_client.credentials
    bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
    #print('GCP authentication & connection successfull....')
  except:
    print('Issue with GCP authentication or connection....')
    print('SCRIPT TERMINATED!')
    sys.exit()
    
  #credentials = auth_client.credentials    
  #bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
  #logger.info('{} : GCP authentication & connection successfull....'.format(process_name))
  print('{} : GCP authentication & connection successful....'.format(process_name))
   
  
  #for file in files:
  with open(file, "rb") as source_file:
    print('{} : loading file {}'.format(process_name, file))
    time.sleep(3)  #BQ restriction: Maximum rate of table metadata update operations — 5 operations every 10 seconds per table
    load_job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
    #real work to load the csv
    #logger.info("{} : Started job {}".format(process_name, load_job.job_id))        
    print("{} : BQ started job {} for {} ".format(process_name, load_job.job_id, file)) 
    load_job.result()  # Waits for table load to complete.
    print("{} : BQ finished job {} for {} ".format(process_name, load_job.job_id,file))
        
    #get how many rows in the BQ table
    #destination_table = bq_client.get_table(table_ref)
    #print("{} : Loaded {} rows.".format(process_name, destination_table.num_rows))
        
  #finished processing the csv file, move it to processed folder, with datetime appended
  file_base = os.path.splitext(os.path.basename(file))[0]
  file_ext = os.path.splitext(os.path.basename(file))[1]        
  file_name_new = file_base + '.' + datetime.now().strftime("%Y%m%d_%H%M%S") + file_ext
  #logger.info("{} : folder_processed {}".format(process_name, folder_processed))   
  #logger.info("{} : file_name_new {}".format(process_name, file_name_new))           
  os.rename(file, folder_processed + file_name_new)
  print("{} : {} has been moved to processed folder successfully!".format(process_name, file))

Use a GCP bucket

Another approach is to upload all the files to GCP Cloud Storage first and load them from the bucket. This turns out to be quicker because the disk IO now happens on the GCP platform. A minimal upload sketch is shown below, followed by the demo loader code (based on the multiprocessing version).
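
The upload step itself is not part of the loader script; a sketch of it, assuming the same credentials object and the google-cloud-storage client (upload_files_to_bucket is a hypothetical helper), could look like this:

import os
from google.cloud import storage

def upload_files_to_bucket(files, credentials, bucket_name, bucket_folder_base):
  #push each local CSV into the staging bucket under bucket_folder_base (assumed to end with '/')
  storage_client = storage.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
  bucket = storage_client.get_bucket(bucket_name)
  for file in files:
    blob = bucket.blob(bucket_folder_base + os.path.basename(file))
    blob.upload_from_filename(file)
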

def bq_load_bucket(lock, q, files, job_config, credentials, table_ref, logger ):
  
  #enable logging
  qh = logging.handlers.QueueHandler(q)
  root = logging.getLogger()
  root.setLevel(logging.DEBUG)
  root.addHandler(qh)
  
  #get variables from app config
  #bucket and bucket folder are two different objects!
  bucket_name = app_cfg.bucket['bucket_name']
  bucket_name_processed = app_cfg.bucket['bucket_name_processed']
  
  bucket_folder_base = app_cfg.bucket['bucket_folder_base']
  #bucket_folder_processed = app_cfg.bucket['bucket_folder_processed']
    
  storage_client = storage.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
  bucket = storage_client.get_bucket(bucket_name)
  #best practice: keep processed files in a separate bucket, otherwise:
  #a. listing the bucket will return all files; b. you cannot apply different cost/retention policies
  bucket_processed = storage_client.get_bucket(bucket_name_processed)
    
  #get the current process's name
  process_name = current_process().name    
  logger.info('{}: ----- started ----'.format(process_name))
  
  if len(files)> 0 :   #only create bq session if there is a file to be loaded
    #each process gets its own client    
    bq_client = bigquery.Client(credentials=credentials, project=auth_cfg.auth['project_id'])
    logger.info('{} : GCP authentication & connection successful....'.format(process_name))
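
The rest of bq_load_bucket is omitted above; the key differences from bq_load_local are that the load uses load_table_from_uri against the gs:// path and the finished file is copied to the processed bucket instead of being renamed on disk. A sketch of that loop (the lock handling would mirror bq_load_local) might be:

  for file in files:
    #'file' is the blob name returned by bucket.list_blobs in Main
    uri = 'gs://{}/{}'.format(bucket_name, file)
    logger.info('{} : start loading {}'.format(process_name, uri))
    load_job = bq_client.load_table_from_uri(uri, table_ref, job_config=job_config)
    logger.info('{} : started job {}'.format(process_name, load_job.job_id))
    load_job.result()  # Waits for table load to complete.
    logger.info('{} : finished job {}'.format(process_name, load_job.job_id))

    #move the blob to the processed bucket, with datetime appended
    blob = bucket.blob(file)
    file_base, file_ext = os.path.splitext(os.path.basename(file))
    file_name_new = file_base + '.' + datetime.now().strftime('%Y%m%d_%H%M%S') + file_ext
    bucket.copy_blob(blob, bucket_processed, file_name_new)
    blob.delete()
    logger.info('{} : {} moved to the processed bucket'.format(process_name, file))

  logger.info('{}: ----- finished ----'.format(process_name))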

Further enhancement

For my 30GB of data (191 CSV files), multiprocessing took 30 minutes, multithreading took 40 minutes, and bucket loading took 20 minutes (excluding the time to upload the files to the bucket).

Further performance improvements could come from using multiple nodes (machines), for example with RQ or Celery; this article might be helpful.
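
As a rough, untested illustration of the RQ route, each file becomes one queued job and each worker machine runs `rq worker bq_load` against a shared Redis instance (the queue name and Redis host below are placeholders):

from redis import Redis
from rq import Queue

#one queued job per file; bq_load is the per-file function from the Pool example
q = Queue('bq_load', connection=Redis(host='redis-host'))
for file in files:
  q.enqueue(bq_load, file, job_config, table_ref)  #arguments must be picklable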
