modified
properties/models.py
@@ -1,7 +1,7 @@import loggingimport reimport timeimport uuidimport loggingimport requestsfrom django.contrib.auth import get_user_model
@@ -10,15 +10,15 @@ from django.db import models, transactionfrom django.template.loader import render_to_stringfrom django.utils import timezonefrom django.utils.functional import cached_propertyfrom crawler.runner import run_seo_spiderfrom crawler.runner import run_seo_spiderfrom status.lighthouse import ( LighthouseError, fetch_lighthouse_results, parse_lighthouse_results, parse_performance_details,)logger = logging.getLogger(__name__)
@@ -184,23 +184,27 @@ class AlertsMixin: with transaction.atomic(): locked = Property.objects.select_for_update().get(pk=self.pk) if is_currently_up and locked.alert_state == 'down': if is_currently_up and locked.alert_state == "down": self.send_recovery_email() self.send_recovery_discord_message() locked.alert_state = 'up' locked.alert_state = "up" locked.last_alert_sent = timezone.now() locked.save(update_fields=['alert_state', 'last_alert_sent']) locked.save(update_fields=["alert_state", "last_alert_sent"]) self.alert_state = locked.alert_state self.last_alert_sent = locked.last_alert_sent elif not is_currently_up and locked.alert_state == 'up': elif not is_currently_up and locked.alert_state == "up": # Require at least 2 consecutive failures to avoid false positives. checks = self.statuses.order_by("-created_at")[:2] if len(checks) >= 2 and checks[0].status_code != 200 and checks[1].status_code != 200: if ( len(checks) >= 2 and checks[0].status_code != 200 and checks[1].status_code != 200 ): self.send_down_email() self.send_down_discord_message() locked.alert_state = 'down' locked.alert_state = "down" locked.last_alert_sent = timezone.now() locked.save(update_fields=['alert_state', 'last_alert_sent']) locked.save(update_fields=["alert_state", "last_alert_sent"]) self.alert_state = locked.alert_state self.last_alert_sent = locked.last_alert_sent
@@ -274,6 +278,7 @@ class Property(CrawlerMixin, AlertsMixin, SecurityMixin, models.Model): last_crawl_pages_count = models.IntegerField(blank=True, null=True) lighthouse_scores = models.JSONField(blank=True, null=True) lighthouse_details = models.JSONField(blank=True, null=True) last_lighthouse_run_at = models.DateTimeField(blank=True, null=True) last_lighthouse_success_at = models.DateTimeField(blank=True, null=True) last_lighthouse_error = models.TextField(blank=True, null=True)
@@ -289,9 +294,7 @@ class Property(CrawlerMixin, AlertsMixin, SecurityMixin, models.Model): # Alert state tracking last_alert_sent = models.DateTimeField(blank=True, null=True) alert_state = models.CharField( max_length=10, choices=[('up', 'Up'), ('down', 'Down')], default='up' max_length=10, choices=[("up", "Up"), ("down", "Down")], default="up" ) created_at = models.DateTimeField(auto_now_add=True)
@@ -336,7 +339,7 @@ class Property(CrawlerMixin, AlertsMixin, SecurityMixin, models.Model): response_time = response.elapsed.total_seconds() * 1000 status_code = response.status_code headers = response.headers except (requests.exceptions.SSLError): except requests.exceptions.SSLError: response_time = 10000 status_code = 526 headers = {}
@@ -382,6 +385,7 @@ class Property(CrawlerMixin, AlertsMixin, SecurityMixin, models.Model): try: results = fetch_lighthouse_results(self.url) scores = parse_lighthouse_results(results) details = parse_performance_details(results) except LighthouseError as e: logger.warning("Lighthouse failed for %s: %s", self.url, e) Property.objects.filter(pk=self.pk).update(
@@ -401,6 +405,7 @@ class Property(CrawlerMixin, AlertsMixin, SecurityMixin, models.Model): Property.objects.filter(pk=self.pk).update( lighthouse_scores=scores, lighthouse_details=details, last_lighthouse_success_at=timezone.now(), last_lighthouse_error=None, last_lighthouse_duration_ms=int((time.monotonic() - start) * 1000),
modified
status/lighthouse.py
@@ -4,6 +4,7 @@ A wrapper around the lighthouse node CLI.Raises LighthouseError with a descriptive message on failure so callers canlog/persist the reason instead of silently dropping the result."""import jsonimport loggingimport shutil
@@ -11,7 +12,6 @@ import subprocessfrom django.conf import settingslogger = logging.getLogger(__name__)
@@ -59,7 +59,9 @@ def fetch_lighthouse_results(url): env=env, ) except subprocess.TimeoutExpired: raise LighthouseError(f"lighthouse timed out after {SUBPROCESS_TIMEOUT_SECONDS}s") raise LighthouseError( f"lighthouse timed out after {SUBPROCESS_TIMEOUT_SECONDS}s" ) except subprocess.CalledProcessError as e: stderr = (e.stderr or b"").decode("utf-8", errors="replace").strip() raise LighthouseError(f"lighthouse exited {e.returncode}: {stderr[-500:]}")
def parse_performance_details(results):
    """
    Break down the Performance category of a Lighthouse report into its
    weighted metrics and the top actionable opportunities.

    Returns None when the performance category or the audits map is absent
    from ``results`` — callers should read that as "no breakdown available"
    rather than an error.
    """
    try:
        perf = results["categories"]["performance"]
        audit_map = results["audits"]
    except KeyError:
        return None

    metric_rows = []
    opportunity_rows = []

    for ref in perf.get("auditRefs", []):
        audit = audit_map.get(ref.get("id"))
        if not audit:
            continue

        score = audit.get("score")
        weight = ref.get("weight", 0)

        if ref.get("group") == "metrics" and weight > 0:
            # A weighted metric contributes directly to the category score.
            metric_rows.append(
                {
                    "id": audit.get("id"),
                    "acronym": ref.get("acronym") or audit.get("id"),
                    "title": audit.get("title"),
                    "display_value": audit.get("displayValue"),
                    "score": score,
                    "weight": weight,
                }
            )
            continue

        # Everything else is a candidate opportunity/diagnostic. Drop the
        # non-actionable modes (manual, not-applicable, informative) and
        # anything that is already passing — we only surface real findings.
        if audit.get("scoreDisplayMode") in ("manual", "notApplicable", "informative"):
            continue
        if score is None or score >= 0.9:
            continue

        details = audit.get("details") or {}
        savings = (
            (details.get("overallSavingsMs") or 0) if isinstance(details, dict) else 0
        )
        opportunity_rows.append(
            {
                "id": audit.get("id"),
                "title": audit.get("title"),
                "display_value": audit.get("displayValue"),
                "score": score,
                "savings_ms": savings,
                "weight": weight,
            }
        )

    # Heaviest metrics lead; opportunities ordered by estimated savings,
    # breaking ties in favour of the worst-failing audit.
    metric_rows.sort(key=lambda row: row["weight"], reverse=True)
    opportunity_rows.sort(
        key=lambda row: (row["savings_ms"], -row["score"]), reverse=True
    )

    return {
        "metrics": metric_rows,
        "opportunities": opportunity_rows[:10],
    }