Status Checking Affiliate Links

Goal

I'd like to build an application that checks whether any of the pages that make us money are down.

For reasons that aren't obvious, we've never needed uptime checks on any of our affiliate pages before. That sounds pretty dumb in hindsight, so let's think of a good way to do this.

Requirements

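At a high level the resulting app should tell me whether each affiliate link still reaches its final landing page, how long that takes, and whether the page still shows the button that sends users to the vendor.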


Initial Steps

I've seen affiliate links use 2-3 levels of redirects before, so our code will probably need to travel through all of those to get to the final page. Let's see if we can watch this happen.

BurpSuite

Proxy: On
Intercept: On
Pasted image 20240404212246.png

The first 302 looks good; Python's requests library can get through that, since it follows redirects by default for GET requests (allow_redirects=True). But that 200 looks odd. I haven't seen a page redirect without returning a 3xx before...
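
To see every hop rather than only the final response, the HTTP redirects can also be followed one at a time. This is a minimal sketch, not part of the original script. It assumes a hypothetical `fetch` callable that requests a URL without following redirects and returns the status code and headers:

```python
from urllib.parse import urljoin

REDIRECT_CODES = {301, 302, 303, 307, 308}


def trace_redirects(url, fetch, max_hops=10):
    """Follow HTTP redirects hop by hop and return [(status, url), ...].

    `fetch` is a hypothetical callable: fetch(url) -> (status_code, headers).
    It must NOT follow redirects on its own.
    """
    chain = []
    for _ in range(max_hops):
        status, headers = fetch(url)
        chain.append((status, url))
        location = headers.get("Location")
        if status not in REDIRECT_CODES or not location:
            return chain  # Reached a non-redirect response
        url = urljoin(url, location)  # Location may be a relative URL
    raise RuntimeError(f"Gave up after {max_hops} redirects")
```

With requests, `fetch` could wrap `requests.get(url, allow_redirects=False)` and return `(response.status_code, response.headers)`. A chain built this way only shows the HTTP-level hops; it says nothing about redirects performed inside the page itself.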

Looking at the response headers, I couldn't find any hint of how this magic is done; it's just a bunch of Set-Cookie headers.
Pasted image 20240404213618.png
... that is, until I scrolled down the HTML response a bit more:
Pasted image 20240404213733.png

Hidden at the bottom of the page was this code:

<HTML><head></head><body>

	
		<script LANGUAGE="JavaScript1.2">
		window.location.replace('https:\/\/www.northwestregisteredagent.com\/northwest-truic-llc?sscid=41k8_588dr ')
		</script>
	

</body></html>

This is the first time I've heard of JavaScript being used for redirection. The requests library is probably not going to cut it since the JS needs to be executed[1], and we want to mimic the user as much as possible.
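
For comparison, here is what the browser-free route could look like: pull the redirect target straight out of the HTML. This is only a sketch. It assumes the page always passes a quoted string literal to window.location.replace(), and the function name and regex are illustrative, not something the affiliate network documents:

```python
import re

# Matches window.location.replace('...') or window.location.replace("...")
_JS_REDIRECT = re.compile(
    r"""window\.location\.replace\(\s*(['"])(.*?)\1\s*\)""",
    re.DOTALL,
)


def extract_js_redirect(html):
    """Return the URL passed to window.location.replace(), or None."""
    match = _JS_REDIRECT.search(html)
    if match is None:
        return None
    # Undo the JavaScript "\/" escaping and trim stray whitespace
    return match.group(2).replace("\\/", "/").strip()
```

This breaks as soon as the page builds the URL dynamically or switches to another redirect mechanism, which is one more reason to let a real browser do the work.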

Python + Selenium

We don't want a visible browser window, so let's go for a headless setup:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

def get_final_url(url):
    # Set up Chrome options
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Run in headless mode

    # Set up the driver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        driver.get(url)  # Load the page; the browser follows HTTP and JS redirects
        final_url = driver.current_url  # URL the browser ended up on
        print(f"The final destination URL is: {final_url}")
        return final_url
    finally:
        driver.quit()  # Always quit the driver to free resources

user_url = input("Enter the URL to check: ")
get_final_url(user_url)

--> Looking good:
Pasted image 20240404215327.png

Let's clean this up though and hide the 3rd party cookie warning:

import platform
# More code...

    chrome_options.add_argument("--headless")  # Run in headless mode
    chrome_options.add_argument("--log-level=3")  # Only log fatal errors

    # Discard chromedriver's own log output
    service = Service(ChromeDriverManager().install(), log_path='NUL' if platform.system() == 'Windows' else '/dev/null')
    if platform.system() == 'Windows':
        service.creationflags = 0x08000000  # CREATE_NO_WINDOW: stops a console window from appearing

Much better:
Pasted image 20240404215541.png

Now let's add some code to measure how long the page load takes from start to finish:

import time
# More code...

    try:
        start_time = time.time()
        driver.get(url)
        end_time = time.time()
        final_url = driver.current_url 

        duration = end_time - start_time
        print(f"The final destination URL is: {final_url}")
        print(f"##### Total time taken to load the final URL: {duration:.2f} seconds. #####")
    finally:
        print("Quitting the driver...")
        driver.quit()  # Make sure to quit the driver to free resources

# The rest...
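
The timing logic can also be pulled into a small reusable helper. This is a sketch with a hypothetical helper name, `timed`. It uses time.perf_counter() instead of time.time(), because perf_counter is a monotonic clock meant for measuring intervals:

```python
import time


def timed(action, *args, **kwargs):
    """Run action(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()  # Monotonic, so system clock changes don't skew it
    result = action(*args, **kwargs)
    return result, time.perf_counter() - start
```

In the script above this would read `_, duration = timed(driver.get, url)`.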

Button Check

Alrighty, now that we can confirm the final URL is reachable, let's make sure the page actually has working buttons that users can click to get to the vendor. Under normal circumstances it looks like this:
Pasted image 20240404221249.png
Let's check whether the GET STARTED button is present:

from selenium.webdriver.common.by import By
# More code...

        # Check for the "Get Started" button
        buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'Get Started')]")
        if buttons:
            print("Get Started Button found.")
        else:
            print("Get Started Button not found.")
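
One thing to watch: XPath text matching is case-sensitive, so 'Get Started' will not match a button whose markup literally says GET STARTED (it still matches if the capitals come from CSS). A hedged sketch of a case-insensitive locator follows. The helper name `clickable_xpath` is made up for illustration, and it only handles ASCII labels without single quotes:

```python
import string


def clickable_xpath(label):
    """Build an XPath matching <button> or <a> elements whose text contains
    `label`, ignoring ASCII case and surrounding whitespace."""
    if "'" in label:
        raise ValueError("labels containing single quotes are not supported in this sketch")
    # XPath 1.0 has no lower-case(), so translate() maps A-Z to a-z instead
    lowered_text = (
        f"translate(normalize-space(.), "
        f"'{string.ascii_uppercase}', '{string.ascii_lowercase}')"
    )
    condition = f"contains({lowered_text}, '{label.lower()}')"
    return f"//button[{condition}] | //a[{condition}]"
```

It would be used as `driver.find_elements(By.XPATH, clickable_xpath("Get Started"))`. Matching on `.` rather than `text()` also catches labels wrapped in a nested element such as a span.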

Python -> Azure Function

I'd prefer not to run this .py as a Scheduled Task, so let's turn it into a timer-triggered Azure Function instead and put the URL in a .env file.

def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    url = os.environ["URL_TO_CHECK"]  # os.environ is a mapping, so index it rather than call it

    # Set up Chrome options
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--log-level=3")

    # Set up the driver
    log_path = 'NUL' if platform.system() == 'Windows' else '/dev/null'
    service = Service(ChromeDriverManager().install(), log_path=log_path)
    if platform.system() == 'Windows':
        service.creationflags = 0x08000000
    
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        start_time = time.time()
        driver.get(url)
        end_time = time.time()

        final_url = driver.current_url
        duration = end_time - start_time
        logging.info(f"Final URL: {final_url} - Duration: {duration} seconds")

        if duration > 15:
            logging.error(f"Final URL took more than 15 seconds to load. Duration: {duration} seconds\nFinal URL: {final_url}")

        buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'Get Started')]")
        if not buttons:
            logging.error("Get Started Button not found.")

    finally:
        logging.info("Quitting the driver...")
        driver.quit()
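
Since the URL now comes from configuration, a missing or empty setting should fail loudly before Chrome is even started. A small sketch, assuming the setting is exposed to the function as an environment variable; `require_setting` is a hypothetical helper, not part of the Azure Functions API:

```python
import os


def require_setting(name):
    """Return the environment variable `name`, or raise a descriptive error."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Required setting {name!r} is missing or empty")
    return value
```

Calling `url = require_setting("URL_TO_CHECK")` at the top of main() turns a confusing Selenium error into a clear configuration error in the function log.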

Send email notification

We might as well use SendGrid to send a notification when the app detects high latency or a problem with the link.

from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

def send_email(subject, body):
    message = Mail(
        from_email='[email protected]',
        to_emails='[email protected]',
        subject=subject,
        html_content=body)
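    try:
        # Build the client from the API key; os.environ is a mapping, not a function
        sg = SendGridAPIClient(os.environ.get("SENDGRID_API_KEY"))
        response = sg.send(message)
        print(response.status_code)
        print(response.body)
        print(response.headers)
    except Exception as e:
        print(str(e))  # Python 3 exceptions have no .message attribute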

# The rest of the code...
        if duration > 15:
            logging.error(f"Final URL took more than 15 seconds to load. Duration: {duration} seconds\nFinal URL: {final_url}")
            send_email("Affcheck: High Latency", f"Final URL: {final_url} took too long")

# ...
# If unsuccessful
        if not buttons:
            logging.error("xxxxxx Get Started Button not found. xxxxxx")
            send_email("Affcheck Test Failed: Button Not Found", f"Final URL: {final_url} button not found. Duration: {duration} seconds")
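
The decision about what to alert on can be separated from the sending itself, which makes it testable without a browser or a SendGrid key. A minimal sketch, where `collect_alerts` and its parameters are illustrative names and the 15-second threshold is the one used above:

```python
def collect_alerts(final_url, duration, button_found, max_seconds=15):
    """Return a list of (subject, body) tuples, one per detected problem."""
    alerts = []
    if duration > max_seconds:
        alerts.append((
            "Affcheck: High Latency",
            f"Final URL: {final_url} took {duration:.2f} seconds to load",
        ))
    if not button_found:
        alerts.append((
            "Affcheck Test Failed: Button Not Found",
            f"Final URL: {final_url} button not found. Duration: {duration:.2f} seconds",
        ))
    return alerts
```

The function body then shrinks to a loop: `for subject, body in collect_alerts(final_url, duration, bool(buttons)): send_email(subject, body)`.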

Full code

import datetime
import logging
import os
import azure.functions as func
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import platform
import time
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

def send_email(subject, body):
    message = Mail(
        from_email='[email protected]',
        to_emails='[email protected]',
        subject=subject,
        html_content=body)
    try:
        sg = SendGridAPIClient(os.environ.get("SENDGRID_API_KEY"))
        response = sg.send(message)
        print(response.status_code)
        print(response.body)
        print(response.headers)
    except Exception as e:
        print(str(e))

def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    test_cases = [
        {"url": os.environ.get("Afflink_1"), "button_text": "Get Started"},
        {"url": os.environ.get("Afflink_2"), "button_text": "Form my LLC for free"}
    ]

    # Set up Chrome options
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--log-level=3")

    # Set up the driver
    log_path = 'NUL' if platform.system() == 'Windows' else '/dev/null'
    service = Service(ChromeDriverManager().install(), log_path=log_path)
    if platform.system() == 'Windows':
        service.creationflags = 0x08000000
    
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        for test_case in test_cases:
            url = test_case["url"]
            button_text = test_case["button_text"]

            logging.info(f"######## Starting the test for {url}... ########")
            start_time = time.time()
            driver.get(url)
            end_time = time.time()

            final_url = driver.current_url
            duration = end_time - start_time
            logging.info(f"Final URL: {final_url} - Duration: {duration} seconds")

            if duration > 15:
                logging.error(f"Final URL took more than 15 seconds to load. Duration: {duration} seconds\nFinal URL: {final_url}")
                send_email(f"Affcheck: High Latency for {url}", f"Final URL: {final_url} took too long")

            buttons = driver.find_elements(By.XPATH, f"//button[contains(text(), '{button_text}')]|//a[contains(text(), '{button_text}')]")

            if buttons:
                logging.info(f"oooooo Button '{button_text}' found at {url} oooooo")
            else:
                logging.error(f"xxxxxx Button '{button_text}' not found at {url} xxxxxx")
                send_email(f"Affcheck Test Failed: Button '{button_text}' Not Found at {url}", f"Button '{button_text}' not found at {final_url}\n\t Duration: {duration} seconds")

    finally:
        logging.info("###### Quitting the driver... ######")
        driver.quit()

The results look pretty good:
Pasted image 20240405200824.png


  1. Technically, the script could just look for the URL string inside the window.location.replace() call.