use crate::checker;
use crate::crawler;
use crate::db::now_ms;
use crate::lighthouse;
use crate::models::PropertyRow;
use crate::Config;
use sqlx::SqlitePool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

/// Seconds between scheduler loop wake-ups.
const CYCLE_SECS: u64 = 30;
// Two pools so slow lighthouse/crawler work can't starve quick HTTP pings.
const FAST_PERMITS: usize = 2;
const SLOW_PERMITS: usize = 2;
/// Minimum seconds between `checks`-table cleanups (one day).
const CLEANUP_INTERVAL_SECS: i64 = 86_400;
/// Days of `checks` history to retain before deletion.
const CHECK_RETENTION_DAYS: i64 = 3;
/// A crawl stuck in 'running' longer than this is considered wedged and reset.
const CRAWL_WEDGE_SECS: i64 = 900;
/// A lighthouse run stuck in 'running' longer than this is considered wedged.
const LH_WEDGE_SECS: i64 = 300;
/// Days between successive crawls of the same property.
const CRAWL_INTERVAL_DAYS: i64 = 7;
/// Days between successive lighthouse runs of the same property.
const LH_INTERVAL_DAYS: i64 = 1;

/// Clear stale 'queued'/'running' markers left over from a previous process.
///
/// In-flight tasks do not survive a restart, so any row still in one of
/// those states is leftover from a crash and would otherwise never be
/// re-enqueued by the scheduler.
pub async fn reset_states_on_boot(pool: &SqlitePool) -> anyhow::Result<()> {
    let resets = [
        "UPDATE properties SET crawl_state = 'idle' WHERE crawl_state IN ('queued', 'running')",
        "UPDATE properties SET lighthouse_state = 'idle' WHERE lighthouse_state IN ('queued', 'running')",
    ];
    for sql in resets {
        sqlx::query(sql).execute(pool).await?;
    }
    Ok(())
}

/// Launch the scheduler's background loop and return its task handle.
///
/// Every `CYCLE_SECS` the loop enqueues due status checks, lighthouse
/// runs and crawls, recovers wedged rows, and (at most once per
/// `CLEANUP_INTERVAL_SECS`) prunes old check history. Two independent
/// semaphores bound concurrency so slow lighthouse/crawl jobs cannot
/// starve the quick HTTP status pings.
pub fn spawn(pool: SqlitePool, config: Arc<Config>) -> JoinHandle<()> {
    let fast_sem = Arc::new(Semaphore::new(FAST_PERMITS));
    let slow_sem = Arc::new(Semaphore::new(SLOW_PERMITS));
    tokio::spawn(async move {
        // Wall-clock ms timestamp of the last cleanup; None until the first run.
        let mut last_cleanup_at: Option<i64> = None;
        loop {
            // Each phase logs and continues on failure — one bad cycle
            // must not kill the scheduler.
            if let Err(e) = enqueue_status(&pool, &config, &fast_sem).await {
                tracing::warn!("[scheduler] enqueue_status: {e}");
            }
            if let Err(e) = enqueue_lighthouse(&pool, &config, &slow_sem).await {
                tracing::warn!("[scheduler] enqueue_lighthouse: {e}");
            }
            if let Err(e) = enqueue_crawler(&pool, &config, &slow_sem).await {
                tracing::warn!("[scheduler] enqueue_crawler: {e}");
            }
            if let Err(e) = reset_wedged_states(&pool).await {
                tracing::warn!("[scheduler] reset_wedged_states: {e}");
            }
            if let Err(e) = maybe_cleanup(&pool, &mut last_cleanup_at).await {
                tracing::warn!("[scheduler] cleanup: {e}");
            }
            tokio::time::sleep(Duration::from_secs(CYCLE_SECS)).await;
        }
    })
}

/// Recover rows stuck in 'running' past their wall-clock budget.
///
/// A crawl or lighthouse task that dies before reaching its completion
/// update leaves the row 'running' forever; this sweep flips such rows
/// back to idle and records a timeout error so they can be re-enqueued.
async fn reset_wedged_states(pool: &SqlitePool) -> sqlx::Result<()> {
    let now = now_ms();
    // (statement, started-before cutoff) pairs — one sweep per job kind.
    let sweeps: [(&str, i64); 2] = [
        (
            "UPDATE properties SET crawl_state = 'idle', last_crawl_error = 'Crawl timed out or was interrupted' \
             WHERE crawl_state = 'running' AND crawl_started_at IS NOT NULL AND crawl_started_at < ?",
            now - CRAWL_WEDGE_SECS * 1000,
        ),
        (
            "UPDATE properties SET lighthouse_state = 'idle', last_lighthouse_error = 'Lighthouse run timed out or was interrupted' \
             WHERE lighthouse_state = 'running' AND lighthouse_started_at IS NOT NULL AND lighthouse_started_at < ?",
            now - LH_WEDGE_SECS * 1000,
        ),
    ];
    for (sql, cutoff) in sweeps {
        sqlx::query(sql).bind(cutoff).execute(pool).await?;
    }
    Ok(())
}

/// Purge old `checks` rows, at most once per `CLEANUP_INTERVAL_SECS`.
///
/// `last` holds the wall-clock ms timestamp of the previous purge; `None`
/// means no purge has run in this process yet, so one runs immediately.
async fn maybe_cleanup(pool: &SqlitePool, last: &mut Option<i64>) -> sqlx::Result<()> {
    let now = now_ms();
    // Run when we've never cleaned, or when the interval has elapsed.
    let due = last.map_or(true, |prev| (now - prev) / 1000 >= CLEANUP_INTERVAL_SECS);
    if !due {
        return Ok(());
    }
    // Delete check history older than the retention window.
    let cutoff = now - CHECK_RETENTION_DAYS * 24 * 3600 * 1000;
    let res = sqlx::query("DELETE FROM checks WHERE created_at < ?")
        .bind(cutoff)
        .execute(pool)
        .await?;
    tracing::info!("[scheduler] cleaned {} checks older than {}d", res.rows_affected(), CHECK_RETENTION_DAYS);
    *last = Some(now);
    Ok(())
}

/// Find properties whose HTTP status check is due, stamp their schedule,
/// and spawn one permit-bounded task per property to run the check.
async fn enqueue_status(
    pool: &SqlitePool,
    config: &Arc<Config>,
    sem: &Arc<Semaphore>,
) -> sqlx::Result<()> {
    let now = now_ms();
    let due_rows: Vec<PropertyRow> = sqlx::query_as(
        "SELECT * FROM properties \
         WHERE last_run_at IS NULL OR next_run_at IS NULL OR next_run_at <= ?",
    )
    .bind(now)
    .fetch_all(pool)
    .await?;
    for row in due_rows {
        // Advance the schedule before spawning so the next cycle does not
        // pick the same property up again while this check is in flight.
        sqlx::query(
            "UPDATE properties SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
        )
        .bind(checker::next_3min_boundary())
        .bind(now)
        .bind(now)
        .bind(row.id.clone())
        .execute(pool)
        .await?;

        let task_pool = pool.clone();
        let task_sem = sem.clone();
        let task_config = config.clone();
        tokio::spawn(async move {
            // A closed semaphore means shutdown; silently drop the job.
            let _permit = if let Ok(p) = task_sem.acquire_owned().await {
                p
            } else {
                return;
            };
            tracing::info!("[scheduler] checking status {}", row.url);
            if let Err(e) = checker::process_check(&task_pool, &task_config, &row).await {
                tracing::warn!("[scheduler] status check failed for {}: {e:#}", row.url);
            }
        });
    }
    Ok(())
}

/// Queue lighthouse runs for properties whose lighthouse schedule has
/// elapsed and which are not already queued or running.
async fn enqueue_lighthouse(
    pool: &SqlitePool,
    config: &Arc<Config>,
    sem: &Arc<Semaphore>,
) -> sqlx::Result<()> {
    let now = now_ms();
    let due_rows: Vec<PropertyRow> = sqlx::query_as(
        "SELECT * FROM properties \
         WHERE (last_lighthouse_run_at IS NULL OR next_lighthouse_run_at IS NULL OR next_lighthouse_run_at <= ?) \
         AND lighthouse_state NOT IN ('queued', 'running')",
    )
    .bind(now)
    .fetch_all(pool)
    .await?;

    for row in due_rows {
        // Mark queued and push the schedule forward up front so later
        // cycles skip this property while the job waits for a permit.
        sqlx::query(
            "UPDATE properties SET next_lighthouse_run_at = ?, last_lighthouse_run_at = ?, lighthouse_state = 'queued', updated_at = ? \
             WHERE id = ?",
        )
        .bind(now + LH_INTERVAL_DAYS * 24 * 3600 * 1000)
        .bind(now)
        .bind(now)
        .bind(row.id.clone())
        .execute(pool)
        .await?;

        let task_pool = pool.clone();
        let task_sem = sem.clone();
        let task_config = config.clone();
        tokio::spawn(async move {
            // A closed semaphore means shutdown; silently drop the job.
            let _permit = if let Ok(p) = task_sem.acquire_owned().await {
                p
            } else {
                return;
            };
            tracing::info!("[scheduler] lighthouse {}", row.url);
            run_lighthouse_for(&task_pool, &task_config, &row).await;
        });
    }
    Ok(())
}

/// Perform a single lighthouse run for `prop` and persist the result.
///
/// Flips the row to 'running' (stamping `lighthouse_started_at` so
/// `reset_wedged_states` can detect a hung run), fetches and parses the
/// report, then either stores scores/details JSON and clears the error
/// column, or records the failure via `store_lh_error`. DB write errors
/// are deliberately ignored (`let _`): a failed bookkeeping update must
/// not abort the run, and the wedge sweep eventually restores state.
async fn run_lighthouse_for(pool: &SqlitePool, config: &Arc<Config>, prop: &PropertyRow) {
    let now = now_ms();
    // queued -> running; started_at drives wedge detection.
    let _ = sqlx::query(
        "UPDATE properties SET lighthouse_state = 'running', lighthouse_started_at = ?, updated_at = ? WHERE id = ?",
    )
    .bind(now)
    .bind(now)
    .bind(prop.id.clone())
    .execute(pool)
    .await;

    let started = std::time::Instant::now();
    match lighthouse::fetch(&config.root, &prop.url).await {
        Ok(results) => {
            match lighthouse::parse_scores(&results) {
                Ok(scores) => {
                    // Details are optional; scores alone count as success.
                    let details = lighthouse::parse_details(&results);
                    // Degrade to benign JSON ("{}" / "null") rather than
                    // failing the whole run on a serialization error.
                    let scores_json = serde_json::to_string(&scores).unwrap_or_else(|_| "{}".into());
                    let details_json = details
                        .as_ref()
                        .and_then(|d| serde_json::to_string(d).ok())
                        .unwrap_or_else(|| "null".into());
                    let dur = started.elapsed().as_millis() as i64;
                    let _ = sqlx::query(
                        "UPDATE properties SET \
                           lighthouse_scores = ?, lighthouse_details = ?, \
                           last_lighthouse_success_at = ?, last_lighthouse_error = NULL, \
                           last_lighthouse_duration_ms = ?, lighthouse_state = 'idle', updated_at = ? \
                         WHERE id = ?",
                    )
                    .bind(scores_json)
                    .bind(details_json)
                    .bind(now_ms())
                    .bind(dur)
                    .bind(now_ms())
                    .bind(prop.id.clone())
                    .execute(pool)
                    .await;
                }
                // Fetch succeeded but the report was unparseable.
                Err(e) => store_lh_error(pool, prop, &format!("{e}"), started).await,
            }
        }
        // The lighthouse fetch itself failed.
        Err(e) => store_lh_error(pool, prop, &format!("{e}"), started).await,
    }
}

/// Record a failed lighthouse run: log the failure, flip the row back to
/// idle, and persist the error message plus elapsed duration.
async fn store_lh_error(
    pool: &SqlitePool,
    prop: &PropertyRow,
    msg: &str,
    started: std::time::Instant,
) {
    tracing::warn!("[scheduler] lighthouse failed for {}: {msg}", prop.url);
    let elapsed_ms = started.elapsed().as_millis() as i64;
    let updated_at = now_ms();
    // Best-effort write: an error here is ignored on purpose.
    let _ = sqlx::query(
        "UPDATE properties SET lighthouse_state = 'idle', last_lighthouse_error = ?, last_lighthouse_duration_ms = ?, updated_at = ? \
         WHERE id = ?",
    )
    .bind(msg)
    .bind(elapsed_ms)
    .bind(updated_at)
    .bind(prop.id.clone())
    .execute(pool)
    .await;
}

/// Queue crawler runs for properties whose crawl schedule has elapsed and
/// which are not already queued or running.
async fn enqueue_crawler(
    pool: &SqlitePool,
    _config: &Arc<Config>,
    sem: &Arc<Semaphore>,
) -> sqlx::Result<()> {
    let now = now_ms();
    let due_rows: Vec<PropertyRow> = sqlx::query_as(
        "SELECT * FROM properties \
         WHERE (last_run_at_crawler IS NULL OR next_run_at_crawler IS NULL OR next_run_at_crawler <= ?) \
         AND crawl_state NOT IN ('queued', 'running')",
    )
    .bind(now)
    .fetch_all(pool)
    .await?;

    for row in due_rows {
        // Flag as queued and push the schedule forward immediately so
        // later cycles skip this property while the job awaits a permit.
        sqlx::query(
            "UPDATE properties SET next_run_at_crawler = ?, last_run_at_crawler = ?, crawl_state = 'queued', updated_at = ? \
             WHERE id = ?",
        )
        .bind(now + CRAWL_INTERVAL_DAYS * 24 * 3600 * 1000)
        .bind(now)
        .bind(now)
        .bind(row.id.clone())
        .execute(pool)
        .await?;

        let task_pool = pool.clone();
        let task_sem = sem.clone();
        tokio::spawn(async move {
            // A closed semaphore means shutdown; silently drop the job.
            let _permit = if let Ok(p) = task_sem.acquire_owned().await {
                p
            } else {
                return;
            };
            tracing::info!("[scheduler] crawler {}", row.url);
            run_crawler_for(&task_pool, &row).await;
        });
    }
    Ok(())
}

/// Perform a single crawl for `prop` and persist the outcome.
///
/// Flips the row to 'running' (stamping `crawl_started_at` for wedge
/// detection) and resets the page counter, then runs the spider with a
/// progress callback that keeps `last_crawl_pages_count` up to date.
/// DB write errors are deliberately ignored (`let _`): bookkeeping
/// failures must not abort the crawl, and `reset_wedged_states` will
/// eventually repair any inconsistent state.
async fn run_crawler_for(pool: &SqlitePool, prop: &PropertyRow) {
    let now = now_ms();
    // queued -> running; page counter restarts at 0 for this crawl.
    let _ = sqlx::query(
        "UPDATE properties SET crawl_state = 'running', crawl_started_at = ?, last_crawl_pages_count = 0, updated_at = ? WHERE id = ?",
    )
    .bind(now)
    .bind(now)
    .bind(prop.id.clone())
    .execute(pool)
    .await;

    // Clone what the callback needs up front; it fires from inside the
    // crawler, so each progress update is written by a fire-and-forget
    // spawned task rather than awaited inside the crawl itself.
    let pool_for_progress = pool.clone();
    let id_for_progress = prop.id.clone();
    let progress_cb = move |pages: usize| {
        let pool = pool_for_progress.clone();
        let id = id_for_progress.clone();
        tokio::spawn(async move {
            let _ = sqlx::query(
                "UPDATE properties SET last_crawl_pages_count = ? WHERE id = ?",
            )
            .bind(pages as i64)
            .bind(id)
            .execute(&pool)
            .await;
        });
    };

    let started = std::time::Instant::now();
    let outcome = crawler::run_seo_spider(&prop.url, progress_cb).await;
    let duration_ms = started.elapsed().as_millis() as i64;
    match outcome {
        Ok(insights) => {
            // Serialization failure degrades to an empty list rather than
            // failing a crawl that otherwise succeeded.
            let json = serde_json::to_string(&insights).unwrap_or_else(|_| "[]".into());
            let _ = sqlx::query(
                "UPDATE properties SET \
                   crawler_insights = ?, crawl_state = 'idle', \
                   last_crawl_success_at = ?, last_crawl_error = NULL, \
                   last_crawl_duration_ms = ?, updated_at = ? \
                 WHERE id = ?",
            )
            .bind(json)
            .bind(now_ms())
            .bind(duration_ms)
            .bind(now_ms())
            .bind(prop.id.clone())
            .execute(pool)
            .await;
        }
        Err(e) => {
            tracing::warn!("[scheduler] crawl failed for {}: {e:#}", prop.url);
            let _ = sqlx::query(
                "UPDATE properties SET crawl_state = 'idle', last_crawl_error = ?, last_crawl_duration_ms = ?, updated_at = ? WHERE id = ?",
            )
            .bind(format!("{e:#}"))
            .bind(duration_ms)
            .bind(now_ms())
            .bind(prop.id.clone())
            .execute(pool)
            .await;
        }
    }
}
