//! Background scheduler: periodically enqueues HTTP status checks,
//! Lighthouse runs, and SEO crawls, recovers wedged job states, and
//! prunes old check rows.
use crate::checker;
use crate::crawler;
use crate::db::now_ms;
use crate::lighthouse;
use crate::models::PropertyRow;
use crate::Config;
use sqlx::SqlitePool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;
/// Seconds between scheduler loop iterations.
const CYCLE_SECS: u64 = 30;
// Two pools so slow lighthouse/crawler work can't starve quick HTTP pings.
const FAST_PERMITS: usize = 2;
const SLOW_PERMITS: usize = 2;
/// Minimum seconds between cleanup passes over the `checks` table (one day).
const CLEANUP_INTERVAL_SECS: i64 = 86_400;
/// Check rows older than this many days are deleted during cleanup.
const CHECK_RETENTION_DAYS: i64 = 3;
/// A crawl still `running` after this many seconds is treated as wedged.
const CRAWL_WEDGE_SECS: i64 = 900;
/// A lighthouse run still `running` after this many seconds is treated as wedged.
const LH_WEDGE_SECS: i64 = 300;
/// Days between crawler runs for each property.
const CRAWL_INTERVAL_DAYS: i64 = 7;
/// Days between lighthouse runs for each property.
const LH_INTERVAL_DAYS: i64 = 1;
/// Reset any `queued`/`running` job states left over from a previous process.
///
/// In-flight tasks do not survive a restart, so rows stuck in those states
/// are moved back to `idle` to avoid permanently blocking new work.
pub async fn reset_states_on_boot(pool: &SqlitePool) -> anyhow::Result<()> {
    // Both columns need the same treatment; run the statements in turn.
    for stmt in [
        "UPDATE properties SET crawl_state = 'idle' WHERE crawl_state IN ('queued', 'running')",
        "UPDATE properties SET lighthouse_state = 'idle' WHERE lighthouse_state IN ('queued', 'running')",
    ] {
        sqlx::query(stmt).execute(pool).await?;
    }
    Ok(())
}
/// Start the scheduler loop on the tokio runtime and return its handle.
///
/// Each cycle enqueues due status checks, lighthouse runs, and crawls,
/// recovers wedged states, and occasionally prunes old check rows. Step
/// failures are logged and never abort the loop.
pub fn spawn(pool: SqlitePool, config: Arc<Config>) -> JoinHandle<()> {
    // Separate permit pools keep slow lighthouse/crawler jobs from starving
    // the quick HTTP status checks.
    let fast_sem = Arc::new(Semaphore::new(FAST_PERMITS));
    let slow_sem = Arc::new(Semaphore::new(SLOW_PERMITS));
    tokio::spawn(async move {
        let mut last_cleanup: Option<i64> = None;
        loop {
            // The array literal awaits each step in order before the loop
            // reports failures, preserving the original call sequence.
            for (step, outcome) in [
                ("enqueue_status", enqueue_status(&pool, &config, &fast_sem).await),
                ("enqueue_lighthouse", enqueue_lighthouse(&pool, &config, &slow_sem).await),
                ("enqueue_crawler", enqueue_crawler(&pool, &config, &slow_sem).await),
                ("reset_wedged_states", reset_wedged_states(&pool).await),
                ("cleanup", maybe_cleanup(&pool, &mut last_cleanup).await),
            ] {
                if let Err(e) = outcome {
                    tracing::warn!("[scheduler] {step}: {e}");
                }
            }
            tokio::time::sleep(Duration::from_secs(CYCLE_SECS)).await;
        }
    })
}
/// Recover rows stuck in `running` longer than the per-job wedge timeout.
///
/// Only `running` rows with a recorded start timestamp are touched; the
/// state is returned to `idle` and an explanatory error message is stored.
async fn reset_wedged_states(pool: &SqlitePool) -> sqlx::Result<()> {
    let now = now_ms();
    sqlx::query(
        "UPDATE properties SET crawl_state = 'idle', last_crawl_error = 'Crawl timed out or was interrupted' \
         WHERE crawl_state = 'running' AND crawl_started_at IS NOT NULL AND crawl_started_at < ?",
    )
    .bind(now - CRAWL_WEDGE_SECS * 1000)
    .execute(pool)
    .await?;
    sqlx::query(
        "UPDATE properties SET lighthouse_state = 'idle', last_lighthouse_error = 'Lighthouse run timed out or was interrupted' \
         WHERE lighthouse_state = 'running' AND lighthouse_started_at IS NOT NULL AND lighthouse_started_at < ?",
    )
    .bind(now - LH_WEDGE_SECS * 1000)
    .execute(pool)
    .await?;
    Ok(())
}
/// Delete check rows older than the retention window, rate-limited to one
/// pass per `CLEANUP_INTERVAL_SECS`.
///
/// `last` holds the timestamp (ms) of the previous successful pass; it is
/// only advanced after the DELETE succeeds, so failures retry next cycle.
async fn maybe_cleanup(pool: &SqlitePool, last: &mut Option<i64>) -> sqlx::Result<()> {
    let now = now_ms();
    match *last {
        // Too soon since the previous pass — nothing to do.
        Some(prev) if (now - prev) / 1000 < CLEANUP_INTERVAL_SECS => return Ok(()),
        _ => {}
    }
    let cutoff = now - CHECK_RETENTION_DAYS * 24 * 3600 * 1000;
    let deleted = sqlx::query("DELETE FROM checks WHERE created_at < ?")
        .bind(cutoff)
        .execute(pool)
        .await?
        .rows_affected();
    tracing::info!("[scheduler] cleaned {} checks older than {}d", deleted, CHECK_RETENTION_DAYS);
    *last = Some(now);
    Ok(())
}
/// Queue an HTTP status check for every property whose schedule is due.
///
/// Each due row is rescheduled immediately (so the next cycle won't pick it
/// up again) and the actual check runs on a spawned task gated by the fast
/// semaphore.
async fn enqueue_status(
    pool: &SqlitePool,
    config: &Arc<Config>,
    sem: &Arc<Semaphore>,
) -> sqlx::Result<()> {
    let now = now_ms();
    let due_rows: Vec<PropertyRow> = sqlx::query_as(
        "SELECT * FROM properties \
         WHERE last_run_at IS NULL OR next_run_at IS NULL OR next_run_at <= ?",
    )
    .bind(now)
    .fetch_all(pool)
    .await?;
    for row in due_rows {
        // Reschedule before spawning so a slow check can't double-enqueue.
        sqlx::query(
            "UPDATE properties SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
        )
        .bind(checker::next_3min_boundary())
        .bind(now)
        .bind(now)
        .bind(row.id.clone())
        .execute(pool)
        .await?;
        let (task_pool, task_sem, task_config) = (pool.clone(), sem.clone(), config.clone());
        tokio::spawn(async move {
            // A closed semaphore means shutdown; drop the work silently.
            let _permit = if let Ok(p) = task_sem.acquire_owned().await {
                p
            } else {
                return;
            };
            tracing::info!("[scheduler] checking status {}", row.url);
            if let Err(e) = checker::process_check(&task_pool, &task_config, &row).await {
                tracing::warn!("[scheduler] status check failed for {}: {e:#}", row.url);
            }
        });
    }
    Ok(())
}
/// Queue a lighthouse run for every due property not already queued/running.
///
/// Rows are rescheduled and marked `queued` up front; the run itself happens
/// on a spawned task gated by the slow semaphore.
async fn enqueue_lighthouse(
    pool: &SqlitePool,
    config: &Arc<Config>,
    sem: &Arc<Semaphore>,
) -> sqlx::Result<()> {
    let now = now_ms();
    let due_rows: Vec<PropertyRow> = sqlx::query_as(
        "SELECT * FROM properties \
         WHERE (last_lighthouse_run_at IS NULL OR next_lighthouse_run_at IS NULL OR next_lighthouse_run_at <= ?) \
         AND lighthouse_state NOT IN ('queued', 'running')",
    )
    .bind(now)
    .fetch_all(pool)
    .await?;
    for row in due_rows {
        // Mark queued and push the next run out by the full interval.
        sqlx::query(
            "UPDATE properties SET next_lighthouse_run_at = ?, last_lighthouse_run_at = ?, lighthouse_state = 'queued', updated_at = ? \
             WHERE id = ?",
        )
        .bind(now + LH_INTERVAL_DAYS * 24 * 3600 * 1000)
        .bind(now)
        .bind(now)
        .bind(row.id.clone())
        .execute(pool)
        .await?;
        let (task_pool, task_sem, task_config) = (pool.clone(), sem.clone(), config.clone());
        tokio::spawn(async move {
            // A closed semaphore means shutdown; drop the work silently.
            let _permit = if let Ok(p) = task_sem.acquire_owned().await {
                p
            } else {
                return;
            };
            tracing::info!("[scheduler] lighthouse {}", row.url);
            run_lighthouse_for(&task_pool, &task_config, &row).await;
        });
    }
    Ok(())
}
/// Execute one lighthouse run for `prop` and persist scores or the error.
///
/// Bookkeeping writes are best-effort (`let _ =`); a failed UPDATE never
/// aborts the run itself.
async fn run_lighthouse_for(pool: &SqlitePool, config: &Arc<Config>, prop: &PropertyRow) {
    let now = now_ms();
    // Mark the row running so the scheduler won't enqueue it again.
    let _ = sqlx::query(
        "UPDATE properties SET lighthouse_state = 'running', lighthouse_started_at = ?, updated_at = ? WHERE id = ?",
    )
    .bind(now)
    .bind(now)
    .bind(prop.id.clone())
    .execute(pool)
    .await;
    let started = std::time::Instant::now();
    // Early-return error paths all funnel through store_lh_error.
    let results = match lighthouse::fetch(&config.root, &prop.url).await {
        Ok(r) => r,
        Err(e) => return store_lh_error(pool, prop, &format!("{e}"), started).await,
    };
    let scores = match lighthouse::parse_scores(&results) {
        Ok(s) => s,
        Err(e) => return store_lh_error(pool, prop, &format!("{e}"), started).await,
    };
    let details = lighthouse::parse_details(&results);
    let scores_json = serde_json::to_string(&scores).unwrap_or_else(|_| "{}".into());
    let details_json = details
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok())
        .unwrap_or_else(|| "null".into());
    let dur = started.elapsed().as_millis() as i64;
    let _ = sqlx::query(
        "UPDATE properties SET \
         lighthouse_scores = ?, lighthouse_details = ?, \
         last_lighthouse_success_at = ?, last_lighthouse_error = NULL, \
         last_lighthouse_duration_ms = ?, lighthouse_state = 'idle', updated_at = ? \
         WHERE id = ?",
    )
    .bind(scores_json)
    .bind(details_json)
    .bind(now_ms())
    .bind(dur)
    .bind(now_ms())
    .bind(prop.id.clone())
    .execute(pool)
    .await;
}
/// Record a failed lighthouse run: log it, store the message and elapsed
/// duration, and return the row to `idle`. The UPDATE is best-effort.
async fn store_lh_error(
    pool: &SqlitePool,
    prop: &PropertyRow,
    msg: &str,
    started: std::time::Instant,
) {
    tracing::warn!("[scheduler] lighthouse failed for {}: {msg}", prop.url);
    let elapsed_ms = started.elapsed().as_millis() as i64;
    let _ = sqlx::query(
        "UPDATE properties SET lighthouse_state = 'idle', last_lighthouse_error = ?, last_lighthouse_duration_ms = ?, updated_at = ? \
         WHERE id = ?",
    )
    .bind(msg)
    .bind(elapsed_ms)
    .bind(now_ms())
    .bind(prop.id.clone())
    .execute(pool)
    .await;
}
/// Queue a crawl for every due property not already queued/running.
///
/// Rows are rescheduled and marked `queued` up front; the crawl itself runs
/// on a spawned task gated by the slow semaphore. `_config` is unused but
/// kept for signature symmetry with the other enqueue functions.
async fn enqueue_crawler(
    pool: &SqlitePool,
    _config: &Arc<Config>,
    sem: &Arc<Semaphore>,
) -> sqlx::Result<()> {
    let now = now_ms();
    let due_rows: Vec<PropertyRow> = sqlx::query_as(
        "SELECT * FROM properties \
         WHERE (last_run_at_crawler IS NULL OR next_run_at_crawler IS NULL OR next_run_at_crawler <= ?) \
         AND crawl_state NOT IN ('queued', 'running')",
    )
    .bind(now)
    .fetch_all(pool)
    .await?;
    for row in due_rows {
        // Mark queued and push the next run out by the full interval.
        sqlx::query(
            "UPDATE properties SET next_run_at_crawler = ?, last_run_at_crawler = ?, crawl_state = 'queued', updated_at = ? \
             WHERE id = ?",
        )
        .bind(now + CRAWL_INTERVAL_DAYS * 24 * 3600 * 1000)
        .bind(now)
        .bind(now)
        .bind(row.id.clone())
        .execute(pool)
        .await?;
        let (task_pool, task_sem) = (pool.clone(), sem.clone());
        tokio::spawn(async move {
            // A closed semaphore means shutdown; drop the work silently.
            let _permit = if let Ok(p) = task_sem.acquire_owned().await {
                p
            } else {
                return;
            };
            tracing::info!("[scheduler] crawler {}", row.url);
            run_crawler_for(&task_pool, &row).await;
        });
    }
    Ok(())
}
/// Run the SEO crawler for one property, streaming page-count progress into
/// the row and persisting the final outcome (insights or error).
///
/// Bookkeeping writes are best-effort (`let _ =`); a failed UPDATE never
/// aborts the crawl itself.
async fn run_crawler_for(pool: &SqlitePool, prop: &PropertyRow) {
    let now = now_ms();
    // Mark running and zero the progress counter for this crawl.
    let _ = sqlx::query(
        "UPDATE properties SET crawl_state = 'running', crawl_started_at = ?, last_crawl_pages_count = 0, updated_at = ? WHERE id = ?",
    )
    .bind(now)
    .bind(now)
    .bind(prop.id.clone())
    .execute(pool)
    .await;
    let pool_for_progress = pool.clone();
    let id_for_progress = prop.id.clone();
    // The callback is synchronous, so each tick spawns a fire-and-forget
    // UPDATE. Spawned tasks may complete out of order; the
    // `last_crawl_pages_count < ?` guard keeps the counter monotonic so a
    // stale (smaller) write can never roll a newer count backwards.
    let progress_cb = move |pages: usize| {
        let pool = pool_for_progress.clone();
        let id = id_for_progress.clone();
        tokio::spawn(async move {
            let pages = pages as i64;
            let _ = sqlx::query(
                "UPDATE properties SET last_crawl_pages_count = ? WHERE id = ? AND last_crawl_pages_count < ?",
            )
            .bind(pages)
            .bind(id)
            .bind(pages)
            .execute(&pool)
            .await;
        });
    };
    let started = std::time::Instant::now();
    let outcome = crawler::run_seo_spider(&prop.url, progress_cb).await;
    let duration_ms = started.elapsed().as_millis() as i64;
    match outcome {
        Ok(insights) => {
            let json = serde_json::to_string(&insights).unwrap_or_else(|_| "[]".into());
            let _ = sqlx::query(
                "UPDATE properties SET \
                 crawler_insights = ?, crawl_state = 'idle', \
                 last_crawl_success_at = ?, last_crawl_error = NULL, \
                 last_crawl_duration_ms = ?, updated_at = ? \
                 WHERE id = ?",
            )
            .bind(json)
            .bind(now_ms())
            .bind(duration_ms)
            .bind(now_ms())
            .bind(prop.id.clone())
            .execute(pool)
            .await;
        }
        Err(e) => {
            tracing::warn!("[scheduler] crawl failed for {}: {e:#}", prop.url);
            let _ = sqlx::query(
                "UPDATE properties SET crawl_state = 'idle', last_crawl_error = ?, last_crawl_duration_ms = ?, updated_at = ? WHERE id = ?",
            )
            .bind(format!("{e:#}"))
            .bind(duration_ms)
            .bind(now_ms())
            .bind(prop.id.clone())
            .execute(pool)
            .await;
        }
    }
}