//! In-process SEO crawler. Fetches up to PAGE_CAP pages from the same host,
//! collects metadata, and runs a fixed set of checks. Designed to be invoked
//! from the scheduler with a progress callback.
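//!
//! A minimal usage sketch (hedged: the `seo_spider` module path and the async
//! caller are assumptions; the callback here only logs the running page count):
//!
//! ```ignore
//! let insights = seo_spider::run_seo_spider("https://example.com", |fetched| {
//!     tracing::info!("[crawler] {fetched} pages fetched so far");
//! })
//! .await?;
//! tracing::info!("[crawler] {} insights", insights.len());
//! ```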
mod checks;
mod fetcher;
mod parser;

use anyhow::Result;
use serde_json::Value;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use url::Url;

pub use fetcher::PAGE_CAP;
use fetcher::{
    fetch, head_status, load_robots, load_sitemap, make_client, make_probe_client,
    is_crawler_hostile, probe_compression, same_site, FetchResult, CRAWL_DEADLINE_SECS,
    CONCURRENCY,
};
use parser::parse_html;

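/// One fetched URL. `html` is populated only for 200 `text/html` responses
/// that parsed successfully.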
#[derive(Debug, Clone, serde::Serialize)]
pub struct Page {
    pub url: String,
    pub requested_url: String,
    pub status: u16,
    pub content_type: String,
    pub elapsed_ms: i64,
    pub bytes: usize,
    pub headers: HashMap<String, String>,
    pub redirect_chain: Vec<(u16, String)>,
    pub error: String,
    pub is_html: bool,
    #[serde(flatten)]
    pub html: Option<parser::ParsedHtml>,
}

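/// Everything gathered during a crawl, consumed by the checks as one snapshot.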
#[derive(Debug, serde::Serialize)]
pub struct CrawlResult {
    pub start_url: String,
    pub host: String,
    pub pages: Vec<Page>,
    pub external_link_status: HashMap<String, u16>,
    pub sitemap_urls: Vec<String>,
    pub robots: RobotsCtx,
    /// Server's `Content-Encoding` for the start URL (e.g. "gzip", "br",
    /// "zstd"). `None` means the server returned the response uncompressed.
    /// Probed with a separate non-decompressing client because reqwest's auto-
    /// decompression strips the header.
    pub compression: Option<String>,
}

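/// robots.txt context for the crawled origin.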
#[derive(Debug, serde::Serialize)]
pub struct RobotsCtx {
    pub url: String,
    pub exists: bool,
    pub raw: Option<String>,
    pub references_sitemap: bool,
}

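/// Dedup key for a URL: drop the fragment and any trailing slash so variants
/// like `/about`, `/about/`, and `/about#team` collapse to one entry.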
fn normalize(url: &str) -> String {
    if let Ok(mut u) = Url::parse(url) {
        u.set_fragment(None);
        let s = u.to_string();
        let trimmed = s.trim_end_matches('/');
        if trimmed.is_empty() { s } else { trimmed.to_string() }
    } else {
        url.to_string()
    }
}

/// Crawl `start_url`, run all checks, return the flat insight list.
pub async fn run_seo_spider<F>(start_url: &str, mut progress_cb: F) -> Result<Vec<Value>>
where
    F: FnMut(usize) + Send + 'static,
{
    let start = Instant::now();
    tracing::info!("[crawler] starting {start_url}");
    let result = crawl(start_url, &mut progress_cb).await?;
    let insights = run_checks(&result);
    tracing::info!(
        "[crawler] done {start_url} - {} pages, {} insights, {:.1}s",
        result.pages.len(),
        insights.len(),
        start.elapsed().as_secs_f64()
    );
    Ok(insights)
}

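/// Breadth-first crawl of `start_url`'s host: seed the queue from the sitemap,
/// fetch in batches of `CONCURRENCY`, stop at `PAGE_CAP` pages or when
/// `CRAWL_DEADLINE_SECS` elapses, then HEAD-check the external links found.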
async fn crawl<F>(start_url: &str, progress_cb: &mut F) -> Result<CrawlResult>
where
    F: FnMut(usize) + Send,
{
    let parsed = Url::parse(start_url)?;
    let host = parsed.host_str().unwrap_or("").to_string();
    let base_origin = format!("{}://{}", parsed.scheme(), parsed.host_str().unwrap_or(""));

    let client = make_client();
    let probe_client = make_probe_client();
    let compression = probe_compression(&probe_client, start_url).await;
    let (robots, robots_url, robots_text) = load_robots(&client, &base_origin).await;
    let robots = Arc::new(robots);
    let sitemap_urls = load_sitemap(&client, &base_origin, robots_text.as_deref()).await;

    let references_sitemap = robots_text
        .as_deref()
        .map(|t| {
            t.lines()
                .any(|l| l.trim().to_lowercase().starts_with("sitemap:"))
        })
        .unwrap_or(false);

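    // `seen` holds normalized URLs already queued or recorded; `fetched` holds
    // normalized final URLs already turned into a `Page`, so a redirect target
    // is only stored once.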
    let mut seen: HashSet<String> = HashSet::new();
    let mut queue: VecDeque<String> = VecDeque::new();
    let mut pages: Vec<Page> = Vec::new();
    let mut fetched: HashSet<String> = HashSet::new();
    let deadline = Instant::now() + std::time::Duration::from_secs(CRAWL_DEADLINE_SECS);

    let enqueue = |url: String, seen: &mut HashSet<String>, queue: &mut VecDeque<String>| {
        let n = normalize(&url);
        if !seen.contains(&n) {
            seen.insert(n);
            queue.push_back(url);
        }
    };

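    // Seed the queue with the start URL and any same-site sitemap entries.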
    enqueue(start_url.to_string(), &mut seen, &mut queue);
    for url in sitemap_urls.iter().take(PAGE_CAP) {
        if same_site(url, &host) {
            enqueue(url.clone(), &mut seen, &mut queue);
        }
    }

    while !queue.is_empty() && pages.len() < PAGE_CAP && Instant::now() < deadline {
        // Pull a batch of up to CONCURRENCY URLs, skipping anything robots.txt disallows.
        let mut batch: Vec<String> = Vec::new();
        while let Some(url) = queue.pop_front() {
            if !robots.allowed(&url) {
                continue;
            }
            batch.push(url);
            if batch.len() >= CONCURRENCY || pages.len() + batch.len() >= PAGE_CAP {
                break;
            }
        }
        if batch.is_empty() {
            break;
        }

        let futs = batch
            .into_iter()
            .map(|u| {
                let client = client.clone();
                async move { fetch(&client, &u).await }
            })
            .collect::<Vec<_>>();
        let results: Vec<FetchResult> = futures_util::future::join_all(futs).await;

        for r in results {
            let final_key = normalize(&r.url);
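            // A redirect can land several requested URLs on the same final URL;
            // record each final URL only once.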
            if fetched.contains(&final_key) {
                seen.insert(final_key);
                continue;
            }
            fetched.insert(final_key.clone());
            let is_html = r.status == 200 && r.content_type.contains("text/html");
            let mut page = Page {
                url: r.url.clone(),
                requested_url: r.requested_url.clone(),
                status: r.status,
                content_type: r.content_type.clone(),
                elapsed_ms: r.elapsed_ms,
                bytes: r.body.len(),
                headers: r.headers.clone(),
                redirect_chain: r.redirect_chain.clone(),
                error: r.error.clone(),
                is_html,
                html: None,
            };
            if is_html {
                match parse_html(&r.body, &r.url) {
                    Ok(parsed) => {
                        for link in &parsed.links {
                            if same_site(&link.url, &host) {
                                enqueue(link.url.clone(), &mut seen, &mut queue);
                            }
                        }
                        page.html = Some(parsed);
                    }
                    Err(e) => {
                        tracing::warn!("[crawler] parse failed for {}: {e}", r.url);
                        page.is_html = false;
                    }
                }
            }
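            // Mark the final (possibly redirected) URL as seen so it is never re-queued.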
            seen.insert(normalize(&r.url));
            pages.push(page);
        }
        progress_cb(pages.len());
    }

    if Instant::now() >= deadline {
        tracing::warn!(
            "[crawler] hit deadline for {start_url} after {} pages",
            pages.len()
        );
    }

    // HEAD-check external links collected from HTML pages, skipping crawler-hostile hosts.
    let mut external_links: HashSet<String> = HashSet::new();
    for p in &pages {
        if !p.is_html {
            continue;
        }
        if let Some(html) = &p.html {
            for link in &html.links {
                if same_site(&link.url, &host) || is_crawler_hostile(&link.url) {
                    continue;
                }
                external_links.insert(link.url.clone());
            }
        }
    }
    let mut external_link_status: HashMap<String, u16> = HashMap::new();
    if !external_links.is_empty() && Instant::now() < deadline {
        let urls: Vec<String> = external_links.into_iter().collect();
        for chunk in urls.chunks(CONCURRENCY) {
            let futs = chunk.iter().cloned().map(|u| {
                let client = client.clone();
                async move {
                    let s = head_status(&client, &u).await;
                    (u, s)
                }
            });
            let results = futures_util::future::join_all(futs).await;
            for (u, s) in results {
                external_link_status.insert(u, s);
            }
        }
    }

    Ok(CrawlResult {
        start_url: start_url.to_string(),
        host,
        pages,
        external_link_status,
        sitemap_urls,
        robots: RobotsCtx {
            url: robots_url,
            exists: robots_text.is_some(),
            raw: robots_text,
            references_sitemap,
        },
        compression,
    })
}

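/// Build the shared check context (HTML-only page view plus a URL -> status map)
/// and run the fixed set of checks against it.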
fn run_checks(result: &CrawlResult) -> Vec<Value> {
    let html_pages: Vec<&Page> = result.pages.iter().filter(|p| p.is_html).collect();
    let mut status_map: HashMap<String, u16> = HashMap::new();
    for p in &result.pages {
        status_map.insert(p.url.clone(), p.status);
    }
    let ctx = checks::Ctx {
        start_url: &result.start_url,
        host: &result.host,
        pages: &result.pages,
        html_pages: &html_pages,
        status_map: &status_map,
        external_link_status: &result.external_link_status,
        sitemap_urls: &result.sitemap_urls,
        robots: &result.robots,
        compression: result.compression.as_deref(),
    };
    checks::run_all(&ctx)
}