heartwood every commit a ring

Harden scheduler, alert state machine, and crawl pipeline

f9d2bf46 by Isaac Bythewood · 25 days ago

Harden scheduler, alert state machine, and crawl pipeline

- Fix scheduler queue_process_status: guard against empty queue
  blocking on get() and unbound thread var when type mismatches
- Push Property due-check filtering to the database for status,
  lighthouse, and crawler queues (was scanning all rows in Python)
- Wrap send_alerts in transaction.atomic + select_for_update on the
  property to prevent concurrent checks double-firing transitions
- Split HTTP check timeout into (3, 10) connect/read so redirect
  loops can't burn the full window
- Add timeout=5 and try/except around Discord webhook posts and
  email.send so an unresponsive endpoint can't wedge a check
- Precompute duplicate sets in parse_crawl so per-page uniqueness
  checks are O(1) instead of O(n) per field
- Add composite index on Check(property, -created_at) for the
  common "latest checks for property" query
- Bound crawler subprocess at 600s and log on TimeoutExpired
modified crawler/runner.py
@@ -1,9 +1,17 @@import loggingimport subprocessimport osfrom django.conf import settingslogger = logging.getLogger(__name__)# Cap a single SEO crawl. Spiders that wedge on a slow site otherwise hold a# scheduler thread forever.SUBPROCESS_TIMEOUT_SECONDS = 600def run_seo_spider(url):    """    Run the SEO spider on the given URL.
@@ -23,16 +31,22 @@ def run_seo_spider(url):        os.remove(filename)    # use the jsonlines format to store the results    subprocess.run([        'uv',        'run',        'scrapy',        'crawl',        'seo_spider',        '-a',        'url=' + url,        '-t',        'jsonlines',        '-o',        filename,    ])    try:        subprocess.run(            [                'uv',                'run',                'scrapy',                'crawl',                'seo_spider',                '-a',                'url=' + url,                '-t',                'jsonlines',                '-o',                filename,            ],            timeout=SUBPROCESS_TIMEOUT_SECONDS,        )    except subprocess.TimeoutExpired:        logger.warning("SEO spider timed out after %ss for %s", SUBPROCESS_TIMEOUT_SECONDS, url)
modified properties/management/commands/scheduler.py
@@ -4,6 +4,7 @@ import queuefrom django import dbfrom django.core.management.base import BaseCommandfrom django.db.models import Qfrom django.utils import timezonefrom properties.models import Property, Check
@@ -76,9 +77,13 @@ class Command(BaseCommand):            if not q_status.empty():                threads = []                for i in range(2):                    if q_status.empty():                        break                    q_data = q_status.get()                    if q_data[1] == "status":                        t = threading.Thread(target=self.thread_target, args=(q_data[0],))                    if q_data[1] != "status":                        q_status.task_done()                        continue                    t = threading.Thread(target=self.thread_target, args=(q_data[0],))                    t.daemon = True                    t.start()                    threads.append(t)
@@ -88,7 +93,11 @@ class Command(BaseCommand):            time.sleep(1)    def queue_check_status(self):        properties = [p for p in Property.objects.all() if p.should_check()]        now = timezone.now()        due = Property.objects.filter(            Q(last_run_at__isnull=True) | Q(next_run_at__isnull=True) | Q(next_run_at__lte=now)        )        properties = list(due)        for p in properties:            p.next_run_at = p.get_next_run_at()            p.last_run_at = timezone.now()
@@ -100,7 +109,13 @@ class Command(BaseCommand):            self.queue_add_status(p_id, "status")    def queue_check_lighthouse(self):        properties = [p for p in Property.objects.all() if p.should_check_lighthouse()]        now = timezone.now()        due = Property.objects.filter(            Q(last_lighthouse_run_at__isnull=True)            | Q(next_lighthouse_run_at__isnull=True)            | Q(next_lighthouse_run_at__lte=now)        )        properties = list(due)        for p in properties:            p.next_lighthouse_run_at = p.get_next_run_at_lighthouse()            p.last_lighthouse_run_at = timezone.now()
@@ -112,7 +127,13 @@ class Command(BaseCommand):            self.queue_add(p_id, "lighthouse")    def queue_check_crawler(self):        properties = [p for p in Property.objects.all() if p.should_check_crawl()]        now = timezone.now()        due = Property.objects.filter(            Q(last_run_at_crawler__isnull=True)            | Q(next_run_at_crawler__isnull=True)            | Q(next_run_at_crawler__lte=now)        )        properties = list(due)        for p in properties:            p.next_run_at_crawler = p.get_next_run_at_crawl()            p.last_run_at_crawler = timezone.now()
added properties/migrations/0009_check_properties__propert_b3e3ac_idx.py
@@ -0,0 +1,20 @@# Generated by Django 6.0.4 on 2026-04-16 23:01from django.db import migrations, modelsclass Migration(migrations.Migration):    dependencies = [        ("properties", "0008_lighthouse_error_tracking"),    ]    operations = [        migrations.AddIndex(            model_name="check",            index=models.Index(                fields=["property", "-created_at"],                name="properties__propert_b3e3ac_idx",            ),        ),    ]
modified properties/models.py
@@ -7,7 +7,7 @@ import loggingimport requestsfrom django.contrib.auth import get_user_modelfrom django.core.mail import EmailMessagefrom django.db import modelsfrom django.db import models, transactionfrom django.template.loader import render_to_stringfrom django.utils import timezonefrom django.utils.functional import cached_property
@@ -119,7 +119,10 @@ class AlertsMixin:        to_emails = [self.user.email]        email = EmailMessage(subject, message, from_email, to_emails)        email.content_subtype = "html"        email.send()        try:            email.send()        except Exception:            logger.exception("Failed to send down email for %s", self.url)    def send_recovery_email(self):        subject = f"Status: {self.name} is back up!"
@@ -128,7 +131,10 @@ class AlertsMixin:        to_emails = [self.user.email]        email = EmailMessage(subject, message, from_email, to_emails)        email.content_subtype = "html"        email.send()        try:            email.send()        except Exception:            logger.exception("Failed to send recovery email for %s", self.url)    def send_down_discord_message(self):        if self.user.discord_webhook_url:
@@ -143,7 +149,10 @@ class AlertsMixin:                    }                ],            }            requests.post(self.user.discord_webhook_url, json=payload)            try:                requests.post(self.user.discord_webhook_url, json=payload, timeout=5)            except requests.RequestException:                logger.exception("Discord down webhook failed for %s", self.url)    def send_recovery_discord_message(self):        if self.user.discord_webhook_url:
@@ -158,7 +167,10 @@ class AlertsMixin:                    }                ],            }            requests.post(self.user.discord_webhook_url, json=payload)            try:                requests.post(self.user.discord_webhook_url, json=payload, timeout=5)            except requests.RequestException:                logger.exception("Discord recovery webhook failed for %s", self.url)    def send_alerts(self, current_status_code):        """
@@ -169,24 +181,30 @@ class AlertsMixin:        """        is_currently_up = current_status_code == 200        # Determine if we need to send an alert based on state change        if is_currently_up and self.alert_state == 'down':            # Site recovered: was down, now up            self.send_recovery_email()            self.send_recovery_discord_message()            self.alert_state = 'up'            self.last_alert_sent = timezone.now()            self.save(update_fields=['alert_state', 'last_alert_sent'])        elif not is_currently_up and self.alert_state == 'up':            # Site went down: was up, now down            # Only send if we have at least 2 consecutive failures to avoid false positives            checks = self.statuses.order_by("-created_at")[:2]            if len(checks) >= 2 and checks[0].status_code != 200 and checks[1].status_code != 200:                self.send_down_email()                self.send_down_discord_message()                self.alert_state = 'down'                self.last_alert_sent = timezone.now()                self.save(update_fields=['alert_state', 'last_alert_sent'])        # Lock the property row so concurrent checks can't both observe the        # same alert_state and double-fire transitions.        with transaction.atomic():            locked = Property.objects.select_for_update().get(pk=self.pk)            if is_currently_up and locked.alert_state == 'down':                self.send_recovery_email()                self.send_recovery_discord_message()                locked.alert_state = 'up'                locked.last_alert_sent = timezone.now()                locked.save(update_fields=['alert_state', 'last_alert_sent'])                self.alert_state = locked.alert_state                self.last_alert_sent = locked.last_alert_sent            elif not is_currently_up and locked.alert_state == 'up':                # Require at least 2 consecutive failures to avoid false positives.                checks = self.statuses.order_by("-created_at")[:2]                if len(checks) >= 2 and checks[0].status_code != 200 and checks[1].status_code != 200:                    self.send_down_email()                    self.send_down_discord_message()                    locked.alert_state = 'down'                    locked.last_alert_sent = timezone.now()                    locked.save(update_fields=['alert_state', 'last_alert_sent'])                    self.alert_state = locked.alert_state                    self.last_alert_sent = locked.last_alert_sentclass CrawlerMixin:
@@ -230,13 +248,15 @@ class CrawlerMixin:            return True        return self.next_run_at_crawler <= now    def parse_page(self, page):    def parse_page(self, page, duplicates=None):        insights = []        # Make sure the content type is text/html else skip        if "text/html" not in page.get("content_type", ""):            return insights        duplicates = duplicates or {"title": set(), "description": set(), "h1": set()}        # Make sure all pages have a title        if page['title'] == '':            logger.warning(f"Page {page['url']} has no title")
@@ -257,7 +277,7 @@ class CrawlerMixin:            })        # Make sure pages have a unique title        if page['title'] in [p['title'] for p in self.get_crawl_output if p['url'] != page['url']]:        if page['title'] in duplicates['title']:            logger.warning(f"Page {page['url']} has duplicate title")            insights.append({                'url': page['url'],
@@ -286,7 +306,7 @@ class CrawlerMixin:            })        # Make sure pages have a unique description        if page['description'] in [p['description'] for p in self.get_crawl_output if p['url'] != page['url']]:        if page['description'] in duplicates['description']:            logger.warning(f"Page {page['url']} has duplicate description")            insights.append({                'url': page['url'],
@@ -315,7 +335,7 @@ class CrawlerMixin:            })        # Make sure pages have a unique h1        if page['h1'] in [p['h1'] for p in self.get_crawl_output if p['url'] != page['url']]:        if page['h1'] in duplicates['h1']:            logger.warning(f"Page {page['url']} has duplicate h1")            insights.append({                'url': page['url'],
@@ -336,9 +356,21 @@ class CrawlerMixin:        return insights    def parse_crawl(self):        # Pre-compute the set of values that appear on more than one page so the        # per-page uniqueness check is O(1) instead of scanning the full crawl.        duplicates = {"title": set(), "description": set(), "h1": set()}        for field in duplicates:            seen = set()            for p in self.get_crawl_output:                value = p.get(field, "")                if value in seen:                    duplicates[field].add(value)                else:                    seen.add(value)        insights = []        for page in self.get_crawl_output:            insights.extend(self.parse_page(page))            insights.extend(self.parse_page(page, duplicates=duplicates))        self.crawler_insights = insights        self.save(update_fields=['crawler_insights'])
@@ -414,7 +446,7 @@ class Property(CrawlerMixin, AlertsMixin, SecurityMixin, models.Model):            headers = {                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36 Status/1.0.0"            }            response = requests.get(self.url, timeout=10, headers=headers)            response = requests.get(self.url, timeout=(3, 10), headers=headers)            response_time = response.elapsed.total_seconds() * 1000            status_code = response.status_code            headers = response.headers
@@ -536,6 +568,7 @@ class Check(models.Model):        verbose_name_plural = "Checks"        indexes = [            models.Index(fields=["created_at"]),            models.Index(fields=["property", "-created_at"]),        ]        get_latest_by = "created_at"