How often have you kicked off a Bridge refresh and just hoped for the best? Whether you kick it off manually via Tableau Online or automatically via the API, Tableau doesn't have a reliable way to tell you whether the Bridge refresh completed successfully until the last refresh time updates (which can itself be delayed). A refresh can also die without any notification, which means you don't know whether the job needs to be kicked off again or whether you should keep waiting, at least not until your stakeholders come looking for you.
For those wondering why Bridge continues to be such an important topic now that everything has moved to the cloud, it comes down to security: most corporate data sits on a private network behind a firewall, and reaching it requires Bridge.
One thing that can be confusing for people new to using Bridge is that polling the job status doesn’t tell you if the data source refresh is done; it only tells you that the job was successfully sent to a Bridge Pool Client.
This becomes massively important when running multiple data refreshes while trying to meet an SLA for data freshness. You don't want to overload your resources, but you also want to complete everything in the minimum amount of time.
So how does one do this? Poll the data source's last refresh time, and run only as many threads as the number of Bridge Pool Clients you are willing to commit (in case you need to leave some open for other jobs or departments). Maxing out the number and the memory of your Bridge Pool Clients is the first solution and by far the biggest bang for your buck.
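Stripped of all the error handling, the core idea looks something like the sketch below. The helper name (wait_until_refreshed) and the polling interval are hypothetical; the full, hardened implementation follows later in this article.

import time
import requests
from datetime import datetime, timezone

# Minimal sketch: poll the data source's updatedAt until it is newer than the moment we requested the refresh
def wait_until_refreshed(datasource_url, auth_header, started_at, poll_interval=60):
    while True:
        info = requests.get(datasource_url, headers=auth_header).json()
        updated_at = datetime.strptime(info["datasource"]["updatedAt"], "%Y-%m-%dT%H:%M:%SZ")
        updated_at = updated_at.replace(tzinfo=timezone.utc)
        if updated_at > started_at:
            return updated_at      # the extract has refreshed since we asked
        time.sleep(poll_interval)  # not done yet; check again shortly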
I find the out-of-the-box Python examples provided by Tableau to sometimes be buggy, so I wrote a bunch of my own scripts to accomplish this in a slightly less Pythonic, but easier-to-understand way for the average data person with less programming experience.
Because these API calls run asynchronously, they can be run, canceled, or fail independently without affecting each other.
The module I use to run multiple threads is:

import concurrent.futures
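These scripts also rely on a sign_in() function and a handful of module-level settings that are not listed in full in this article. The names below are the ones the code references; the values and the sign_in body are illustrative placeholders (personal access token authentication, JSON responses), not necessarily what my actual script uses.

# Imports used throughout the script
import sys
import time
import requests
import pytz
from datetime import datetime, timezone

# --- Illustrative module-level settings (placeholder values; tune for your site) ---
tableau_cloud_url = "https://prod-useast-a.online.tableau.com"  # your Tableau Cloud pod
api_version = "3.22"                                            # REST API version
site_content_url = "mysite"                                     # site name as it appears in URLs
utc_timezone = pytz.utc                                         # used to localize parsed timestamps

retries = 5                         # attempts per HTTP call
backoff = 2                         # multiplier applied to the retry delay after each failure

max_concurrent_refreshes = 4        # never more threads than the Bridge Pool Clients you commit
timeout_seconds = 5 * 3600          # whole-script timeout
refresh_timeout = 2 * 3600          # per-data-source timeout
wait_to_check_job = 30              # pause between job status checks
job_check_timeout = 600             # how long to poll a job before requesting the refresh again
refresh_loop_interval = 60          # pause between last-refresh-time checks
data_size_threshold = 500_000_000   # bytes; picks the small vs. large refresh loop timeout
loop_timeout_small = 1800           # refresh_loop_timeout for smaller extracts
loop_timeout_large = 3600           # refresh_loop_timeout for larger extracts

# --- Illustrative sign_in(): personal access token auth, JSON responses requested ---
def sign_in():
    signin_url = f"{tableau_cloud_url}/api/{api_version}/auth/signin"
    payload = {
        "credentials": {
            "personalAccessTokenName": "my-token-name",      # placeholder
            "personalAccessTokenSecret": "my-token-secret",  # placeholder
            "site": {"contentUrl": site_content_url},
        }
    }
    response = requests.post(signin_url, json=payload, headers={"Accept": "application/json"})
    response.raise_for_status()
    credentials = response.json()["credentials"]
    # The rest of the script calls .json() on responses, so the auth header also asks for JSON
    auth_header = {"X-Tableau-Auth": credentials["token"], "Accept": "application/json"}
    return auth_header, credentials["site"]["id"]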
Here is a sample get-request method with some basic error handling:

def get_request(request_url, auth_header):
    delay = 30  # starting retry delay in seconds (assumed value; grown by the backoff below)
    # Retry loop
    for i in range(0, retries):
        try:
            # Make an HTTP GET request
            response = requests.get(request_url, headers=auth_header)
            response_code = response.status_code
            if response_code == 401:
                print(f"Get: Status 401. Waiting {delay}s to sign in again.")
                time.sleep(delay)
                delay *= backoff
                auth_header, site_id = sign_in()
            elif response_code != 200:
                print(f"Failed to get {request_url}. Status code: {response_code}. Sleeping for {delay} seconds")
                time.sleep(delay)
                delay *= backoff
            else:
                return response
        except Exception as e:
            print(f"Exception: {e} Retrying signin and post in {delay} seconds")
            time.sleep(delay)
            delay *= backoff
            auth_header, site_id = sign_in()
            continue
    return f"Failed to get {request_url}"

Then a few functions to refresh the data source and get their last update time:
def refresh_datasource(datasource_id, auth_header, site_id):
    # Endpoint for refreshing a data source
    refresh_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/datasources/{datasource_id}/refresh"
    # Body for data source refresh
    refresh_body = '<tsRequest></tsRequest>'
    delay = 30  # starting retry delay in seconds (assumed value; grown by the backoff below)
    # Retry loop
    for i in range(0, retries):
        try:
            # Make an HTTP POST request to refresh the data source
            refresh_response = requests.post(refresh_url, data=refresh_body, headers=auth_header)
            response_code = refresh_response.status_code
            if response_code == 401:
                print(f"Refresh data source {datasource_id}: Status 401. Waiting {delay}s to sign in again.")
                time.sleep(delay)
                delay *= backoff
                auth_header, site_id = sign_in()
            elif response_code == 409:
                print(f"Refresh data source {datasource_id}. Status code: {response_code}.")
                # exit and let later error handling try again
                return response_code, datetime.now(timezone.utc).replace(microsecond=0), "refresh response 409"
            elif response_code == 429:
                print(f"Refresh data source {datasource_id}. Status code: 429. (Too many requests)")
                return response_code, datetime.now(timezone.utc).replace(microsecond=0), "refresh response 429"
            elif response_code != 202:
                print(f"Failed to request refresh for {datasource_id}. Status code: {response_code}. Sleeping for {delay} seconds")
                time.sleep(delay)
                delay *= backoff
            else:
                datasource_name = refresh_response.json()["job"]["extractRefreshJob"]["datasource"]["name"]
                job_id = refresh_response.json()["job"]["id"]
                refresh_request_datetime = datetime.strptime(refresh_response.json()['job']['createdAt'], '%Y-%m-%dT%H:%M:%SZ')
                refresh_request_datetime = utc_timezone.localize(refresh_request_datetime)
                print(f"\n[Refresh request for data source {datasource_name} "
                      f"(ID: {datasource_id}) accepted at {refresh_request_datetime}. \n"
                      f"(Job ID: {job_id}).]\n")
                return response_code, refresh_request_datetime, job_id
        except Exception as e:
            print(f"Exception: {e} Retrying signin and post in {delay} seconds")
            time.sleep(delay)
            delay *= backoff
            auth_header, site_id = sign_in()
            continue
    return f"Failed to request refresh for data source (ID: {datasource_id})"

def get_datasource_last_refresh(datasource_id, auth_header, site_id):
    data_source_info = get_datasource_info(datasource_id, auth_header, site_id)
    last_refresh_string = data_source_info.json()["datasource"]["updatedAt"]
    last_refresh_datetime = datetime.strptime(last_refresh_string, '%Y-%m-%dT%H:%M:%SZ')
    last_refresh_datetime = utc_timezone.localize(last_refresh_datetime)
    return last_refresh_datetime

def get_job_info(job_id, auth_header, site_id):
    # Endpoint for getting job info
    job_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/jobs/{job_id}"
    job_info = get_request(job_url, auth_header)
    return job_info

Here is the core method that requests a refresh and then checks that the data source actually refreshed:
It has three timeouts:
The entire script has a timeout limit, timeout_seconds.
This method has its own timeout, refresh_timeout.
The subloop has a timeout, refresh_loop_timeout, before it attempts to kick off another refresh. This one depends on the size that data_size returns and was determined by looking at the historical refresh logs.
Because Bridge itself can sometimes lose requests without notifying us, we keep triggering job runs until we get a completion code. But because this can lead to conflicting requests, there is some exception handling for a 409 response code, which tends to mean a concurrent refresh request exists. If datasource_info already exists, we know this run successfully triggered a refresh at some point. If there's a 409 response and no datasource_info, then another job was already underway before the script launched, and this data source is skipped. Because refreshes can fail without notice, when refresh_loop_timeout is reached the method triggers another refresh but still checks against the original start time, since we only care whether the last refresh time exceeds the start of this script. Could a second refresh request end up being sent to Bridge? We unfortunately can't know without connecting directly to the Bridge machines, which is beyond the scope of this article.
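The method below (and get_datasource_last_refresh above) also calls two supporting functions that are not listed in this article: get_datasource_info and print_job_statuses. Minimal sketches, built on the same get_request and get_job_info helpers shown earlier, might look like this:

def get_datasource_info(datasource_id, auth_header, site_id):
    # Query a single data source; the response includes its size and updatedAt timestamp
    datasource_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/datasources/{datasource_id}"
    return get_request(datasource_url, auth_header)

def print_job_statuses(job_id_set, datasource, auth_header, site_id):
    # Summarize every job this run requested for the data source, for troubleshooting convenience
    for job_id in job_id_set:
        job_info = get_job_info(job_id, auth_header, site_id).json()["job"]
        print(f"{datasource['name']} (ID: {datasource['id']}) job {job_id}: "
              f"finishCode = {job_info.get('finishCode', 'still running')}")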
# Refresh each data source and check complete
def refresh_and_check(datasource, start_time, timeout_seconds, refresh_timeout, auth_header, site_id, path_to_check, refreshed_datasources):
    loop_start_time = datetime.now(timezone.utc).replace(microsecond=0)
    elapsed_time = round((loop_start_time - start_time).total_seconds(), 2)
    print(f"Starting refresh & check loop for {datasource['name']} (ID: {datasource['id']}). {loop_start_time}.\n")
    job_id_set = set()
    job_id_completed_set = set()
    if elapsed_time >= timeout_seconds:
        return f"[Timeout reached ({timeout_seconds}s). {datasource['name']} (ID: {datasource['id']}) Exiting.]"
    else:
        # get datasource info
        last_info = get_datasource_info(datasource['id'], auth_header, site_id)
        data_size = int(last_info.json()["datasource"]["size"])
        last_refresh_string = last_info.json()["datasource"]["updatedAt"]
        last_refresh_datetime = datetime.strptime(last_refresh_string, '%Y-%m-%dT%H:%M:%SZ')
        last_refresh_datetime = utc_timezone.localize(last_refresh_datetime)
        # Set refresh check timeout based on data source size
        refresh_loop_timeout = loop_timeout_small if data_size < data_size_threshold else loop_timeout_large
        datasource_info = None
        '''
        Logic:
        - Kick off refresh
        - Loop to check job status
        - If job complete, start refresh time check loop; handle job errors & timeouts
        - Try to refresh and check again if the first timeout happens
        - We're checking for last refresh time to be > the loop start to assume refresh successful
        - Since refreshes aren't cancelled when the loop ends, an earlier loop's refresh can still finish and end the loop
        - We don't want to just wait until a success returns in case there was a failure at the Bridge,
          so we make a compromise between possibly redundant refresh requests and not letting something die for too long before retrying
        '''
        loop_number = 1
        error_count = 0
        while True:
            job_id = None
            print(f"Loop Number: {loop_number}")
            response_code, request_time, job_id_temp = refresh_datasource(datasource['id'], auth_header, site_id)
            if response_code == 202:
                job_id = job_id_temp
                job_id_set.add(job_id)
                print(f"Added {job_id} to job_id_set.")
                print(f"Writing {datasource['name']} (ID: {datasource['id']}) to datasource_info.")
                datasource_info = {'project_id': datasource['project']['id'],
                                   'project_name': path_to_check,
                                   'id': datasource['id'],
                                   'name': datasource['name'],
                                   'refresh_request_code': response_code,
                                   'request_time': request_time,
                                   'refresh_success': False,
                                   'job_id': job_id}
            # refresh underway from this run
            elif response_code == 409 and datasource_info:
                job_id = datasource_info['job_id']
                print(f"{datasource['name']} (ID: {datasource['id']}) response 409 to refresh request. "
                      f"Continuing job status check loop for job {job_id}.")
            # a refresh was already underway before the script was run
            elif response_code == 409:
                return (f"{datasource['name']} (ID: {datasource['id']}) response 409 to refresh request. "
                        f"Refresh already underway from before script launch. Skipping this datasource.")
            # too many server requests on later loops
            elif response_code == 429 and datasource_info:
                job_id = datasource_info['job_id']  # keep checking the job we already requested
                print(f"{datasource['name']} (ID: {datasource['id']}) response 429 (Too Many requests) "
                      f"Waiting 1 hour for job {job_id}.")
                # need to modify timeout to try one more time
                refresh_loop_timeout = 7200
                time.sleep(3600)
            # too many server requests on first try
            elif response_code == 429:
                print(f"{datasource['name']} (ID: {datasource['id']}) response 429 (Too Many requests) "
                      f"Waiting 1 hour to try again.")
                # need to modify timeout to try one more time
                refresh_loop_timeout = 7200
                time.sleep(3600)
                continue
            else:
                print("Continuing after unknown exception")
                continue
            # loop to check job status
            job_check_loop = True
            loop_start = datetime.now(timezone.utc)
            job_info = None
            finish_code = 'no response'
            while job_check_loop == True:
                # timeout checks
                loop_duration = round((datetime.now(timezone.utc) - loop_start).total_seconds(), 2)
                refresh_duration = round((datetime.now(timezone.utc) - request_time).total_seconds(), 2)
                elapsed_time = round((datetime.now(timezone.utc) - start_time).total_seconds(), 2)
                if elapsed_time > timeout_seconds:
                    print(f"[Script Timeout reached ({timeout_seconds}s). Skipping {datasource['name']}. Exiting. "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # add datasource to list of refreshed data sources
                    refreshed_datasources.append(datasource_info)
                    print_job_statuses(job_id_set, datasource, auth_header, site_id)
                    return f"[Script timeout reached ({timeout_seconds}s). Exiting.]"
                # this data source timeout
                elif refresh_duration > refresh_timeout:
                    print(f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # add datasource to list of refreshed data sources
                    refreshed_datasources.append(datasource_info)
                    print_job_statuses(job_id_set, datasource, auth_header, site_id)
                    return (f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} ({datasource['id']}) "
                            f"{datetime.now(timezone.utc).replace(microsecond=0)}]")
                # loop timeout before retry
                elif loop_duration > job_check_timeout:
                    print(f"[Refresh Loop Timeout reached ({job_check_timeout}s). Trying to refresh {datasource['name']} again. "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    break
                time.sleep(wait_to_check_job)
                print(f"Getting job {job_id} info")
                job_info = get_job_info(job_id, auth_header, site_id)
                job_info = job_info.json()['job']
                print(job_info)
                if 'finishCode' in job_info:
                    finish_code = job_info['finishCode']
                    job_id_completed_set.add(job_id)
                    print(f"Added {job_id} to job_id_completed_set.")
                    job_check_loop = False
            # loop to check last refresh time
            if finish_code == '0':
                loop_start = datetime.now(timezone.utc)
                while True:
                    # timeout checks
                    loop_duration = round((datetime.now(timezone.utc) - loop_start).total_seconds(), 2)
                    refresh_duration = round((datetime.now(timezone.utc) - request_time).total_seconds(), 2)
                    elapsed_time = round((datetime.now(timezone.utc) - start_time).total_seconds(), 2)
                    if elapsed_time > timeout_seconds:
                        print(f"[Script Timeout reached ({timeout_seconds}s). Skipping {datasource['name']}. Exiting. "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        # add datasource to list of refreshed data sources
                        refreshed_datasources.append(datasource_info)
                        print_job_statuses(job_id_set, datasource, auth_header, site_id)
                        return f"[Script timeout reached ({timeout_seconds}s). Exiting.]"
                    # this data source timeout
                    elif refresh_duration > refresh_timeout:
                        print(f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        # add datasource to list of refreshed data sources
                        refreshed_datasources.append(datasource_info)
                        print_job_statuses(job_id_set, datasource, auth_header, site_id)
                        return (f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} ({datasource['id']}) "
                                f"{datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # loop timeout before retry
                    elif loop_duration > refresh_loop_timeout:
                        print(f"[Refresh Loop Timeout reached ({refresh_loop_timeout}s). Trying to refresh {datasource['name']} again. "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        break
                    time.sleep(refresh_loop_interval)
                    print(f"Checking last refresh time of {datasource['name']} (ID: {datasource['id']})")
                    last_refresh = get_datasource_last_refresh(datasource['id'], auth_header, site_id)
                    # if the last refresh was after the time of the loop start, tag as successful
                    if isinstance(last_refresh, datetime):
                        if last_refresh > loop_start_time:
                            datasource_info['refresh_success'] = True
                            # add datasource to list of refreshed data sources
                            refreshed_datasources.append(datasource_info)
                            refresh_tries = len(job_id_set)
                            completed_refresh_tries = len(job_id_completed_set)
                            refresh_length_s = round((last_refresh - loop_start_time).total_seconds(), 2)
                            # print out all the jobs as a summary for troubleshooting convenience
                            print_job_statuses(job_id_set, datasource, auth_header, site_id)
                            return (f"\n***\nRefresh {datasource['name']} ({datasource['id']}) success at: "
                                    f"{last_refresh} Refresh took: {refresh_length_s}s over {completed_refresh_tries}/{refresh_tries} tries.\n***\n")
                        else:
                            print(f"{datasource['name']} ({datasource['id']}) still refreshing. "
                                  f"Request time: {request_time}. Last refresh: {last_refresh} "
                                  f"{datetime.now(timezone.utc).replace(microsecond=0)} ({refresh_duration}s)")
                    else:
                        print('Unable to get last refresh time')
                        continue
                # else kick off another refresh loop
            elif finish_code == '1':
                error_count += 1
                if error_count >= 5:
                    return (f"{datasource['name']} (ID: {datasource['id']}) returned 5 errors from job. Skipping this data source.")
                elif job_info['extractRefreshJob']['notes']:
                    if 'not allowed' in job_info['extractRefreshJob']['notes']:
                        return (f"{datasource['name']} (ID: {datasource['id']}) permissions error. Skipping this data source.")
                    else:
                        # wait to try again
                        error_pause = error_count * 60
                        print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                              f"finish code 1. Unknown error for Job ID: {job_id}. Trying again after {error_pause}s.")
                        time.sleep(error_pause)
                        loop_number += 1
                else:
                    # wait to try again
                    error_pause = error_count * 60
                    print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                          f"finish code 1. Unknown error for Job ID: {job_id}. Trying again after {error_pause}s.")
                    time.sleep(error_pause)
                    loop_number += 1
            else:
                loop_number += 1
                print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                      f"finish code: {finish_code} for Job ID: {job_id}. Trying again.")

And here is the function that manages the multiple threads. We must pass the manage_refresh method many of the parameters the inner function needs.
# Function to manage the refreshing process
def manage_refresh(datasources_to_refresh, max_concurrent_refreshes, start_time, timeout_seconds, refresh_timeout, auth_header, site_id, path_to_check, refreshed_datasources):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_refreshes) as executor:
        future_to_ds = {executor.submit(refresh_and_check, datasource, start_time, timeout_seconds, refresh_timeout,
                                        auth_header, site_id, path_to_check, refreshed_datasources): datasource
                        for datasource in datasources_to_refresh}
        for future in concurrent.futures.as_completed(future_to_ds):
            ds_id = future_to_ds[future]
            try:
                print(future.result())
            except Exception as e:
                print(f"Data source {ds_id['name']} (ID:{ds_id['id']}) refresh failed with exception: {e}")

The main function works like this:
Get all the data source IDs by traversing the folder (or locating the individual data source) you're trying to refresh; one of the helpers this relies on, get_all_datasources, is sketched after this list.
Build a list of them and have the manage_refresh method go through them and record which ones were successful.
Then, at the end, summarize the results.
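main also depends on a few traversal helpers that are not listed here (build_project_hierarchy, get_all_datasources, get_datasource_refresh_list). As one example, an illustrative get_all_datasources that pages through every data source on the site could look like this:

def get_all_datasources(auth_header, site_id):
    # Page through the site's data sources (illustrative version of an unlisted supporting function)
    datasources, page_number, page_size = [], 1, 100
    while True:
        url = (f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/datasources"
               f"?pageSize={page_size}&pageNumber={page_number}")
        response = get_request(url, auth_header).json()
        datasources.extend(response["datasources"].get("datasource", []))
        if page_number * page_size >= int(response["pagination"]["totalAvailable"]):
            return datasources
        page_number += 1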
def is_in_list(target, lookuplist, keys):
    return any(all(target[k] == d.get(k) for k in keys) for d in lookuplist)

# Multithreaded
def main(*args):
    # Build project_list and include_subproject or source name from arguments
    project_list = list(args[:-1])
    print(f"Projects to check: {project_list}")
    include_subprojects_or_name = args[-1]
    # Sign in to Tableau Server
    auth_header, site_id = sign_in()
    # Build project hierarchy
    project_hierarchy = build_project_hierarchy(auth_header, site_id)
    # Get all data sources
    all_datasources = get_all_datasources(auth_header, site_id)
    #print(f"{all_datasources}")
    # Initialize lists to track all requested refreshes
    refreshed_datasources = []
    datasources_to_refresh, path_to_check = get_datasource_refresh_list(project_list, project_hierarchy,
                                                                        include_subprojects_or_name, all_datasources)
    #print(datasources_to_refresh)
    # Periodically check the refresh status. Timeout after 5 hours
    start_time = datetime.now(timezone.utc).replace(microsecond=0)
    manage_refresh(datasources_to_refresh, max_concurrent_refreshes, start_time, timeout_seconds, refresh_timeout,
                   auth_header, site_id, path_to_check, refreshed_datasources)
    # Print the final message
    total_runtime = round((datetime.now(timezone.utc).replace(microsecond=0) - start_time).total_seconds(), 2)
    if all(datasource.get('refresh_success') for datasource in refreshed_datasources) == True \
            and len(refreshed_datasources) == len(datasources_to_refresh):
        print(f"[All data sources refreshed successfully! Total runtime: {total_runtime}s]")
    else:
        print('[Failed to refresh the following data sources:]')
        # list all datasources that failed to refresh
        for datasource in refreshed_datasources:
            if datasource['refresh_success'] == False:
                print(f"[{datasource['name']} (ID: {datasource['id']}) in {datasource['project_name']}].")
        print(f"[Total runtime: {total_runtime}s]")
        if len(refreshed_datasources) != len(datasources_to_refresh):
            not_found = [item for item in datasources_to_refresh if not is_in_list(item, refreshed_datasources, ['id'])]
            print('[Did not attempt to refresh the following data sources:]')
            for item in not_found:
                print(f"Name: {item['name']}. (ID: {item['id']})")
    try:
        # Sign out to release the authentication token (the auth header tells the API which session to close)
        sign_out_url = f"{tableau_cloud_url}/api/{api_version}/auth/signout"
        sign_out_response = requests.post(sign_out_url, headers=auth_header)
        print('[Signed out successfully]')
    except Exception as sign_out_error:
        print(f"Sign-out error: {sign_out_error}")

if __name__ == "__main__":
    main(*sys.argv[1:])

Then the function is called like this:
main('Production/My Department/Data Sources', 'Sales') for an individual refresh
or:
main('Production/My Department/Data Sources', 'False') to refresh the whole folder but not the subfolders.
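Because main simply unpacks sys.argv, the same calls work from the command line; for example (the script filename here is a placeholder):

python bridge_refresh.py "Production/My Department/Data Sources" "Sales"
python bridge_refresh.py "Production/My Department/Data Sources" "False"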
I didn't list all the supporting functions here, but I hope this helps those who are still struggling with this, including the people in some of those Tableau Support Community threads that have been open for seven years.
Connect with the Author | Roy Wang
If you’re looking for help modernizing your data platform, business intelligence, or preparing your AI-data strategy and infrastructure, reach out to us at: contact.us@keyrus.com
