I am trying to use a webhook to send data from HubSpot when a certain piece of data is updated on a deal. I have custom code that gets the access token (this vendor's token only lasts an hour, so it requires frequent refreshing, and this is not an OAuth endpoint). If I store the secret manually and use it, it works perfectly, but once the token expires I have to refresh it manually. Is there a way I can get the access token from my custom code and send that? I am hoping there is something I am missing, because tokens frequently need refreshing. Any help would be greatly appreciated! Thank you!
You can handle token expiration by caching the token in your code. Store the token in memory or in a database along with its expiration time, so your webhook code always retrieves a valid token before sending data.
Create a function that checks whether the token is still valid before each request. If it has expired, the function requests a new token from the vendor's API and updates the stored value. Something like this might work:
import time
import requests

# In-memory cache; it lives only as long as the current process.
TOKEN_CACHE = {
    "access_token": None,
    "expires_at": 0,  # Unix timestamp when the token expires
}

VENDOR_API_URL = "https://yourvendor.com/api/token"  # Replace with actual token endpoint
VENDOR_API_SECRET = "your_secret_key"  # Store securely

def get_access_token():
    current_time = time.time()
    # Reuse the cached token while it is still valid
    if TOKEN_CACHE["access_token"] and TOKEN_CACHE["expires_at"] > current_time:
        return TOKEN_CACHE["access_token"]
    # Otherwise request a new token from the vendor
    response = requests.post(VENDOR_API_URL, json={"secret": VENDOR_API_SECRET}, timeout=10)
    if response.status_code == 200:
        token_data = response.json()
        TOKEN_CACHE["access_token"] = token_data.get("access_token")
        TOKEN_CACHE["expires_at"] = current_time + 3600  # Assuming 1-hour validity
        return TOKEN_CACHE["access_token"]
    print(f"Could not retrieve token (HTTP {response.status_code}).")
    return None
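If the process that runs your code does not stay alive between webhook calls, the in-memory cache above is lost each time. As a rough sketch of the "database" option, the token and its expiry could be kept in SQLite instead. Everything here is an assumption for illustration: the table name `token_cache`, the 60-second safety margin, the 3600-second lifetime, and `fetch_new_token`, a hypothetical callable that wraps whatever request your vendor actually needs.

```python
import sqlite3
import time


def init_store(conn):
    """Create the single-row token table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS token_cache ("
        "id INTEGER PRIMARY KEY CHECK (id = 1), "
        "access_token TEXT NOT NULL, "
        "expires_at REAL NOT NULL)"
    )
    conn.commit()


def load_token(conn, now=None, margin=60):
    """Return the stored token, or None if it is missing or about to expire."""
    now = time.time() if now is None else now
    row = conn.execute(
        "SELECT access_token, expires_at FROM token_cache WHERE id = 1"
    ).fetchone()
    if row and row[1] - margin > now:
        return row[0]
    return None


def save_token(conn, access_token, expires_at):
    """Insert or overwrite the single cached token row."""
    conn.execute(
        "INSERT OR REPLACE INTO token_cache (id, access_token, expires_at) "
        "VALUES (1, ?, ?)",
        (access_token, expires_at),
    )
    conn.commit()


def get_token(conn, fetch_new_token, now=None, ttl=3600):
    """Reuse the stored token when valid; otherwise fetch and store a new one."""
    now = time.time() if now is None else now
    token = load_token(conn, now=now)
    if token is None:
        token = fetch_new_token()  # hypothetical callable that calls the vendor
        save_token(conn, token, now + ttl)
    return token
```

You would call `init_store(sqlite3.connect("tokens.db"))` once, then `get_token(conn, fetch_new_token)` before each request. The safety margin refreshes the token slightly early so it does not expire mid-request.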
For the webhook request, you could modify it so that it always fetches the latest token before sending data:
def send_webhook_data(payload):
    token = get_access_token()
    if not token:
        print("No valid token available.")
        return None
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    webhook_url = "https://yourwebhook.com/receive-data"  # Replace with actual webhook URL
    response = requests.post(webhook_url, json=payload, headers=headers, timeout=10)
    if response.status_code == 401:  # Token rejected: refresh once and retry
        TOKEN_CACHE["access_token"] = None  # Force token refresh
        token = get_access_token()
        if not token:
            print("Token refresh failed; giving up.")
            return None
        headers["Authorization"] = f"Bearer {token}"
        response = requests.post(webhook_url, json=payload, headers=headers, timeout=10)
    # Return the body as text, since the receiver may not respond with JSON
    return response.status_code, response.text
This should keep a fresh token available to your webhook without manual intervention. If the vendor's token mechanism is more complex than a single request with a fixed one-hour lifetime, you may need additional error handling.
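As one example of the extra error handling mentioned above, a token request that fails for a transient reason could be retried a few times with a growing delay. This is only a sketch: `fetch` is a hypothetical zero-argument callable that returns a token or `None`, and the attempt count and delays are arbitrary defaults, not values from the vendor.

```python
import time


def fetch_with_retry(fetch, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fetch() until it returns a non-None value or attempts run out.

    fetch is a hypothetical zero-argument callable that returns a token or None.
    The delay doubles after each failed attempt: base_delay, 2x, 4x, ...
    """
    for attempt in range(attempts):
        try:
            result = fetch()
        except Exception as exc:  # e.g. a network error raised by the HTTP client
            print(f"Attempt {attempt + 1} raised {exc!r}")
            result = None
        if result is not None:
            return result
        if attempt < attempts - 1:
            sleep(base_delay * 2 ** attempt)  # back off before the next try
    return None
```

A function like `get_access_token` could be passed in as `fetch`, so the retry logic stays separate from the vendor-specific request.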