
Refactor scheduler with thread pools and graceful shutdown

5fb81051 by Isaac Bythewood · 24 days ago

modified properties/management/commands/scheduler.py
@@ -1,33 +1,82 @@
+import logging
+import signal
 import threading
 import time
-import queue
+from concurrent.futures import ThreadPoolExecutor
+from django import db
 from django.core.management.base import BaseCommand
 from django.db.models import Q
 from django.utils import timezone
-from properties.models import Property, Check
+from properties.models import Check, Property
 
-q = queue.Queue()
-q_status = queue.Queue()
+logger = logging.getLogger(__name__)
 
 
 class Command(BaseCommand):
-    def clean_checks(self):
-        """
-        Clean checks older than 3 days.
-        """
-        self.stdout.write("[Scheduler] Cleaning checks older than 3 days...")
-        Check.objects.filter(created_at__lt=timezone.now() - timezone.timedelta(days=3)).delete()
-        self.stdout.write("[Scheduler] Cleaned checks older than 3 days.")
+    # Two pools so slow lighthouse/crawler work can't starve quick HTTP pings.
+    SLOW_WORKERS = 2
+    FAST_WORKERS = 2
+    CYCLE_SECONDS = 30
+    CLEANUP_INTERVAL_SECONDS = 86400
+
+    def __init__(self):
+        super().__init__()
+        self._stop = threading.Event()
+        self._last_cleanup = None
+
+    def handle(self, *args, **options):
+        self.stdout.write("[Scheduler] Starting scheduler...")
+        signal.signal(signal.SIGTERM, self._on_signal)
+        signal.signal(signal.SIGINT, self._on_signal)
+        # Clear any running/queued states left over from a prior crash so
+        # rows don't sit stuck and block new runs.
+        Property.objects.filter(crawl_state__in=["queued", "running"]).update(
+            crawl_state="idle"
+        )
+        Property.objects.filter(lighthouse_state__in=["queued", "running"]).update(
+            lighthouse_state="idle"
+        )
+        slow = ThreadPoolExecutor(
+            max_workers=self.SLOW_WORKERS, thread_name_prefix="slow"
+        )
+        fast = ThreadPoolExecutor(
+            max_workers=self.FAST_WORKERS, thread_name_prefix="fast"
+        )
+        try:
+            while not self._stop.is_set():
+                try:
+                    self._enqueue_status(fast)
+                    self._enqueue_lighthouse(slow)
+                    self._enqueue_crawler(slow)
+                    self.reset_wedged_states()
+                    self._maybe_cleanup()
+                except Exception:
+                    logger.exception("[Scheduler] cycle error")
+                self.stdout.write(
+                    f"[Scheduler] Sleeping scheduler for {self.CYCLE_SECONDS} seconds..."
+                )
+                self._stop.wait(self.CYCLE_SECONDS)
+        finally:
+            self.stdout.write("[Scheduler] Stopping scheduler...")
+            slow.shutdown(wait=False, cancel_futures=True)
+            fast.shutdown(wait=False, cancel_futures=True)
+
+    def _on_signal(self, signum, frame):
+        self.stdout.write(f"[Scheduler] Received signal {signum}, shutting down...")
+        self._stop.set()
+
     def reset_wedged_states(self):
         """Flip stale running/queued states back to idle.
 
-        Runs on startup (catches states left over from a crashed scheduler)
-        and each cycle (catches threads that overran JOIN_TIMEOUT).
+        Runs every cycle to catch threads that overran their deadline.
+        The startup path in handle() also wipes state unconditionally to
+        cover crashes.
         """
         now = timezone.now()
         crawl_cutoff = now - timezone.timedelta(seconds=900)
@@ -45,177 +94,115 @@ class Command(BaseCommand):
         Property.objects.filter(
             lighthouse_state__in=["queued", "running"],
         ).filter(
-            Q(lighthouse_started_at__isnull=True) | Q(lighthouse_started_at__lt=lh_cutoff)
+            Q(lighthouse_started_at__isnull=True)
+            | Q(lighthouse_started_at__lt=lh_cutoff)
         ).update(
             lighthouse_state="idle",
             last_lighthouse_error="Lighthouse run timed out or was interrupted",
         )
 
-    def thread_target(self, property_id):
-        property = Property.objects.get(id=property_id)
-        self.stdout.write("[Scheduler] Checking status {}".format(property.url))
-        property.process_check()
-
-    def thread_target_lighthouse(self, property_id):
-        property = Property.objects.get(id=property_id)
-        self.stdout.write("[Scheduler] Checking lighthouse {}".format(property.url))
-        property.process_check_lighthouse()
-
-    def thread_target_crawler(self, property_id):
-        property = Property.objects.get(id=property_id)
-        self.stdout.write("[Scheduler] Checking crawler {}".format(property.url))
-        property.crawl_site()
-
-    def queue_add(self, property_id, property_type):
-        q.put((property_id, property_type))
-
-    def queue_add_status(self, property_id, property_type):
-        q_status.put((property_id, property_type))
-
-    def queue_process(self):
-        # Cap on join() so a wedged lighthouse/crawler can't freeze the queue
-        # indefinitely. Must exceed both status.lighthouse.SUBPROCESS_TIMEOUT_SECONDS
-        # (180s) and crawler.fetcher.CRAWL_DEADLINE_SECONDS (540s) so a normal
-        # slow run still completes inside the window.
-        JOIN_TIMEOUT = 900
-        while True:
-            if not q.empty():
-                threads = []
-                for i in range(2):
-                    if q.empty():
-                        break
-                    q_data = q.get()
-                    if q_data[1] == "lighthouse":
-                        t = threading.Thread(target=self.thread_target_lighthouse, args=(q_data[0],))
-                    elif q_data[1] == "crawler":
-                        t = threading.Thread(target=self.thread_target_crawler, args=(q_data[0],))
-                    t.daemon = True
-                    t.start()
-                    threads.append(t)
-                for t in threads:
-                    t.join(timeout=JOIN_TIMEOUT)
-                    if t.is_alive():
-                        self.stdout.write(
-                            "[Scheduler] Thread still running after {}s, abandoning".format(JOIN_TIMEOUT)
-                        )
-                    q.task_done()
-            time.sleep(1)
-
-    def queue_process_status(self):
-        while True:
-            if not q_status.empty():
-                threads = []
-                for i in range(2):
-                    if q_status.empty():
-                        break
-                    q_data = q_status.get()
-                    if q_data[1] != "status":
-                        q_status.task_done()
-                        continue
-                    t = threading.Thread(target=self.thread_target, args=(q_data[0],))
-                    t.daemon = True
-                    t.start()
-                    threads.append(t)
-                for t in threads:
-                    t.join()
-                    q_status.task_done()
-            time.sleep(1)
-
-    def queue_check_status(self):
+    def _maybe_cleanup(self):
         now = timezone.now()
-        due = Property.objects.filter(
-            Q(last_run_at__isnull=True) | Q(next_run_at__isnull=True) | Q(next_run_at__lte=now)
-        )
+        if (
+            self._last_cleanup
+            and (now - self._last_cleanup).total_seconds()
+            < self.CLEANUP_INTERVAL_SECONDS
+        ):
+            return
+        self.stdout.write("[Scheduler] Cleaning checks older than 3 days...")
+        Check.objects.filter(
+            created_at__lt=now - timezone.timedelta(days=3)
+        ).delete()
+        self._last_cleanup = now
+
+    def _enqueue_status(self, pool):
+        now = timezone.now()
+        due = list(
+            Property.objects.filter(
+                Q(last_run_at__isnull=True)
+                | Q(next_run_at__isnull=True)
+                | Q(next_run_at__lte=now)
+            )
+        )
-        properties = list(due)
-        for p in properties:
+        for p in due:
             p.next_run_at = p.get_next_run_at()
             p.last_run_at = timezone.now()
             p.save(update_fields=["next_run_at", "last_run_at"])
-        properties = [p.id for p in properties]
+            pool.submit(self._run_status, p.id)
         db.connections.close_all()
-        for p_id in properties:
-            self.queue_add_status(p_id, "status")
 
-    def queue_check_lighthouse(self):
+    def _enqueue_lighthouse(self, pool):
         now = timezone.now()
-        due = Property.objects.filter(
-            Q(last_lighthouse_run_at__isnull=True)
-            | Q(next_lighthouse_run_at__isnull=True)
-            | Q(next_lighthouse_run_at__lte=now)
-        ).exclude(lighthouse_state__in=["queued", "running"])
-        properties = list(due)
-        for p in properties:
+        due = list(
+            Property.objects.filter(
+                Q(last_lighthouse_run_at__isnull=True)
+                | Q(next_lighthouse_run_at__isnull=True)
+                | Q(next_lighthouse_run_at__lte=now)
+            ).exclude(lighthouse_state__in=["queued", "running"])
+        )
+        for p in due:
             p.next_lighthouse_run_at = p.get_next_run_at_lighthouse()
             p.last_lighthouse_run_at = timezone.now()
             p.lighthouse_state = "queued"
-            p.save(update_fields=[
-                "next_lighthouse_run_at",
-                "last_lighthouse_run_at",
-                "lighthouse_state",
-            ])
-        properties = [p.id for p in properties]
+            p.save(
+                update_fields=[
+                    "next_lighthouse_run_at",
+                    "last_lighthouse_run_at",
+                    "lighthouse_state",
+                ]
+            )
+            pool.submit(self._run_lighthouse, p.id)
         db.connections.close_all()
-        for p_id in properties:
-            self.queue_add(p_id, "lighthouse")
 
-    def queue_check_crawler(self):
+    def _enqueue_crawler(self, pool):
         now = timezone.now()
-        due = Property.objects.filter(
-            Q(last_run_at_crawler__isnull=True)
-            | Q(next_run_at_crawler__isnull=True)
-            | Q(next_run_at_crawler__lte=now)
-        ).exclude(crawl_state__in=["queued", "running"])
-        properties = list(due)
-        for p in properties:
+        due = list(
+            Property.objects.filter(
+                Q(last_run_at_crawler__isnull=True)
+                | Q(next_run_at_crawler__isnull=True)
+                | Q(next_run_at_crawler__lte=now)
+            ).exclude(crawl_state__in=["queued", "running"])
+        )
+        for p in due:
             p.next_run_at_crawler = p.get_next_run_at_crawl()
             p.last_run_at_crawler = timezone.now()
             p.crawl_state = "queued"
-            p.save(update_fields=[
-                "next_run_at_crawler",
-                "last_run_at_crawler",
-                "crawl_state",
-            ])
-        properties = [p.id for p in properties]
+            p.save(
+                update_fields=[
+                    "next_run_at_crawler",
+                    "last_run_at_crawler",
+                    "crawl_state",
+                ]
+            )
+            pool.submit(self._run_crawler, p.id)
         db.connections.close_all()
-        for p_id in properties:
-            self.queue_add(p_id, "crawler")
-
-    def handle(self, *args, **options):
-        self.stdout.write("[Scheduler] Starting scheduler...")
-        # Clear any running/queued states left over from a prior crash so
-        # that rows don't sit stuck and block new runs.
-        Property.objects.filter(crawl_state__in=["queued", "running"]).update(
-            crawl_state="idle"
-        )
-        Property.objects.filter(lighthouse_state__in=["queued", "running"]).update(
-            lighthouse_state="idle"
-        )
-        # Start queue_process thread
-        t = threading.Thread(target=self.queue_process)
-        t.daemon = True
-        t.start()
-        # Start queue_process thread
-        t = threading.Thread(target=self.queue_process_status)
-        t.daemon = True
-        t.start()
-        # Start our loop to check properties every 30 seconds
-        while True:
-            self.queue_check_status()
-            self.queue_check_lighthouse()
-            self.queue_check_crawler()
-            self.reset_wedged_states()
-            self.clean_checks()
-            self.stdout.write("[Scheduler] Sleeping scheduler for 30 seconds...")
-            try:
-                time.sleep(30)
-            except KeyboardInterrupt:
-                self.stdout.write("[Scheduler] Stopping scheduler...")
-                break
+
+    def _run_status(self, property_id):
+        try:
+            prop = Property.objects.get(id=property_id)
+            self.stdout.write(f"[Scheduler] Checking status {prop.url}")
+            prop.process_check()
+        except Exception:
+            logger.exception("[Scheduler] status check failed for %s", property_id)
+        finally:
+            db.close_old_connections()
+
+    def _run_lighthouse(self, property_id):
+        try:
+            prop = Property.objects.get(id=property_id)
+            self.stdout.write(f"[Scheduler] Checking lighthouse {prop.url}")
+            prop.process_check_lighthouse()
+        except Exception:
+            logger.exception("[Scheduler] lighthouse failed for %s", property_id)
+        finally:
+            db.close_old_connections()
+
+    def _run_crawler(self, property_id):
+        try:
+            prop = Property.objects.get(id=property_id)
+            self.stdout.write(f"[Scheduler] Checking crawler {prop.url}")
+            prop.crawl_site()
+        except Exception:
+            logger.exception("[Scheduler] crawler failed for %s", property_id)
+        finally:
+            db.close_old_connections()