How often have you kicked off a Bridge refresh and just hoped for the best? Whether you trigger it manually via Tableau Online or automatically via the API, Tableau doesn’t give you a reliable way to tell whether the Bridge refresh completed successfully until the last refresh time updates (which can itself be delayed). A refresh can also die without notification, which means you don’t know whether the job needs to be kicked off again or whether you should keep waiting, until your stakeholders come looking for you.
For those wondering why Bridge continues to be such an important topic with everything moved to the cloud, it comes down to security: most corporate data is going to be on a private network behind a firewall, and that needs Bridge.
One thing that can be confusing for people new to using Bridge is that polling the job status doesn’t tell you if the data source refresh is done; it only tells you that the job was successfully sent to a Bridge Pool Client.
This becomes massively important when running multiple data source refreshes against an SLA for data freshness: you don’t want to overload your resources, but you also want to finish all the refreshes in the minimum amount of time.
So how does one do this? You must poll the data source’s last refresh time and only run as many threads as you are willing to commit Bridge Pool Clients to (in case you need to leave some open for other jobs/departments). Maxing out the number and memory of Bridge Pool Clients is the first solution and by far the biggest bang for your buck.
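As a minimal sketch of the polling idea (the full script below builds on this), here is a hypothetical helper that watches a data source’s updatedAt field via the same REST endpoint the later functions use; tableau_cloud_url, api_version, site_id, and auth_header are assumed to be set up the same way as in the rest of the article:
import time
from datetime import datetime, timezone
import requests

def wait_for_refresh(datasource_id, started_after, poll_seconds=60):
    # Poll the data source's updatedAt until it is newer than started_after (a UTC datetime)
    url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/datasources/{datasource_id}"
    while True:
        info = requests.get(url, headers=auth_header).json()["datasource"]
        last_refresh = datetime.strptime(info["updatedAt"], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
        if last_refresh > started_after:
            return last_refresh  # the extract has been rebuilt since we started watching
        time.sleep(poll_seconds)  # still waiting on Bridge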
I find the out-of-the-box Python examples provided by Tableau to sometimes be buggy, so I wrote a bunch of my own scripts to accomplish this in a slightly less Pythonic, but easier-to-understand way for the average data person with less programming experience.
Because these API calls run asynchronously, each one can run, be canceled, or fail independently without affecting the others.
The module I use to run multiple threads is:
import concurrent.futures
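As a quick illustration of how a thread pool caps concurrency (refresh_one here is just a stand-in for the real per-data-source work shown later):
import concurrent.futures

def refresh_one(datasource_id):
    # stand-in for the real refresh-and-check work
    return f"{datasource_id} done"

datasource_ids = ["ds-1", "ds-2", "ds-3", "ds-4"]
# max_workers caps how many refreshes (and therefore Bridge clients) run at once
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(refresh_one, ds_id): ds_id for ds_id in datasource_ids}
    for future in concurrent.futures.as_completed(futures):
        print(futures[future], "->", future.result())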
Here is a sample get-request method with some basic error handling:
def get_request(request_url, auth_header):
    delay = 10  # initial wait (seconds) between retries; retries and backoff are module-level settings
    # Retry loop
    for i in range(retries):
        try:
            # Make an HTTP GET request
            response = requests.get(request_url, headers=auth_header)
            response_code = response.status_code
            if response_code == 401:
                print(f"Get: Status 401. Waiting {delay}s to sign in again.")
                time.sleep(delay)
                delay *= backoff
                auth_header, site_id = sign_in()
            elif response_code != 200:
                print(f"Failed to get {request_url}. Status code: {response_code}. Sleeping for {delay} seconds")
                time.sleep(delay)
                delay *= backoff
            else:
                return response
        except Exception as e:
            print(f"Exception: {e} Retrying sign-in and get in {delay} seconds")
            time.sleep(delay)
            delay *= backoff
            auth_header, site_id = sign_in()
            continue
    return f"Failed to get {request_url}"
Then a few functions to refresh data sources and get their last update times:
def refresh_datasource(datasource_id, auth_header, site_id):
    # Endpoint for refreshing a data source
    refresh_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/datasources/{datasource_id}/refresh"
    # Body for data source refresh
    refresh_body = '<tsRequest></tsRequest>'
    delay = 10  # initial wait (seconds) between retries; retries and backoff are module-level settings
    # Retry loop
    for i in range(retries):
        try:
            # Make an HTTP POST request to refresh the data source
            refresh_response = requests.post(refresh_url, data=refresh_body, headers=auth_header)
            response_code = refresh_response.status_code
            if response_code == 401:
                print(f"Refresh data source {datasource_id}: Status 401. Waiting {delay}s to sign in again.")
                time.sleep(delay)
                delay *= backoff
                auth_header, site_id = sign_in()
            elif response_code == 409:
                print(f"Refresh data source {datasource_id}. Status code: {response_code}.")
                # exit and let later error handling try again
                return response_code, datetime.now(timezone.utc).replace(microsecond=0), "refresh response 409"
            elif response_code == 429:
                print(f"Refresh data source {datasource_id}. Status code: 429. (Too many requests)")
                return response_code, datetime.now(timezone.utc).replace(microsecond=0), "refresh response 429"
            elif response_code != 202:
                print(f"Failed to request refresh for {datasource_id}. Status code: {response_code}. Sleeping for {delay} seconds")
                time.sleep(delay)
                delay *= backoff
            else:
                datasource_name = refresh_response.json()["job"]["extractRefreshJob"]["datasource"]["name"]
                job_id = refresh_response.json()["job"]["id"]
                refresh_request_datetime = datetime.strptime(refresh_response.json()['job']['createdAt'], '%Y-%m-%dT%H:%M:%SZ')
                refresh_request_datetime = utc_timezone.localize(refresh_request_datetime)
                print(f"\n[Refresh request for data source {datasource_name} "
                      f"(ID: {datasource_id}) accepted at {refresh_request_datetime}. \n"
                      f"(Job ID: {job_id}).]\n")
                return response_code, refresh_request_datetime, job_id
        except Exception as e:
            print(f"Exception: {e} Retrying sign-in and post in {delay} seconds")
            time.sleep(delay)
            delay *= backoff
            auth_header, site_id = sign_in()
            continue
    # Give up: return a 3-tuple so the caller's unpacking still works
    return None, datetime.now(timezone.utc).replace(microsecond=0), f"Failed to request refresh for data source {datasource_id}"
def get_datasource_last_refresh(datasource_id, auth_header, site_id):
    data_source_info = get_datasource_info(datasource_id, auth_header, site_id)
    last_refresh_string = data_source_info.json()["datasource"]["updatedAt"]
    last_refresh_datetime = datetime.strptime(last_refresh_string, '%Y-%m-%dT%H:%M:%SZ')
    last_refresh_datetime = utc_timezone.localize(last_refresh_datetime)
    return last_refresh_datetime
def get_job_info(job_id, auth_header, site_id):
    # Endpoint for getting job info
    job_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/jobs/{job_id}"
    job_info = get_request(job_url, auth_header)
    return job_info
Here is the core method that refreshes a data source and checks that the refresh actually completed:
It has three timeouts:
The entire script has a timeout limit, timeout_seconds.
This method has its own timeout, refresh_timeout.
The subloop has a timeout, refresh_loop_timeout, before it attempts to kick off another refresh. This one is based on the data source size (data_size) and was determined by looking at the historical refresh logs. (An illustrative sketch of these settings follows below.)
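The code below also references a handful of module-level settings (retries, backoff, the various timeouts, and so on). Here is an illustrative sketch of those settings; the exact values are placeholders you would tune to your own refresh history, and utc_timezone is assumed to be pytz.utc since the script uses localize():
import sys
import time
import requests
import pytz
from datetime import datetime, timezone

tableau_cloud_url = "https://your-pod.online.tableau.com"  # placeholder pod URL
api_version = "3.22"                 # whichever REST API version your site supports
utc_timezone = pytz.utc

retries = 5                          # attempts per request before giving up
backoff = 2                          # multiplier applied to the retry delay
timeout_seconds = 18000              # whole-script timeout (5 hours)
refresh_timeout = 7200               # per-data-source timeout (placeholder)
loop_timeout_small = 1800            # re-request refresh after 30 min for small sources (placeholder)
loop_timeout_large = 3600            # re-request refresh after 60 min for large sources (placeholder)
data_size_threshold = 100_000_000    # bytes; boundary between "small" and "large" (placeholder)
job_check_timeout = 600              # how long to poll the job before re-requesting (placeholder)
wait_to_check_job = 30               # seconds between job status checks (placeholder)
refresh_loop_interval = 60           # seconds between last-refresh-time checks (placeholder)
max_concurrent_refreshes = 5         # how many Bridge Pool Clients you're willing to tie up (placeholder)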
Because Bridge itself can sometimes lose requests without notifying anyone, we keep triggering job runs until we get a completion code. Since that can lead to conflicting requests, we add some exception handling for a 409 response code, which generally means there is a concurrent refresh request. If datasource_info already exists, we know this run successfully triggered a refresh at some point. If we get a 409 and there is no datasource_info, then another job was already underway before the script started, and this data source is skipped. Because refreshes can fail without notice, when refresh_loop_timeout is reached the script triggers another refresh but still checks against the original start time, since we only care whether the last refresh time exceeds the start of this script. Could a second refresh request end up being sent to Bridge? Unfortunately we can’t know without connecting directly to the Bridge machines, which is beyond the scope of this article.
# Refresh each data source and check complete
def refresh_and_check(datasource, start_time, timeout_seconds,
                      refresh_timeout, auth_header, site_id, path_to_check, refreshed_datasources):
    loop_start_time = datetime.now(timezone.utc).replace(microsecond=0)
    elapsed_time = round((loop_start_time - start_time).total_seconds(), 2)
    print(f"Starting refresh & check loop for {datasource['name']} (ID: {datasource['id']}). {loop_start_time}.\n")
    job_id_set = set()
    job_id_completed_set = set()
    if elapsed_time >= timeout_seconds:
        return f"[Timeout reached ({timeout_seconds}s). {datasource['name']} (ID: {datasource['id']}) Exiting.]"
    else:
        # get datasource info
        last_info = get_datasource_info(datasource['id'], auth_header, site_id)
        data_size = int(last_info.json()["datasource"]["size"])
        last_refresh_string = last_info.json()["datasource"]["updatedAt"]
        last_refresh_datetime = datetime.strptime(last_refresh_string, '%Y-%m-%dT%H:%M:%SZ')
        last_refresh_datetime = utc_timezone.localize(last_refresh_datetime)
        # Set refresh check timeout based on data source size
        refresh_loop_timeout = loop_timeout_small if data_size < data_size_threshold else loop_timeout_large
        datasource_info = None
        '''
        Logic:
        - Kick off refresh
        - Loop to check job status
        - If job complete, start refresh time check loop; handle job errors & timeouts
        - Try to refresh and check again if the first timeout happens
        - We're checking for the last refresh time to be > the loop start to assume the refresh succeeded
        - Since refreshes aren't cancelled when a loop ends, an earlier refresh can still finish and end the loop
        - We don't want to just wait until a success returns in case there was a failure at the Bridge,
          so we compromise between possibly redundant refresh requests and not letting something sit dead
          for too long before retrying
        '''
        loop_number = 1
        error_count = 0
        while True:
            job_id = None
            print(f"Loop Number: {loop_number}")
            response_code, request_time, job_id_temp = refresh_datasource(datasource['id'], auth_header, site_id)
            if response_code == 202:
                job_id = job_id_temp
                job_id_set.add(job_id)
                print(f"Added {job_id} to job_id_set.")
                print(f"Writing {datasource['name']} (ID: {datasource['id']}) to datasource_info.")
                datasource_info = {
                    'project_id': datasource['project']['id'],
                    'project_name': path_to_check,
                    'id': datasource['id'],
                    'name': datasource['name'],
                    'refresh_request_code': response_code,
                    'request_time': request_time,
                    'refresh_success': False,
                    'job_id': job_id
                }
            # refresh underway from this run
            elif response_code == 409 and datasource_info:
                job_id = datasource_info['job_id']
                print(f"{datasource['name']} (ID: {datasource['id']}) response 409 to refresh request. "
                      f"Continuing job status check loop for job {job_id}.")
            # a refresh was already underway before the script was run
            elif response_code == 409:
                return (f"{datasource['name']} (ID: {datasource['id']}) response 409 to refresh request. "
                        f"Refresh already underway from before script launch. Skipping this datasource.")
            # too many server requests on later loops
            elif response_code == 429 and datasource_info:
                job_id = datasource_info['job_id']  # keep checking the job we already requested
                print(f"{datasource['name']} (ID: {datasource['id']}) response 429 (Too Many Requests). "
                      f"Waiting 1 hour for job {job_id}.")
                # need to modify the timeout to try one more time
                refresh_loop_timeout = 7200
                time.sleep(3600)
            # too many server requests on first try
            elif response_code == 429:
                print(f"{datasource['name']} (ID: {datasource['id']}) response 429 (Too Many Requests). "
                      f"Waiting 1 hour to try again.")
                # need to modify the timeout to try one more time
                refresh_loop_timeout = 7200
                time.sleep(3600)
                continue
            else:
                print("Continuing after unknown exception")
                continue
            # loop to check job status
            job_check_loop = True
            loop_start = datetime.now(timezone.utc)
            job_info = None
            finish_code = 'no response'
            while job_check_loop == True:
                # timeout checks
                loop_duration = round((datetime.now(timezone.utc) - loop_start).total_seconds(), 2)
                refresh_duration = round((datetime.now(timezone.utc) - request_time).total_seconds(), 2)
                elapsed_time = round((datetime.now(timezone.utc) - start_time).total_seconds(), 2)
                if elapsed_time > timeout_seconds:
                    print(f"[Script Timeout reached ({timeout_seconds}s). Skipping {datasource['name']}. Exiting. "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # add datasource to list of refreshed data sources
                    refreshed_datasources.append(datasource_info)
                    print_job_statuses(job_id_set, datasource, auth_header, site_id)
                    return f"[Script timeout reached ({timeout_seconds}s). Exiting.]"
                # this data source timeout
                elif refresh_duration > refresh_timeout:
                    print(f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # add datasource to list of refreshed data sources
                    refreshed_datasources.append(datasource_info)
                    print_job_statuses(job_id_set, datasource, auth_header, site_id)
                    return (f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} ({datasource['id']}) "
                            f"{datetime.now(timezone.utc).replace(microsecond=0)}]")
                # loop timeout before retry
                elif loop_duration > job_check_timeout:
                    print(f"[Refresh Loop Timeout reached ({job_check_timeout}s). Trying to refresh {datasource['name']} again. "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    break
                time.sleep(wait_to_check_job)
                print(f"Getting job {job_id} info")
                job_info = get_job_info(job_id, auth_header, site_id)
                job_info = job_info.json()['job']
                print(job_info)
                if 'finishCode' in job_info:
                    finish_code = job_info['finishCode']
                    job_id_completed_set.add(job_id)
                    print(f"Added {job_id} to job_id_completed_set.")
                    job_check_loop = False
            # loop to check last refresh time
            if finish_code == '0':
                loop_start = datetime.now(timezone.utc)
                while True:
                    # timeout checks
                    loop_duration = round((datetime.now(timezone.utc) - loop_start).total_seconds(), 2)
                    refresh_duration = round((datetime.now(timezone.utc) - request_time).total_seconds(), 2)
                    elapsed_time = round((datetime.now(timezone.utc) - start_time).total_seconds(), 2)
                    if elapsed_time > timeout_seconds:
                        print(f"[Script Timeout reached ({timeout_seconds}s). Skipping {datasource['name']}. Exiting. "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        # add datasource to list of refreshed data sources
                        refreshed_datasources.append(datasource_info)
                        print_job_statuses(job_id_set, datasource, auth_header, site_id)
                        return f"[Script timeout reached ({timeout_seconds}s). Exiting.]"
                    # this data source timeout
                    elif refresh_duration > refresh_timeout:
                        print(f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        # add datasource to list of refreshed data sources
                        refreshed_datasources.append(datasource_info)
                        print_job_statuses(job_id_set, datasource, auth_header, site_id)
                        return (f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} ({datasource['id']}) "
                                f"{datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # loop timeout before retry
                    elif loop_duration > refresh_loop_timeout:
                        print(f"[Refresh Loop Timeout reached ({refresh_loop_timeout}s). Trying to refresh {datasource['name']} again. "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        break
                    time.sleep(refresh_loop_interval)
                    print(f"Checking last refresh time of {datasource['name']} (ID: {datasource['id']})")
                    last_refresh = get_datasource_last_refresh(datasource['id'], auth_header, site_id)
                    # if the last refresh was after the time of the loop start, tag as successful
                    if isinstance(last_refresh, datetime):
                        if last_refresh > loop_start_time:
                            datasource_info['refresh_success'] = True
                            # add datasource to list of refreshed data sources
                            refreshed_datasources.append(datasource_info)
                            refresh_tries = len(job_id_set)
                            completed_refresh_tries = len(job_id_completed_set)
                            refresh_length_s = round((last_refresh - loop_start_time).total_seconds(), 2)
                            # print out all the jobs as a summary for troubleshooting convenience
                            print_job_statuses(job_id_set, datasource, auth_header, site_id)
                            return (f"\n***\nRefresh {datasource['name']} ({datasource['id']}) success at: "
                                    f"{last_refresh} Refresh took: {refresh_length_s}s over {completed_refresh_tries}/{refresh_tries} tries.\n***\n")
                        else:
                            print(f"{datasource['name']} ({datasource['id']}) still refreshing. "
                                  f"Request time: {request_time}. Last refresh: {last_refresh} "
                                  f"{datetime.now(timezone.utc).replace(microsecond=0)} ({refresh_duration}s)")
                    else:
                        print('Unable to get last refresh time')
                        continue
            # else kick off another refresh loop
            elif finish_code == '1':
                error_count += 1
                if error_count >= 5:
                    return (f"{datasource['name']} (ID: {datasource['id']}) returned 5 errors from job. Skipping this data source.")
                elif job_info['extractRefreshJob']['notes']:
                    if 'not allowed' in job_info['extractRefreshJob']['notes']:
                        return (f"{datasource['name']} (ID: {datasource['id']}) permissions error. Skipping this data source.")
                    else:
                        # wait to try again
                        error_pause = error_count * 60
                        print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                              f"finish code 1. Unknown error for Job ID: {job_id}. Trying again after {error_pause}s.")
                        time.sleep(error_pause)
                        loop_number += 1
                else:
                    # wait to try again
                    error_pause = error_count * 60
                    print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                          f"finish code 1. Unknown error for Job ID: {job_id}. Trying again after {error_pause}s.")
                    time.sleep(error_pause)
                    loop_number += 1
            else:
                loop_number += 1
                print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                      f"finish code: {finish_code} for Job ID: {job_id}. Trying again.")
And then a function to manage the multiple threads. We must pass the manage_refresh method all of the parameters the inner function needs.
# Function to manage the refreshing process
def manage_refresh(datasources_to_refresh, max_concurrent_refreshes,
                   start_time, timeout_seconds, refresh_timeout, auth_header,
                   site_id, path_to_check, refreshed_datasources):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_refreshes) as executor:
        future_to_ds = {executor.submit(refresh_and_check, datasource,
                                        start_time, timeout_seconds, refresh_timeout,
                                        auth_header, site_id, path_to_check, refreshed_datasources):
                        datasource for datasource in datasources_to_refresh}
        for future in concurrent.futures.as_completed(future_to_ds):
            ds = future_to_ds[future]
            try:
                print(future.result())
            except Exception as e:
                print(f"Data source {ds['name']} (ID: {ds['id']}) refresh failed with exception: {e}")
The main function works like this:
Get all the data source IDs by traversing the folder (or locating the individual data source) you’re trying to refresh.
Build a list of them, have the manage_refresh method go through them, and record which ones were successful.
Then, at the end, summarize the results.
# True if target matches any dict in lookuplist on all of the given keys
def is_in_list(target, lookuplist, keys):
    return any(all(target[k] == d.get(k) for k in keys) for d in lookuplist)
# Multithreaded
def main(*args):
    # Build project_list and include_subprojects or source name from arguments
    project_list = list(args[:-1])
    print(f"Projects to check: {project_list}")
    include_subprojects_or_name = args[-1]
    # Sign in to Tableau Cloud
    auth_header, site_id = sign_in()
    # Build project hierarchy
    project_hierarchy = build_project_hierarchy(auth_header, site_id)
    # Get all data sources
    all_datasources = get_all_datasources(auth_header, site_id)
    #print(f"{all_datasources}")
    # Initialize list to track all requested refreshes
    refreshed_datasources = []
    datasources_to_refresh, path_to_check = get_datasource_refresh_list(
        project_list, project_hierarchy, include_subprojects_or_name, all_datasources)
    #print(datasources_to_refresh)
    # Periodically check the refresh status. Timeout after 5 hours
    start_time = datetime.now(timezone.utc).replace(microsecond=0)
    manage_refresh(datasources_to_refresh, max_concurrent_refreshes,
                   start_time, timeout_seconds, refresh_timeout,
                   auth_header, site_id, path_to_check, refreshed_datasources)
    # Print the final message
    total_runtime = round((datetime.now(timezone.utc).replace(microsecond=0) - start_time).total_seconds(), 2)
    if all(datasource.get('refresh_success') for datasource in refreshed_datasources) \
            and len(refreshed_datasources) == len(datasources_to_refresh):
        print(f"[All data sources refreshed successfully! Total runtime: {total_runtime}s]")
    else:
        print('[Failed to refresh the following data sources:]')
        # list all datasources that failed to refresh
        for datasource in refreshed_datasources:
            if datasource['refresh_success'] == False:
                print(f"[{datasource['name']} (ID: {datasource['id']}) in {datasource['project_name']}].")
        print(f"[Total runtime: {total_runtime}s]")
        if len(refreshed_datasources) != len(datasources_to_refresh):
            not_found = [item for item in datasources_to_refresh if not is_in_list(item, refreshed_datasources, ['id'])]
            print('[Did not attempt to refresh the following data sources:]')
            for item in not_found:
                print(f"Name: {item['name']}. (ID: {item['id']})")
    try:
        # Sign out to release the authentication token (the sign-out call also needs the auth header)
        sign_out_url = f"{tableau_cloud_url}/api/{api_version}/auth/signout"
        sign_out_response = requests.post(sign_out_url, headers=auth_header)
        print('[Signed out successfully]')
    except Exception as sign_out_error:
        print(f"Sign-out error: {sign_out_error}")

if __name__ == "__main__":
    main(*sys.argv[1:])
Then the function is called like this:
main('Production/My Department/Data Sources', 'Sales') for an individual refresh
or:
main('Production/My Department/Data Sources', 'False') to refresh the whole folder but not the subfolders.
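From the command line, because main receives *sys.argv[1:], those two calls look like this (the file name refresh_bridge.py is just an example):
python refresh_bridge.py "Production/My Department/Data Sources" "Sales"
python refresh_bridge.py "Production/My Department/Data Sources" "False"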
I didn’t list all the supporting functions here, but I hope this helps those who are still struggling with this, including anyone following the Tableau Support Community threads that have been open for seven years.
Connect with the Author | Roy Wang
If you’re looking for help modernizing your data platform, business intelligence, or preparing your AI-data strategy and infrastructure, reach out to us at: contact.us@keyrus.com