
Expert opinion

Refresh Tableau Bridge Reliably via API: Optimize Error Handling When Missing Failure Notifications

Written by Roy Wang, Managing Consultant, Keyrus North America (NORAM)

How often have you kicked off a Bridge refresh and just hoped for the best? Whether you trigger it manually via Tableau Online or automatically via the API, Tableau doesn’t have a reliable way to tell you whether the Bridge refresh completed successfully until the last refresh time updates (which can also lag). The refresh can also die without any notification, which means you don’t know whether the job needs to be kicked off again or whether you should keep waiting, at least until your stakeholders come looking for you.

For those wondering why Bridge continues to be such an important topic with everything moving to the cloud, it comes down to security: most corporate data sits on a private network behind a firewall, and reaching it requires Bridge.

One thing that can be confusing for people new to using Bridge is that polling the job status doesn’t tell you if the data source refresh is done; it only tells you that the job was successfully sent to a Bridge Pool Client.  
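As a quick illustration of the difference (using the get_job_info and get_datasource_last_refresh helpers defined later in this article, with job_id and datasource_id as placeholder values), the two signals you can poll tell you very different things:

# The job's finishCode only confirms the request was handed off to a Bridge Pool Client
job = get_job_info(job_id, auth_header, site_id).json()["job"]
print(job.get("finishCode"))  # '0' here does not guarantee the data is fresh

# The data source's updatedAt timestamp is what actually moves once the extract is rebuilt
print(get_datasource_last_refresh(datasource_id, auth_header, site_id))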

This becomes massively important when running multiple data refreshes while trying to meet an SLA for data freshness. You don’t want to overload your resources, but you also want to complete the refreshes in the minimum amount of time.

So how does one do this? You must poll the data source’s last refresh time and only run as many threads as you are willing to commit Bridge Pool Clients to (in case you need to leave some open for other jobs/departments). Maxing out the number and memory of Bridge Pool Clients is the first solution and by far the biggest bang for your buck. 

I find the out-of-the-box Python examples provided by Tableau to be buggy at times, so I wrote my own scripts to accomplish this in a slightly less Pythonic but easier-to-understand way for the average data person with less programming experience.

Because these API calls run asynchronously, each refresh can be started, canceled, or fail independently without affecting the others.

The module I use to run multiple threads is:  

import concurrent.futures  
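Alongside that import, the snippets below assume a handful of other imports and module-level settings. Here is a minimal setup sketch; the specific values are placeholders I chose for illustration (not the author’s), so tune them to your environment:

import sys
import time
from datetime import datetime, timezone

import pytz
import requests

# Connection settings (placeholders)
tableau_cloud_url = "https://10ax.online.tableau.com"  # your Tableau Cloud pod URL
api_version = "3.22"                                   # REST API version
utc_timezone = pytz.utc

# Retry/backoff settings used by the request helpers
retries = 5    # attempts per request
delay = 10     # initial wait in seconds before retrying
backoff = 2    # multiplier applied to the wait after each failure

# Timeouts and pacing used by the refresh-and-check loops (all in seconds)
timeout_seconds = 18000        # whole-script timeout (5 hours)
refresh_timeout = 7200         # per-data-source timeout
job_check_timeout = 600        # how long to poll a job before re-requesting a refresh
wait_to_check_job = 30         # pause between job status polls
refresh_loop_interval = 60     # pause between last-refresh-time polls
data_size_threshold = 100      # size cutoff between small and large data sources
loop_timeout_small = 1800      # retry window for small data sources
loop_timeout_large = 3600      # retry window for large data sources
max_concurrent_refreshes = 3   # no more than the Bridge Pool Clients you commit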

Here is a sample get-request method with some basic error handling:  

def get_request(request_url, auth_header):
    # Start from the module-level delay so each call backs off independently
    wait = delay
    # Retry loop
    for i in range(retries):
        try:
            # Make an HTTP GET request
            response = requests.get(request_url, headers=auth_header)
            response_code = response.status_code
            if response_code == 401:
                print(f"Get: Status 401. Waiting {wait}s to sign in again.")
                time.sleep(wait)
                wait *= backoff
                auth_header, site_id = sign_in()
            elif response_code != 200:
                print(f"Failed to get {request_url}. Status code: {response_code}. Sleeping for {wait} seconds")
                time.sleep(wait)
                wait *= backoff
            else:
                return response
        except Exception as e:
            print(f"Exception: {e} Retrying signin and get in {wait} seconds")
            time.sleep(wait)
            wait *= backoff
            auth_header, site_id = sign_in()
            continue
    return f"Failed to get {request_url}"

Then a few functions to refresh a data source and get its last update time:

def refresh_datasource(datasource_id, auth_header, site_id):
    # Endpoint for refreshing a data source
    refresh_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/datasources/{datasource_id}/refresh"
    # Body for data source refresh
    refresh_body = '<tsRequest></tsRequest>'
    # Start from the module-level delay so each call backs off independently
    wait = delay
    # Retry loop
    for i in range(retries):
        try:
            # Make an HTTP POST request to refresh the data source
            refresh_response = requests.post(refresh_url, data=refresh_body, headers=auth_header)
            response_code = refresh_response.status_code
            if response_code == 401:
                print(f"Refresh data source {datasource_id}: Status 401. Waiting {wait}s to sign in again.")
                time.sleep(wait)
                wait *= backoff
                auth_header, site_id = sign_in()
            # exit and let later error handling try again
            elif response_code == 409:
                print(f"Refresh data source {datasource_id}. Status code: {response_code}.")
                return response_code, datetime.now(timezone.utc).replace(microsecond=0), "refresh response 409"
            elif response_code == 429:
                print(f"Refresh data source {datasource_id}. Status code: 429. (Too many requests)")
                return response_code, datetime.now(timezone.utc).replace(microsecond=0), "refresh response 429"
            elif response_code != 202:
                print(f"Failed to request refresh for {datasource_id}. Status code: {response_code}. Sleeping for {wait} seconds")
                time.sleep(wait)
                wait *= backoff
            else:
                datasource_name = refresh_response.json()["job"]["extractRefreshJob"]["datasource"]["name"]
                job_id = refresh_response.json()["job"]["id"]
                refresh_request_datetime = datetime.strptime(refresh_response.json()['job']['createdAt'], '%Y-%m-%dT%H:%M:%SZ')
                refresh_request_datetime = utc_timezone.localize(refresh_request_datetime)
                print(f"\n[Refresh request for data source {datasource_name} "
                      f"(ID: {datasource_id}) accepted at {refresh_request_datetime}. \n"
                      f"(Job ID: {job_id}).]\n")
                return response_code, refresh_request_datetime, job_id
        except Exception as e:
            print(f"Exception: {e} Retrying signin and post in {wait} seconds")
            time.sleep(wait)
            wait *= backoff
            auth_header, site_id = sign_in()
            continue
    return f"Failed to request refresh for data source {datasource_id}"

def get_datasource_last_refresh(datasource_id, auth_header, site_id):
    data_source_info = get_datasource_info(datasource_id, auth_header, site_id)
    last_refresh_string = data_source_info.json()["datasource"]["updatedAt"]
    last_refresh_datetime = datetime.strptime(last_refresh_string, '%Y-%m-%dT%H:%M:%SZ')
    last_refresh_datetime = utc_timezone.localize(last_refresh_datetime)
    return last_refresh_datetime

def get_job_info(job_id, auth_header, site_id):
    # Endpoint for getting job info
    job_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/jobs/{job_id}"
    job_info = get_request(job_url, auth_header)
    return job_info
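get_datasource_last_refresh calls a get_datasource_info helper that isn’t shown above. A minimal sketch, assuming the standard single-data-source query endpoint and reusing get_request:

def get_datasource_info(datasource_id, auth_header, site_id):
    # Endpoint for querying a single data source (returns updatedAt, size, etc.)
    datasource_url = f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}/datasources/{datasource_id}"
    return get_request(datasource_url, auth_header)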

Here is the core method that requests a refresh and then checks that the data source actually refreshed.

It has three timeouts:

  • The entire script has a timeout limit, timeout_seconds.

  • This method has a timeout, refresh_timeout.

  • The subloop has a timeout, refresh_loop_timeout, before it attempts to kick off another refresh. This is based on the data source size returned as data_size and was determined by looking at the historical refresh logs.

Because Bridge itself can sometimes lose requests without notifying you, we keep triggering job runs until we get a completion code. Since this can lead to conflicting requests, there is some handling for a 409 response code, which tends to mean a concurrent refresh request already exists. If datasource_info already exists, we know this run successfully triggered a refresh at some point. If there’s a 409 response and no datasource_info, then another job was already underway before this script started, and the data source is skipped. Because refreshes don’t fail with notice, reaching refresh_loop_timeout triggers another refresh but still checks against the original start time, since all we care about is whether the last refresh time exceeds the start of this script. Could a second refresh request end up being sent to Bridge? We unfortunately can’t know without connecting directly to the Bridge machines, which is beyond the scope of this article.

# Refresh each data source and check complete
def refresh_and_check(datasource, start_time, timeout_seconds,
                      refresh_timeout, auth_header, site_id, path_to_check, refreshed_datasources):

    loop_start_time = datetime.now(timezone.utc).replace(microsecond=0)
    elapsed_time = round((loop_start_time - start_time).total_seconds(), 2)
    print(f"Starting refresh & check loop for {datasource['name']} (ID: {datasource['id']}). {loop_start_time}.\n")
    job_id_set = set()
    job_id_completed_set = set()

    if elapsed_time >= timeout_seconds:
        return f"[Timeout reached ({timeout_seconds}s). {datasource['name']} (ID: {datasource['id']}) Exiting.]"
    else:
        # get datasource info
        last_info = get_datasource_info(datasource['id'], auth_header, site_id)
        data_size = int(last_info.json()["datasource"]["size"])
        last_refresh_string = last_info.json()["datasource"]["updatedAt"]
        last_refresh_datetime = datetime.strptime(last_refresh_string, '%Y-%m-%dT%H:%M:%SZ')
        last_refresh_datetime = utc_timezone.localize(last_refresh_datetime)

        # Set refresh check timeout based on data source size
        refresh_loop_timeout = loop_timeout_small if data_size < data_size_threshold else loop_timeout_large
        datasource_info = None

        '''
        Logic:
        - Kick off refresh
        - Loop to check job status
        - If job complete, start refresh time check loop; handle job errors & timeouts
        - Try to refresh and check again if the first timeout happens
        - We're checking for last refresh time to be > the loop start to assume the refresh was successful
        - Since refreshes aren't cancelled when the loop ends, a refresh kicked off in an earlier loop can still finish and end the check
        - We don't want to just wait until a success returns in case there was a failure at the Bridge,
          so we make a compromise between possible redundant refresh requests and not letting something die
          for too long before retrying
        '''

        loop_number = 1
        error_count = 0

        while True:
            job_id = None
            print(f"Loop Number: {loop_number}")
            response_code, request_time, job_id_temp = refresh_datasource(datasource['id'], auth_header, site_id)

            if response_code == 202:
                job_id = job_id_temp
                job_id_set.add(job_id)
                print(f"Added {job_id} to job_id_set.")
                print(f"Writing {datasource['name']} (ID: {datasource['id']}) to datasource_info.")
                datasource_info = {
                    'project_id': datasource['project']['id'],
                    'project_name': path_to_check,
                    'id': datasource['id'],
                    'name': datasource['name'],
                    'refresh_request_code': response_code,
                    'request_time': request_time,
                    'refresh_success': False,
                    'job_id': job_id
                }
            # refresh underway from this run
            elif response_code == 409 and datasource_info:
                job_id = datasource_info['job_id']
                print(f"{datasource['name']} (ID: {datasource['id']}) response 409 to refresh request. "
                      f"Continuing job status check loop for job {job_id}.")
            # a refresh was already underway before the script was run
            elif response_code == 409:
                return (f"{datasource['name']} (ID: {datasource['id']}) response 409 to refresh request. "
                        f"Refresh already underway from before script launch. Skipping this datasource.")
            # too many server requests on later loops
            elif response_code == 429 and datasource_info:
                # reuse the job from the earlier successful request
                job_id = datasource_info['job_id']
                print(f"{datasource['name']} (ID: {datasource['id']}) response 429 (Too many requests). "
                      f"Waiting 1 hour for job {job_id}.")
                # need to modify timeout to try one more time
                refresh_loop_timeout = 7200
                time.sleep(3600)
            # too many server requests on first try
            elif response_code == 429:
                print(f"{datasource['name']} (ID: {datasource['id']}) response 429 (Too many requests). "
                      f"Waiting 1 hour to try again.")
                # need to modify timeout to try one more time
                refresh_loop_timeout = 7200
                time.sleep(3600)
                continue
            else:
                print("Continuing after unknown exception")
                continue

            # loop to check job status
            job_check_loop = True
            loop_start = datetime.now(timezone.utc)
            job_info = None
            finish_code = 'no response'

            while job_check_loop == True:
                # timeout checks
                loop_duration = round((datetime.now(timezone.utc) - loop_start).total_seconds(), 2)
                refresh_duration = round((datetime.now(timezone.utc) - request_time).total_seconds(), 2)
                elapsed_time = round((datetime.now(timezone.utc) - start_time).total_seconds(), 2)

                if elapsed_time > timeout_seconds:
                    print(f"[Script Timeout reached ({timeout_seconds}s). Skipping {datasource['name']}. Exiting. "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # add datasource to list of refreshed data sources
                    refreshed_datasources.append(datasource_info)
                    print_job_statuses(job_id_set, datasource, auth_header, site_id)
                    return f"[Script timeout reached ({timeout_seconds}s). Exiting.]"
                # this data source timeout
                elif refresh_duration > refresh_timeout:
                    print(f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # add datasource to list of refreshed data sources
                    refreshed_datasources.append(datasource_info)
                    print_job_statuses(job_id_set, datasource, auth_header, site_id)
                    return (f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} ({datasource['id']}) "
                            f"{datetime.now(timezone.utc).replace(microsecond=0)}]")
                # loop timeout before retry
                elif loop_duration > job_check_timeout:
                    print(f"[Refresh Loop Timeout reached ({job_check_timeout}s). Trying to refresh {datasource['name']} again. "
                          f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                    break

                time.sleep(wait_to_check_job)
                print(f"Getting job {job_id} info")
                job_info = get_job_info(job_id, auth_header, site_id)
                job_info = job_info.json()['job']
                print(job_info)
                if 'finishCode' in job_info:
                    finish_code = job_info['finishCode']
                    job_id_completed_set.add(job_id)
                    print(f"Added {job_id} to job_id_completed_set.")
                    job_check_loop = False

            # loop to check last refresh time
            if finish_code == '0':
                loop_start = datetime.now(timezone.utc)
                while True:
                    # timeout checks
                    loop_duration = round((datetime.now(timezone.utc) - loop_start).total_seconds(), 2)
                    refresh_duration = round((datetime.now(timezone.utc) - request_time).total_seconds(), 2)
                    elapsed_time = round((datetime.now(timezone.utc) - start_time).total_seconds(), 2)

                    if elapsed_time > timeout_seconds:
                        print(f"[Script Timeout reached ({timeout_seconds}s). Skipping {datasource['name']}. Exiting. "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        # add datasource to list of refreshed data sources
                        refreshed_datasources.append(datasource_info)
                        print_job_statuses(job_id_set, datasource, auth_header, site_id)
                        return f"[Script timeout reached ({timeout_seconds}s). Exiting.]"
                    # this data source timeout
                    elif refresh_duration > refresh_timeout:
                        print(f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        # add datasource to list of refreshed data sources
                        refreshed_datasources.append(datasource_info)
                        print_job_statuses(job_id_set, datasource, auth_header, site_id)
                        return (f"[Timeout reached ({refresh_timeout}s). Skipping {datasource['name']} ({datasource['id']}) "
                                f"{datetime.now(timezone.utc).replace(microsecond=0)}]")
                    # loop timeout before retry
                    elif loop_duration > refresh_loop_timeout:
                        print(f"[Refresh Loop Timeout reached ({refresh_loop_timeout}s). Trying to refresh {datasource['name']} again. "
                              f"({datasource['id']}) {datetime.now(timezone.utc).replace(microsecond=0)}]")
                        break

                    time.sleep(refresh_loop_interval)
                    print(f"Checking last refresh time of {datasource['name']} (ID: {datasource['id']})")
                    last_refresh = get_datasource_last_refresh(datasource['id'], auth_header, site_id)

                    # if the last refresh was after the time of the loop start, tag as successful
                    if isinstance(last_refresh, datetime):
                        if last_refresh > loop_start_time:
                            datasource_info['refresh_success'] = True
                            # add datasource to list of refreshed data sources
                            refreshed_datasources.append(datasource_info)
                            refresh_tries = len(job_id_set)
                            completed_refresh_tries = len(job_id_completed_set)
                            refresh_length_s = round((last_refresh - loop_start_time).total_seconds(), 2)
                            # print out all the jobs as a summary for troubleshooting convenience
                            print_job_statuses(job_id_set, datasource, auth_header, site_id)
                            return (f"\n***\nRefresh {datasource['name']} ({datasource['id']}) success at: "
                                    f"{last_refresh} Refresh took: {refresh_length_s}s over {completed_refresh_tries}/{refresh_tries} tries.\n***\n")
                        else:
                            print(f"{datasource['name']} ({datasource['id']}) still refreshing. "
                                  f"Request time: {request_time}. Last refresh: {last_refresh} "
                                  f"{datetime.now(timezone.utc).replace(microsecond=0)} ({refresh_duration}s)")
                    else:
                        print('Unable to get last refresh time')
                    continue
            # else kick off another refresh loop
            elif finish_code == '1':
                error_count += 1
                if error_count >= 5:
                    return (f"{datasource['name']} (ID: {datasource['id']}) returned 5 errors from job. Skipping this data source.")
                elif job_info['extractRefreshJob']['notes']:
                    if 'not allowed' in job_info['extractRefreshJob']['notes']:
                        return (f"{datasource['name']} (ID: {datasource['id']}) permissions error. Skipping this data source.")
                    else:
                        # wait to try again
                        error_pause = error_count * 60
                        print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                              f"finish code 1. Unknown error for Job ID: {job_id}. Trying again after {error_pause}s.")
                        time.sleep(error_pause)
                        loop_number += 1
                else:
                    # wait to try again
                    error_pause = error_count * 60
                    print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                          f"finish code 1. Unknown error for Job ID: {job_id}. Trying again after {error_pause}s.")
                    time.sleep(error_pause)
                    loop_number += 1
            else:
                loop_number += 1
                print(f"Data source {datasource['name']} (ID: {datasource['id']}) "
                      f"finish code: {finish_code} for Job ID: {job_id}. Trying again.")

And here is the function that manages the multiple threads. We must pass the manage_refresh method most of the parameters the inner function needs.

# Function to manage the refreshing process
def manage_refresh(datasources_to_refresh, max_concurrent_refreshes,
                   start_time, timeout_seconds, refresh_timeout, auth_header,
                   site_id, path_to_check, refreshed_datasources):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_refreshes) as executor:
        future_to_ds = {executor.submit(refresh_and_check, datasource,
                                        start_time, timeout_seconds, refresh_timeout,
                                        auth_header, site_id, path_to_check, refreshed_datasources):
                        datasource for datasource in datasources_to_refresh}
        for future in concurrent.futures.as_completed(future_to_ds):
            ds_id = future_to_ds[future]
            try:
                print(future.result())
            except Exception as e:
                print(f"Data source {ds_id['name']} (ID: {ds_id['id']}) refresh failed with exception: {e}")

The main function works like this:

Get all the data source IDs by traversing the folder (or finding the individual data source) you’re trying to refresh.

Build a list of them and have the manage_refresh method work through it, recording which refreshes were successful.

Then, at the end, summarize the results.

def is_in_list(target, lookuplist, keys):
    return any(all(target[k] == d.get(k) for k in keys) for d in lookuplist)

# Multithreaded
def main(*args):
    # Build project_list and include_subprojects or source name from arguments
    project_list = list(args[:-1])
    print(f"Projects to check: {project_list}")
    include_subprojects_or_name = args[-1]

    # Sign in to Tableau Cloud
    auth_header, site_id = sign_in()

    # Build project hierarchy
    project_hierarchy = build_project_hierarchy(auth_header, site_id)

    # Get all data sources
    all_datasources = get_all_datasources(auth_header, site_id)
    #print(f"{all_datasources}")

    # Initialize lists to track all requested refreshes
    refreshed_datasources = []
    datasources_to_refresh, path_to_check = get_datasource_refresh_list(
        project_list, project_hierarchy, include_subprojects_or_name, all_datasources)
    #print(datasources_to_refresh)

    # Periodically check the refresh status. Timeout after 5 hours
    start_time = datetime.now(timezone.utc).replace(microsecond=0)
    manage_refresh(datasources_to_refresh, max_concurrent_refreshes,
                   start_time, timeout_seconds, refresh_timeout,
                   auth_header, site_id, path_to_check, refreshed_datasources)

    # Print the final message
    total_runtime = round((datetime.now(timezone.utc).replace(microsecond=0) - start_time).total_seconds(), 2)
    if all(datasource.get('refresh_success') for datasource in refreshed_datasources) \
            and len(refreshed_datasources) == len(datasources_to_refresh):
        print(f"[All data sources refreshed successfully! Total runtime: {total_runtime}s]")
    else:
        print('[Failed to refresh the following data sources:]')
        # list all datasources that failed to refresh
        for datasource in refreshed_datasources:
            if datasource['refresh_success'] == False:
                print(f"[{datasource['name']} (ID: {datasource['id']}) in {datasource['project_name']}].")
        print(f"[Total runtime: {total_runtime}s]")
        if len(refreshed_datasources) != len(datasources_to_refresh):
            not_found = [item for item in datasources_to_refresh if not is_in_list(item, refreshed_datasources, ['id'])]
            print('[Did not attempt to refresh the following data sources:]')
            for item in not_found:
                print(f"Name: {item['name']}. (ID: {item['id']})")

    try:
        # Sign out to release the authentication token
        sign_out_url = f"{tableau_cloud_url}/api/{api_version}/auth/signout"
        sign_out_response = requests.post(sign_out_url, headers=auth_header)
        print('[Signed out successfully]')
    except Exception as sign_out_error:
        print(f"Sign-out error: {sign_out_error}")

if __name__ == "__main__":
    main(*sys.argv[1:])


Then the function is called like this:

main('Production/My Department/Data Sources', 'Sales') for an individual refresh

or:

main('Production/My Department/Data Sources', 'False') to refresh the whole folder but not the subfolders.
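Since main is driven by sys.argv when run as a script, the equivalent command-line calls would be (assuming you save the script as refresh_bridge.py, a file name used here only for illustration):

python refresh_bridge.py "Production/My Department/Data Sources" "Sales"

python refresh_bridge.py "Production/My Department/Data Sources" "False"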

I didn’t list all the supporting functions here, but I hope this helps those who are still struggling with this, including anyone following the Tableau Support Community threads that have been open on the subject for seven years.
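As a rough idea of what one of those supporting functions could look like, here is a sketch of get_all_datasources, assuming the standard paginated data source query endpoint and minimal error handling:

def get_all_datasources(auth_header, site_id, page_size=100):
    # Page through the site's data sources and return them all as a list
    all_datasources = []
    page_number = 1
    while True:
        datasources_url = (f"{tableau_cloud_url}/api/{api_version}/sites/{site_id}"
                           f"/datasources?pageSize={page_size}&pageNumber={page_number}")
        payload = get_request(datasources_url, auth_header).json()
        all_datasources.extend(payload["datasources"].get("datasource", []))
        total_available = int(payload["pagination"]["totalAvailable"])
        if page_number * page_size >= total_available:
            return all_datasources
        page_number += 1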

Connect with the Author | Roy Wang

If you’re looking for help modernizing your data platform, business intelligence, or preparing your AI-data strategy and infrastructure, reach out to us at: contact.us@keyrus.com
