// See the License for the specific language governing permissions and
// limitations under the License.

+use std::{sync::Arc, sync::Mutex, sync::mpsc, thread};
+
// ANCHOR: setup
-use reqwest::blocking::{get, Response};
-use reqwest::Url;
+use reqwest::{blocking::Client, Url};
use scraper::{Html, Selector};
use thiserror::Error;

#[derive(Error, Debug)]
enum Error {
    #[error("request error: {0}")]
    ReqwestError(#[from] reqwest::Error),
+    #[error("bad http response: {0}")]
+    BadResponse(String),
}
// ANCHOR_END: setup

-// ANCHOR: extract_links
-fn extract_links(response: Response) -> Result<Vec<Url>, Error> {
+// ANCHOR: visit_page
+#[derive(Debug)]
+struct CrawlCommand {
+    url: Url,
+    extract_links: bool,
+}
+
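+// Fetch a single page and, when the command asks for it, collect every link
+// found in its HTML; a non-success HTTP status becomes Error::BadResponse.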
+fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
+    println!("Checking {:#}", command.url);
+    let response = client.get(command.url.clone()).send()?;
+    if !response.status().is_success() {
+        return Err(Error::BadResponse(response.status().to_string()));
+    }
+
+    let mut link_urls = Vec::new();
+    if !command.extract_links {
+        return Ok(link_urls);
+    }
+
    let base_url = response.url().to_owned();
-    let document = response.text()?;
-    let html = Html::parse_document(&document);
-    let selector = Selector::parse("a").unwrap();
+    let body_text = response.text()?;
+    let document = Html::parse_document(&body_text);

-    let mut valid_urls = Vec::new();
-    for element in html.select(&selector) {
-        if let Some(href) = element.value().attr("href") {
-            match base_url.join(href) {
-                Ok(url) => valid_urls.push(url),
-                Err(err) => {
-                    println!("On {base_url}: could not parse {href:?}: {err} (ignored)",);
-                }
+    let selector = Selector::parse("a").unwrap();
+    let href_values = document
+        .select(&selector)
+        .filter_map(|element| element.value().attr("href"));
+    for href in href_values {
+        match base_url.join(href) {
+            Ok(link_url) => {
+                link_urls.push(link_url);
+            }
+            Err(err) => {
+                println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
            }
        }
    }
-
-    Ok(valid_urls)
+    Ok(link_urls)
}
-// ANCHOR_END: extract_links
+// ANCHOR_END: visit_page

-fn check_links(url: Url) -> Result<Vec<Url>, Error> {
-    println!("Checking {url}");
+struct CrawlState {
+    domain: String,
+    visited_pages: std::collections::HashSet<String>,
+}
+impl CrawlState {
+    fn new(start_url: &Url) -> CrawlState {
+        let mut visited_pages = std::collections::HashSet::new();
+        visited_pages.insert(start_url.as_str().to_string());
+        CrawlState {
+            domain: start_url.domain().unwrap().to_string(),
+            visited_pages,
+        }
+    }

-    let response = get(url.to_owned())?;
+    fn visit_links(&self, url: &Url) -> bool {
+        let Some(url_domain) = url.domain() else {
+            return false;
+        };
+        url_domain == self.domain
+    }

-    if !response.status().is_success() {
-        return Ok(vec![url.to_owned()]);
+    fn mark_visited(&mut self, url: &Url) -> bool {
+        self.visited_pages.insert(url.as_str().to_string())
    }
+}
+
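+// Outcome of crawling one page: the links found on it, or the failing URL
+// paired with the error it produced.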
+type CrawlResult = Result<Vec<Url>, (Url, Error)>;
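+// Start a fixed pool of worker threads. They share one command receiver
+// behind Arc<Mutex<...>>, fetch each page with their own Client, and send the
+// result back to the controller; a worker exits once the channel closes.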
+fn spawn_crawler_threads(
+    command_receiver: mpsc::Receiver<CrawlCommand>,
+    result_sender: mpsc::Sender<CrawlResult>,
+    thread_count: u32,
+) {
+    let command_receiver = Arc::new(Mutex::new(command_receiver));

-    let links = extract_links(response)?;
-    for link in &links {
-        println!("{link}, {:?}", link.domain());
+    for _ in 0..thread_count {
+        let result_sender = result_sender.clone();
+        let command_receiver = command_receiver.clone();
+        thread::spawn(move || {
+            let client = Client::new();
+            loop {
+                let command_result = {
+                    let receiver_guard = command_receiver.lock().unwrap();
+                    receiver_guard.recv()
+                };
+                let Ok(crawl_command) = command_result else {
+                    // The sender got dropped. No more commands coming in.
+                    break;
+                };
+                let crawl_result = match visit_page(&client, &crawl_command) {
+                    Ok(link_urls) => Ok(link_urls),
+                    Err(error) => Err((crawl_command.url, error)),
+                };
+                result_sender.send(crawl_result).unwrap();
+            }
+        });
    }
+}
+
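+// Controller loop: seed the crawl with the start URL, count the commands
+// still in flight so the loop knows when to stop, enqueue newly discovered
+// URLs that have not been visited yet, and collect the URLs that failed.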
+fn control_crawl(
+    start_url: Url,
+    command_sender: mpsc::Sender<CrawlCommand>,
+    result_receiver: mpsc::Receiver<CrawlResult>,
+) -> Vec<Url> {
+    let mut crawl_state = CrawlState::new(&start_url);
+    let start_command = CrawlCommand { url: start_url, extract_links: true };
+    command_sender.send(start_command).unwrap();
+    let mut pending_urls = 1;

-    let mut failed_links = Vec::new();
-    for link in links {
-        if link.domain() != url.domain() {
-            println!("Checking external link: {link}");
-            let response = get(link.clone())?;
-            if !response.status().is_success() {
-                println!("Error on {url}: {link} failed: {}", response.status());
-                failed_links.push(link);
+    let mut bad_urls = Vec::new();
+    while pending_urls > 0 {
+        let crawl_result = result_receiver.recv().unwrap();
+        pending_urls -= 1;
+
+        match crawl_result {
+            Ok(link_urls) => {
+                for url in link_urls {
+                    if crawl_state.mark_visited(&url) {
+                        let extract_links = crawl_state.visit_links(&url);
+                        let crawl_command = CrawlCommand { url, extract_links };
+                        command_sender.send(crawl_command).unwrap();
+                        pending_urls += 1;
+                    }
+                }
+            }
+            Err((url, error)) => {
+                bad_urls.push(url);
+                println!("Got crawling error: {:#}", error);
+                continue;
            }
-        } else {
-            println!("Checking link in same domain: {link}");
-            failed_links.extend(check_links(link)?)
        }
    }
+    bad_urls
+}

-    Ok(failed_links)
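+// Wire the command and result channels together, start 16 crawler threads,
+// and run the controller on the calling thread.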
+fn check_links(start_url: Url) -> Vec<Url> {
+    let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
+    let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
+    spawn_crawler_threads(command_receiver, result_sender, 16);
+    control_crawl(start_url, command_sender, result_receiver)
}

fn main() {
-    let start_url = Url::parse("https://www.google.org").unwrap();
-    match check_links(start_url) {
-        Ok(links) => println!("Links: {links:#?}"),
-        Err(err) => println!("Could not extract links: {err:#}"),
-    }
+    let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
+    let bad_urls = check_links(start_url);
+    println!("Bad URLs: {:#?}", bad_urls);
}