diff --git a/Cargo.lock b/Cargo.lock
index afd2104..2428881 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,23 +1,91 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
+version = 3
+
 [[package]]
 name = "argparse"
 version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37bb99f5e39ee8b23b6e227f5b8f024207e8616f44aa4b8c76ecd828011667ef"
 
 [[package]]
 name = "byteorder"
-version = "0.5.3"
+version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
+
+[[package]]
+name = "displaydoc"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
 
 [[package]]
 name = "fingertips"
 version = "0.1.0"
 dependencies = [
- "argparse 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "argparse",
+ "byteorder",
+ "displaydoc",
+ "thiserror",
 ]
 
-[metadata]
-"checksum argparse 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "37bb99f5e39ee8b23b6e227f5b8f024207e8616f44aa4b8c76ecd828011667ef"
-"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
+[[package]]
+name = "proc-macro2"
+version = "1.0.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e"
+dependencies = [
+ "unicode-ident",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "syn"
+version = "2.0.53"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
+[[package]]
+name = "thiserror"
+version = "1.0.58"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297"
+dependencies = [
+ "thiserror-impl",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "1.0.58"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "unicode-ident"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
diff --git a/Cargo.toml b/Cargo.toml
index a5991f6..3c86d60 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,8 +2,25 @@
 name = "fingertips"
 version = "0.1.0"
 authors = ["Jason Orendorff "]
-edition = "2018"
+edition = "2021"
 
 [dependencies]
 argparse = "0.2.1"
-byteorder = "0.5.3"
+byteorder = "1.5.0"
+displaydoc = "0.2.4"
+thiserror = "1.0.58"
+
+[lints.rust]
+unsafe_code = "forbid"
+missing_docs = "warn"
+rust_2018_idioms = "warn"
+trivial_casts = "warn"
+unused_lifetimes = "warn"
+unused_qualifications = "warn"
+bad_style = "warn"
+dead_code = "warn"
+
+[lints.clippy]
+all = "warn"
+unwrap_used = "warn"
+expect_used = "warn"
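The two strictest additions in the `[lints]` tables, `clippy::unwrap_used` and `clippy::expect_used`, are what drive the error-plumbing changes in the rest of this patch: every `.unwrap()` either becomes a propagated error or an explicitly `#[allow]`ed `.expect(...)`. A standalone sketch of the difference (hypothetical functions, not code from this patch):

use std::num::ParseIntError;

// Flagged by `clippy::unwrap_used`: panics on bad input.
fn parse_port_panicking(s: &str) -> u16 {
    s.parse().unwrap()
}

// The style the lint pushes toward: surface the failure to the caller.
fn parse_port(s: &str) -> Result<u16, ParseIntError> {
    s.parse()
}
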
diff --git a/src/bin/fingertips.rs b/src/bin/fingertips.rs
new file mode 100644
index 0000000..f9e87d7
--- /dev/null
+++ b/src/bin/fingertips.rs
@@ -0,0 +1,45 @@
+//! `fingertips` creates an inverted index for a set of text files.
+//!
+//! Most of the actual work is done by the modules `index`, `read`, `write`,
+//! and `merge` in the library crate, whose `lib.rs` puts the pieces together
+//! in two different ways.
+//!
+//! * `run_single_threaded` simply does everything in one thread, in
+//!   the most straightforward possible way.
+//!
+//! * Then, we break the work into a five-stage pipeline so that we can run
+//!   it on multiple CPUs. `run_pipeline` puts the five stages together.
+//!
+//! This file contains the `main` function, which handles command-line
+//! arguments and calls `fingertips::run` to do the work.
+
+use argparse::{ArgumentParser, Collect, StoreTrue};
+use fingertips::run;
+
+fn main() {
+    let mut single_threaded = false;
+    let mut filenames = vec![];
+
+    {
+        let mut ap = ArgumentParser::new();
+        ap.set_description("Make an inverted index for searching documents.");
+        _ = ap.refer(&mut single_threaded).add_option(
+            &["-1", "--single-threaded"],
+            StoreTrue,
+            "Do all the work on a single thread.",
+        );
+        _ = ap.refer(&mut filenames).add_argument(
+            "filenames",
+            Collect,
+            "Names of files/directories to index. \
+             For directories, all .txt files immediately \
+             under the directory are indexed.",
+        );
+        ap.parse_args_or_exit();
+    }
+
+    match run(filenames, single_threaded) {
+        Ok(()) => {}
+        Err(err) => println!("error: {err}"),
+    }
+}
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..15db482
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,55 @@
+use std::error::Error;
+
+/// Result type returned from methods that can fail with [`FingertipsError`]s.
+pub type FingertipsResult<T> = Result<T, FingertipsError>;
+
+/// Errors that can result from Fingertips.
+// [`Error`] is public, but opaque and easy to keep compatible.
+#[derive(thiserror::Error, Debug)]
+#[error(transparent)]
+pub struct FingertipsError(#[from] FingertipsErrorKind);
+
+// Accessors for anything we do want to expose publicly.
+impl FingertipsError {
+    /// Expose the inner error kind.
+    ///
+    /// This is useful for matching on the error kind.
+    pub fn into_inner(self) -> FingertipsErrorKind {
+        self.0
+    }
+}
+
+/// [`FingertipsErrorKind`] describes the errors that can happen while executing a high-level command.
+///
+/// This is a non-exhaustive enum, so additional variants may be added in the future. It is
+/// recommended to match against the wildcard `_` instead of listing all possible variants,
+/// to avoid problems when new variants are added.
+#[non_exhaustive]
+#[derive(thiserror::Error, Debug, displaydoc::Display)]
+pub enum FingertipsErrorKind {
+    /// An error occurred while reading from or writing to a file.
+    #[error(transparent)]
+    Io(#[from] std::io::Error),
+    /// An error occurred while parsing a file.
+    TermEmpty,
+    /// An error occurred in the algorithm.
+    AlgorithmError,
+    /// No entry to move.
+    NoEntryToMove,
+    /// Computer not big enough to hold index entry; you may be on a 32-bit platform.
+    PlatformLimitExceeded,
+}
+
+trait FingertipsErrorMarker: Error {}
+
+// impl FingertipsErrorMarker for FingertipErrorsInTheCodeBase {}
+
+impl<E> From<E> for FingertipsError
+where
+    E: FingertipsErrorMarker,
+    FingertipsErrorKind: From<E>,
+{
+    fn from(value: E) -> Self {
+        Self(FingertipsErrorKind::from(value))
+    }
+}
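`FingertipsError` is deliberately opaque; callers get at the detail through `into_inner`. A minimal sketch of how a handler could consume it (hypothetical helper, not in this patch; note that `lib.rs` below declares `mod error;` privately, so only crate-internal code can name `FingertipsErrorKind` unless the module is made `pub`):

use crate::error::{FingertipsError, FingertipsErrorKind};

// Unwrap the opaque error and match on its #[non_exhaustive] kind,
// keeping a wildcard arm as the type's docs recommend.
fn describe(err: FingertipsError) -> String {
    match err.into_inner() {
        FingertipsErrorKind::Io(io_err) => format!("I/O failure: {io_err}"),
        FingertipsErrorKind::PlatformLimitExceeded => {
            "index entry too large for this platform".to_string()
        }
        _ => "indexing failed".to_string(),
    }
}
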
diff --git a/src/index.rs b/src/index.rs
index 68b1b6b..55fbbe9 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -4,8 +4,10 @@
 //! `InMemoryIndex` can be used to do that, up to the size of the machine's
 //! memory.
 
-use std::collections::HashMap;
 use byteorder::{LittleEndian, WriteBytesExt};
+use std::collections::HashMap;
+
+use crate::error::{FingertipsErrorKind, FingertipsResult};
 
 /// Break a string into words.
 fn tokenize(text: &str) -> Vec<&str> {
@@ -21,6 +23,7 @@ fn tokenize(text: &str) -> Vec<&str> {
 /// answer simple search queries. And you can use the `read`, `write`, and
 /// `merge` modules to save an in-memory index to disk and merge it with other
 /// indices, producing a large index.
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
 pub struct InMemoryIndex {
     /// The total number of words in the indexed documents.
     pub word_count: usize,
@@ -34,7 +37,7 @@ pub struct InMemoryIndex {
     /// document id in increasing order. This is handy for some algorithms you
     /// might want to run on the index, so we preserve this property wherever
    /// possible.
-    pub map: HashMap<String, Vec<Hit>>
+    pub map: HashMap<String, Vec<Hit>>,
 }
 
 /// A `Hit` indicates that a particular document contains some term, how many
@@ -47,40 +50,50 @@ pub type Hit = Vec<u8>;
 
 impl InMemoryIndex {
     /// Create a new, empty index.
-    pub fn new() -> InMemoryIndex {
-        InMemoryIndex {
-            word_count: 0,
-            map: HashMap::new()
-        }
+    pub fn new() -> Self {
+        Self::default()
     }
 
     /// Index a single document.
     ///
     /// The resulting index contains exactly one `Hit` per term.
-    pub fn from_single_document(document_id: usize, text: String) -> InMemoryIndex {
+    pub fn from_single_document(document_id: usize, text: String) -> FingertipsResult<Self> {
        let document_id = document_id as u32;
-        let mut index = InMemoryIndex::new();
+        let mut index = Self::new();
         let text = text.to_lowercase();
         let tokens = tokenize(&text);
+
+        let hits_list = {
+            let mut hits = Vec::with_capacity(4 + 4);
+            hits.write_u32::<LittleEndian>(document_id)
+                .map_err(FingertipsErrorKind::Io)?;
+
+            vec![hits]
+        };
+
         for (i, token) in tokens.iter().enumerate() {
-            let hits =
-                index.map
-                    .entry(token.to_string())
-                    .or_insert_with(|| {
-                        let mut hits = Vec::with_capacity(4 + 4);
-                        hits.write_u32::<LittleEndian>(document_id).unwrap();
-                        vec![hits]
-                    });
-            hits[0].write_u32::<LittleEndian>(i as u32).unwrap();
+            let hits = index
+                .map
+                .entry((*token).to_string())
+                .or_insert(hits_list.clone());
+            hits[0]
+                .write_u32::<LittleEndian>(i as u32)
+                .map_err(FingertipsErrorKind::Io)?;
+
             index.word_count += 1;
         }
 
         if document_id % 100 == 0 {
-            println!("indexed document {}, {} bytes, {} words", document_id, text.len(), index.word_count);
+            println!(
+                "indexed document {}, {} bytes, {} words",
+                document_id,
+                text.len(),
+                index.word_count
+            );
         }
 
-        index
+        Ok(index)
     }
 
     /// Add all search hits from `other` to this index.
@@ -88,11 +101,9 @@ impl InMemoryIndex {
     /// If both `*self` and `other` are sorted by document id, and all document
     /// ids in `other` are greater than every document id in `*self`, then
     /// `*self` remains sorted by document id after merging.
-    pub fn merge(&mut self, other: InMemoryIndex) {
+    pub fn merge(&mut self, other: Self) {
         for (term, hits) in other.map {
-            self.map.entry(term)
-                .or_insert_with(|| vec![])
-                .extend(hits)
+            self.map.entry(term).or_default().extend(hits);
         }
         self.word_count += other.word_count;
     }
 }
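To make the hit encoding concrete: for document 0 containing "hello world hello", `from_single_document` leaves `map["hello"]` holding a single hit vector of little-endian u32s, the document id followed by each word offset. A standalone sketch of that byte layout (`hello_hit` is a hypothetical helper, not patch code):

use byteorder::{LittleEndian, WriteBytesExt};

// The bytes `from_single_document` accumulates for "hello" in document 0
// of "hello world hello": doc id 0, then word offsets 0 and 2.
fn hello_hit() -> std::io::Result<Vec<u8>> {
    let mut hit = Vec::new();
    hit.write_u32::<LittleEndian>(0)?; // document id
    hit.write_u32::<LittleEndian>(0)?; // "hello" is word 0...
    hit.write_u32::<LittleEndian>(2)?; // ...and also word 2
    Ok(hit)
}
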
diff --git a/src/main.rs b/src/lib.rs
similarity index 71%
rename from src/main.rs
rename to src/lib.rs
index ab45791..3c29952 100644
--- a/src/main.rs
+++ b/src/lib.rs
@@ -1,23 +1,24 @@
-/// `fingertips` creates an inverted index for a set of text files.
-///
-/// Most of the actual work is done by the modules `index`, `read`, `write`,
-/// and `merge`. In this file, `main.rs`, we put the pieces together in two
-/// different ways.
-///
-/// * `run_single_threaded` simply does everything in one thread, in
-///   the most straightforward possible way.
-///
-/// * Then, we break the work into a five-stage pipeline so that we can run
-///   it on multiple CPUs. `run_pipeline` puts the five stages together.
-///
-/// The `main` function at the end handles command-line arguments. It calls one
-/// of the two functions above to do the work.
-
+//! `fingertips` creates an inverted index for a set of text files.
+//!
+//! Most of the actual work is done by the modules `index`, `read`, `write`,
+//! and `merge`. In this file, `lib.rs`, we put the pieces together in two
+//! different ways.
+//!
+//! * `run_single_threaded` simply does everything in one thread, in
+//!   the most straightforward possible way.
+//!
+//! * Then, we break the work into a five-stage pipeline so that we can run
+//!   it on multiple CPUs. `run_pipeline` puts the five stages together.
+//!
+//! The `run` function at the end picks one of the two; command-line
+//! arguments are handled by `main` in the `fingertips` binary.
+
+mod error;
 mod index;
-mod read;
-mod write;
 mod merge;
+mod read;
 mod tmp;
+mod write;
 
 use std::fs::File;
 use std::io;
@@ -25,18 +26,15 @@ use std::io::prelude::*;
 use std::path::{Path, PathBuf};
 use std::sync::mpsc::{channel, Receiver};
 use std::thread::{spawn, JoinHandle};
-use argparse::{ArgumentParser, StoreTrue, Collect};
 
-use crate::index::InMemoryIndex;
-use crate::write::write_index_to_tmp_file;
-use crate::merge::FileMerge;
 use crate::tmp::TmpDir;
+use crate::write::write_index_to_tmp_file;
+use crate::{error::FingertipsErrorKind, merge::FileMerge};
+use crate::{error::FingertipsResult, index::InMemoryIndex};
 
 /// Create an inverted index for the given list of `documents`,
 /// storing it in the specified `output_dir`.
-fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
-    -> io::Result<()>
-{
+fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf) -> FingertipsResult<()> {
     // If all the documents fit comfortably in memory, we'll create the whole
     // index in memory.
     let mut accumulated_index = InMemoryIndex::new();
@@ -52,16 +50,19 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
     // For each document in the set...
     for (doc_id, filename) in documents.into_iter().enumerate() {
         // ...load it into memory...
-        let mut f = File::open(filename)?;
+        let mut f = File::open(filename).map_err(FingertipsErrorKind::Io)?;
         let mut text = String::new();
-        f.read_to_string(&mut text)?;
+        _ = f
+            .read_to_string(&mut text)
+            .map_err(FingertipsErrorKind::Io)?;
 
         // ...and add its contents to the in-memory `accumulated_index`.
-        let index = InMemoryIndex::from_single_document(doc_id, text);
+        let index = InMemoryIndex::from_single_document(doc_id, text)?;
         accumulated_index.merge(index);
         if accumulated_index.is_large() {
             // To avoid running out of memory, dump `accumulated_index` to disk.
-            let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
+            let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)
+                .map_err(FingertipsErrorKind::Io)?;
             merge.add_file(file)?;
             accumulated_index = InMemoryIndex::new();
         }
@@ -70,10 +71,13 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
     // Done reading documents! Save the last data set to disk, then merge the
     // temporary index files if there are more than one.
     if !accumulated_index.is_empty() {
-        let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
+        let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)
+            .map_err(FingertipsErrorKind::Io)?;
         merge.add_file(file)?;
     }
-    merge.finish()
+    merge.finish()?;
+
+    Ok(())
 }
 
 /// Start a thread that loads documents from the filesystem into memory.
@@ -83,16 +87,18 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
 /// This returns a pair of values: a receiver that receives the documents, as
 /// Strings; and a `JoinHandle` that can be used to wait for this thread to
 /// exit and to get the `io::Error` value if anything goes wrong.
-fn start_file_reader_thread(documents: Vec<PathBuf>)
-    -> (Receiver<String>, JoinHandle<io::Result<()>>)
-{
+fn start_file_reader_thread(
+    documents: Vec<PathBuf>,
+) -> (Receiver<String>, JoinHandle<FingertipsResult<()>>) {
     let (sender, receiver) = channel();
 
     let handle = spawn(move || {
         for filename in documents {
-            let mut f = File::open(filename)?;
+            let mut f = File::open(filename).map_err(FingertipsErrorKind::Io)?;
             let mut text = String::new();
-            f.read_to_string(&mut text)?;
+            _ = f
+                .read_to_string(&mut text)
+                .map_err(FingertipsErrorKind::Io)?;
 
             if sender.send(text).is_err() {
                 break;
@@ -113,14 +119,16 @@ fn start_file_reader_thread(documents: Vec<PathBuf>)
 /// receiver, the sequence of in-memory indexes; and a `JoinHandle` that can be
 /// used to wait for this thread to exit. This stage of the pipeline is
 /// infallible (it performs no I/O, so there are no possible errors).
-fn start_file_indexing_thread(texts: Receiver<String>)
-    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
-{
+#[allow(clippy::expect_used)]
+fn start_file_indexing_thread(
+    texts: Receiver<String>,
+) -> (Receiver<InMemoryIndex>, JoinHandle<()>) {
     let (sender, receiver) = channel();
 
     let handle = spawn(move || {
         for (doc_id, text) in texts.into_iter().enumerate() {
-            let index = InMemoryIndex::from_single_document(doc_id, text);
+            let index = InMemoryIndex::from_single_document(doc_id, text)
+                .expect("InMemoryIndex::from_single_document should not fail in this context");
             if sender.send(index).is_err() {
                 break;
             }
@@ -143,9 +151,9 @@ fn start_file_indexing_thread(texts: Receiver<String>)
 /// merging the input indexes; and a `JoinHandle` that can be used to wait for
 /// this thread to exit. This stage of the pipeline is infallible (it performs
 /// no I/O).
-fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
-{
+fn start_in_memory_merge_thread(
+    file_indexes: Receiver<InMemoryIndex>,
+) -> (Receiver<InMemoryIndex>, JoinHandle<()>) {
     let (sender, receiver) = channel();
 
     let handle = spawn(move || {
@@ -175,16 +183,17 @@ fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
 /// This returns a pair: a receiver that receives the filenames; and a
 /// `JoinHandle` that can be used to wait for this thread to exit and receive
 /// any I/O errors it encountered.
-fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
-                             output_dir: &Path)
-    -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
-{
+fn start_index_writer_thread(
+    big_indexes: Receiver<InMemoryIndex>,
+    output_dir: &Path,
+) -> (Receiver<PathBuf>, JoinHandle<FingertipsResult<()>>) {
     let (sender, receiver) = channel();
 
     let mut tmp_dir = TmpDir::new(output_dir);
     let handle = spawn(move || {
         for index in big_indexes {
-            let file = write_index_to_tmp_file(index, &mut tmp_dir)?;
+            let file =
+                write_index_to_tmp_file(index, &mut tmp_dir).map_err(FingertipsErrorKind::Io)?;
             if sender.send(file).is_err() {
                 break;
             }
@@ -197,14 +206,14 @@ fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
 
 /// Given a sequence of filenames of index data files, merge all the files
 /// into a single index data file.
-fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
-    -> io::Result<()>
-{
+fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path) -> FingertipsResult<()> {
     let mut merge = FileMerge::new(output_dir);
     for file in files {
         merge.add_file(file)?;
     }
-    merge.finish()
+    merge.finish()?;
+
+    Ok(())
 }
 
 /// Create an inverted index for the given list of `documents`,
@@ -213,27 +222,27 @@ fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
 /// On success this does exactly the same thing as `run_single_threaded`, but
 /// faster since it uses multiple CPUs and keeps them busy while I/O is
 /// happening.
-fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
-    -> io::Result<()>
-{
+#[allow(clippy::expect_used)]
+fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf) -> FingertipsResult<()> {
     // Launch all five stages of the pipeline.
-    let (texts,   h1) = start_file_reader_thread(documents);
-    let (pints,   h2) = start_file_indexing_thread(texts);
+    let (texts, h1) = start_file_reader_thread(documents);
+    let (pints, h2) = start_file_indexing_thread(texts);
     let (gallons, h3) = start_in_memory_merge_thread(pints);
-    let (files,   h4) = start_index_writer_thread(gallons, &output_dir);
+    let (files, h4) = start_index_writer_thread(gallons, &output_dir);
     let result = merge_index_files(files, &output_dir);
 
     // Wait for threads to finish, holding on to any errors that they encounter.
-    let r1 = h1.join().unwrap();
-    h2.join().unwrap();
-    h3.join().unwrap();
-    let r4 = h4.join().unwrap();
+    let r1 = h1.join().expect("File reader thread panicked!");
+    h2.join().expect("In-memory indexing thread panicked!");
+    h3.join().expect("In-memory merge thread panicked!");
+    let r4 = h4.join().expect("Index writer thread panicked!");
 
     // Return the first error encountered, if any.
     // (As it happens, h2 and h3 can't fail: those threads
     // are pure in-memory data processing.)
     r1?;
     r4?;
+
     result
 }
 
@@ -263,9 +272,9 @@ fn expand_filename_arguments(args: Vec<String>) -> io::Result<Vec<PathBuf>> {
 }
 
 /// Generate an index for a bunch of text files.
-fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
+pub fn run(filenames: Vec<String>, single_threaded: bool) -> FingertipsResult<()> {
     let output_dir = PathBuf::from(".");
-    let documents = expand_filename_arguments(filenames)?;
+    let documents = expand_filename_arguments(filenames).map_err(FingertipsErrorKind::Io)?;
 
     if single_threaded {
         run_single_threaded(documents, output_dir)
@@ -273,27 +282,3 @@ fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
         run_pipeline(documents, output_dir)
     }
 }
-
-fn main() {
-    let mut single_threaded = false;
-    let mut filenames = vec![];
-
-    {
-        let mut ap = ArgumentParser::new();
-        ap.set_description("Make an inverted index for searching documents.");
-        ap.refer(&mut single_threaded)
-            .add_option(&["-1", "--single-threaded"], StoreTrue,
-                        "Do all the work on a single thread.");
-        ap.refer(&mut filenames)
-            .add_argument("filenames", Collect,
-                          "Names of files/directories to index. \
-                           For directories, all .txt files immediately \
-                           under the directory are indexed.");
-        ap.parse_args_or_exit();
-    }
-
-    match run(filenames, single_threaded) {
-        Ok(()) => {}
-        Err(err) => println!("error: {}", err)
-    }
-}
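The four `start_*` functions above all share one shape: spawn a worker, stream items in from the upstream channel, send transformed items downstream, and stop quietly when the downstream receiver hangs up. A distilled, generic sketch of that pattern (`start_stage` is hypothetical, not code from this patch):

use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};

// One infallible pipeline stage: applies `f` to every item and forwards it.
fn start_stage<I, O, F>(inputs: Receiver<I>, mut f: F) -> (Receiver<O>, JoinHandle<()>)
where
    I: Send + 'static,
    O: Send + 'static,
    F: FnMut(I) -> O + Send + 'static,
{
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        for item in inputs {
            if sender.send(f(item)).is_err() {
                break; // downstream hung up; shut down quietly
            }
        }
    });
    (receiver, handle)
}
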
diff --git a/src/merge.rs b/src/merge.rs
index 2a1adc7..7bc3102 100644
--- a/src/merge.rs
+++ b/src/merge.rs
@@ -1,43 +1,60 @@
-use std::fs::{self, File};
-use std::io::{self, BufWriter};
-use std::mem;
 use std::path::{Path, PathBuf};
+use std::{fmt, mem};
+use std::{
+    fmt::Debug,
+    fs::{self, File},
+};
+use std::{
+    fmt::Formatter,
+    io::{self, BufWriter},
+};
 
-use crate::tmp::TmpDir;
-use crate::read::IndexFileReader;
 use crate::write::IndexFileWriter;
+use crate::{error::FingertipsErrorKind, tmp::TmpDir};
+use crate::{error::FingertipsResult, read::IndexFileReader};
 
+pub(crate) mod constants {
+    // How many files to merge at a time, at most.
+    pub const NSTREAMS: usize = 8;
+
+    pub const MERGED_FILENAME: &str = "index.dat";
+}
+
+#[derive(Clone)]
 pub struct FileMerge {
     output_dir: PathBuf,
     tmp_dir: TmpDir,
-    stacks: Vec<Vec<PathBuf>>
+    stacks: Vec<Vec<PathBuf>>,
 }
 
-// How many files to merge at a time, at most.
-const NSTREAMS: usize = 8;
-
-const MERGED_FILENAME: &'static str = "index.dat";
+impl Debug for FileMerge {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FileMerge")
+            .field("output_dir", &self.output_dir)
+            .field("stacks", &self.stacks)
+            .finish()
+    }
+}
 
 impl FileMerge {
-    pub fn new(output_dir: &Path) -> FileMerge {
-        FileMerge {
+    pub fn new(output_dir: &Path) -> Self {
+        Self {
             output_dir: output_dir.to_owned(),
-            tmp_dir: TmpDir::new(output_dir.to_owned()),
-            stacks: vec![]
+            tmp_dir: TmpDir::new(output_dir),
+            stacks: vec![],
         }
     }
 
-    pub fn add_file(&mut self, mut file: PathBuf) -> io::Result<()> {
+    pub fn add_file(&mut self, mut file: PathBuf) -> FingertipsResult<()> {
         let mut level = 0;
         loop {
             if level == self.stacks.len() {
                 self.stacks.push(vec![]);
             }
             self.stacks[level].push(file);
-            if self.stacks[level].len() < NSTREAMS {
+            if self.stacks[level].len() < constants::NSTREAMS {
                 break;
             }
-            let (filename, out) = self.tmp_dir.create()?;
+            let (filename, out) = self.tmp_dir.create().map_err(FingertipsErrorKind::Io)?;
             let mut to_merge = vec![];
             mem::swap(&mut self.stacks[level], &mut to_merge);
             merge_streams(to_merge, out)?;
@@ -47,12 +64,12 @@ impl FileMerge {
         Ok(())
     }
 
-    pub fn finish(mut self) -> io::Result<()> {
-        let mut tmp = Vec::with_capacity(NSTREAMS);
+    pub fn finish(mut self) -> FingertipsResult<()> {
+        let mut tmp = Vec::with_capacity(constants::NSTREAMS);
         for stack in self.stacks {
             for file in stack.into_iter().rev() {
                 tmp.push(file);
-                if tmp.len() == NSTREAMS {
+                if tmp.len() == constants::NSTREAMS {
                     merge_reversed(&mut tmp, &mut self.tmp_dir)?;
                 }
             }
@@ -62,25 +79,28 @@ impl FileMerge {
             merge_reversed(&mut tmp, &mut self.tmp_dir)?;
         }
         assert!(tmp.len() <= 1);
-        match tmp.pop() {
-            Some(last_file) =>
-                fs::rename(last_file, self.output_dir.join(MERGED_FILENAME)),
-            None =>
-                Err(io::Error::new(io::ErrorKind::Other,
-                                   "no documents were parsed or none contained any words"))
+
+        if let Some(last_file) = tmp.pop() {
+            fs::rename(last_file, self.output_dir.join(constants::MERGED_FILENAME))
+                .map_err(|err| FingertipsErrorKind::Io(err).into())
+        } else {
+            Err(FingertipsErrorKind::from(io::Error::new(
+                io::ErrorKind::Other,
+                "no documents were parsed or none contained any words",
+            ))
+            .into())
         }
     }
 }
 
-fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>)
-    -> io::Result<()>
-{
-    let mut streams: Vec<IndexFileReader> =
-        files.into_iter()
-            .map(IndexFileReader::open_and_delete)
-            .collect::<io::Result<_>>()?;
+fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>) -> FingertipsResult<()> {
+    let mut streams: Vec<IndexFileReader> = files
+        .into_iter()
+        .map(IndexFileReader::open_and_delete)
+        .map(|result| result.map_err(|err| FingertipsErrorKind::Io(err).into()))
+        .collect::<FingertipsResult<_>>()?;
 
-    let mut output = IndexFileWriter::new(out)?;
+    let mut output = IndexFileWriter::new(out).map_err(FingertipsErrorKind::Io)?;
 
     let mut point: u64 = 0;
     let mut count = streams.iter().filter(|s| s.peek().is_some()).count();
@@ -92,18 +112,20 @@ fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>)
             match s.peek() {
                 None => {}
                 Some(entry) => {
-                    if term.is_none() || entry.term < *term.as_ref().unwrap() {
+                    if term.is_none()
+                        || entry.term < *term.as_ref().ok_or(FingertipsErrorKind::TermEmpty)?
+                    {
                         term = Some(entry.term.clone()); // XXX LAME clone
                         nbytes = entry.nbytes;
                         df = entry.df;
-                    } else if entry.term == *term.as_ref().unwrap() {
+                    } else if entry.term == *term.as_ref().ok_or(FingertipsErrorKind::TermEmpty)?
+                    {
                        nbytes += entry.nbytes;
                         df += entry.df;
                     }
                 }
             }
         }
-        let term = term.expect("bug in algorithm!");
+        let term = term.ok_or(FingertipsErrorKind::AlgorithmError)?;
 
         for s in &mut streams {
             if s.is_at(&term) {
@@ -113,18 +135,22 @@ fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>)
             }
         }
-        output.write_contents_entry(term, df, point, nbytes as u64);
-        point += nbytes as u64;
+        output
+            .write_contents_entry(term, df, point, nbytes)
+            .map_err(FingertipsErrorKind::Io)?;
+
+        point += nbytes;
     }
 
     assert!(streams.iter().all(|s| s.peek().is_none()));
-    output.finish()
+
+    Ok(output.finish().map_err(FingertipsErrorKind::Io)?)
 }
 
-fn merge_reversed(filenames: &mut Vec<PathBuf>, tmp_dir: &mut TmpDir) -> io::Result<()> {
+fn merge_reversed(filenames: &mut Vec<PathBuf>, tmp_dir: &mut TmpDir) -> FingertipsResult<()> {
     filenames.reverse();
-    let (merged_filename, out) = tmp_dir.create()?;
-    let mut to_merge = Vec::with_capacity(NSTREAMS);
+    let (merged_filename, out) = tmp_dir.create().map_err(FingertipsErrorKind::Io)?;
+    let mut to_merge = Vec::with_capacity(constants::NSTREAMS);
     mem::swap(filenames, &mut to_merge);
     merge_streams(to_merge, out)?;
     filenames.push(merged_filename);
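The loop in `merge_streams` is a k-way merge over streams sorted by term: find the smallest term at the front of any stream, fold in the statistics of every stream positioned at that term, and advance those streams. A standalone illustration of the same idea over sorted `(term, df)` pairs (`merge_counts` is hypothetical; the real code additionally copies hit data and tracks byte offsets):

fn merge_counts(streams: Vec<Vec<(String, u32)>>) -> Vec<(String, u32)> {
    let mut iters: Vec<_> = streams
        .into_iter()
        .map(|s| s.into_iter().peekable())
        .collect();
    let mut out = Vec::new();
    loop {
        // Smallest term currently at the front of any stream, if any.
        let term = match iters
            .iter_mut()
            .filter_map(|it| it.peek().map(|(t, _)| t.clone()))
            .min()
        {
            Some(t) => t,
            None => break, // every stream is exhausted
        };
        // Fold in every stream positioned at `term`, advancing past it,
        // the way df and nbytes are summed above.
        let mut df = 0;
        for it in &mut iters {
            while it.peek().map(|(t, _)| t == &term).unwrap_or(false) {
                if let Some((_, d)) = it.next() {
                    df += d;
                }
            }
        }
        out.push((term, df));
    }
    out
}
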
diff --git a/src/read.rs b/src/read.rs
index 460a797..a99bd3a 100644
--- a/src/read.rs
+++ b/src/read.rs
@@ -1,12 +1,15 @@
 //! Reading index files linearly from disk, a capability needed for merging
 //! index files.
 
+use crate::{
+    error::{FingertipsErrorKind, FingertipsResult},
+    write::IndexFileWriter,
+};
+use byteorder::{LittleEndian, ReadBytesExt};
 use std::fs::{self, File};
 use std::io::prelude::*;
 use std::io::{self, BufReader, SeekFrom};
 use std::path::Path;
-use byteorder::{LittleEndian, ReadBytesExt};
-use crate::write::IndexFileWriter;
 
 /// A `IndexFileReader` does a single linear pass over an index file from
 /// beginning to end. Needless to say, this is not how an index is normally
@@ -30,7 +33,7 @@ pub struct IndexFileReader {
     /// The next entry in the table of contents, if any; or `None` if we've
     /// reached the end of the table. `IndexFileReader` always reads ahead one
     /// entry in the contents and stores it here.
-    next: Option<Entry>
+    next: Option<Entry>,
 }
 
 /// An entry in the table of contents of an index file.
@@ -38,6 +41,7 @@ pub struct IndexFileReader {
 /// Each entry in the table of contents is small. It consists of a string, the
 /// `term`; summary information about that term, as used in the corpus (`df`);
 /// and a pointer to bulkier data that tells more (`offset` and `nbytes`).
+#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Entry {
     /// The term is a word that appears in one or more documents in the corpus.
     /// The index file contains information about the documents that use this
@@ -51,7 +55,7 @@ pub struct Entry {
     pub offset: u64,
 
     /// Length of the index data for this term, in bytes.
-    pub nbytes: u64
+    pub nbytes: u64,
 }
 
 impl IndexFileReader {
@@ -62,31 +66,35 @@ impl IndexFileReader {
     /// from its directory, but it'll still take up space on disk until the
     /// file is closed, which normally happens when the `IndexFileReader` is
     /// dropped.
-    pub fn open_and_delete<P: AsRef<Path>>(filename: P) -> io::Result<IndexFileReader> {
+    pub fn open_and_delete<P: AsRef<Path>>(filename: P) -> io::Result<Self> {
         let filename = filename.as_ref();
         let mut main_raw = File::open(filename)?;
 
         // Read the file header.
         let contents_offset = main_raw.read_u64::<LittleEndian>()?;
-        println!("opened {}, table of contents starts at {}", filename.display(), contents_offset);
+        println!(
+            "opened {}, table of contents starts at {}",
+            filename.display(),
+            contents_offset
+        );
 
         // Open again so we have two read heads;
         // move the contents read head to its starting position.
         // Set up buffering.
         let mut contents_raw = File::open(filename)?;
-        contents_raw.seek(SeekFrom::Start(contents_offset))?;
+        let _start = contents_raw.seek(SeekFrom::Start(contents_offset))?;
         let main = BufReader::new(main_raw);
         let mut contents = BufReader::new(contents_raw);
 
         // We always read ahead one entry, so load the first entry right away.
-        let first = IndexFileReader::read_entry(&mut contents)?;
+        let first = Self::read_entry(&mut contents)?;
 
-        fs::remove_file(filename)?;  // YOLO
+        fs::remove_file(filename)?; // YOLO
 
-        Ok(IndexFileReader {
-            main: main,
-            contents: contents,
-            next: first
+        Ok(Self {
+            main,
+            contents,
+            next: first,
         })
     }
 
@@ -98,30 +106,30 @@
         // that's considered a success, with no entry read.
         let offset = match f.read_u64::<LittleEndian>() {
             Ok(value) => value,
-            Err(err) =>
+            Err(err) => {
                 if err.kind() == io::ErrorKind::UnexpectedEof {
-                    return Ok(None)
+                    return Ok(None);
                 } else {
-                    return Err(err)
+                    return Err(err);
                 }
+            }
         };
 
         let nbytes = f.read_u64::<LittleEndian>()?;
         let df = f.read_u32::<LittleEndian>()?;
         let term_len = f.read_u32::<LittleEndian>()? as usize;
-        let mut bytes = Vec::with_capacity(term_len);
-        bytes.resize(term_len, 0);
+        let mut bytes = vec![0; term_len];
         f.read_exact(&mut bytes)?;
         let term = match String::from_utf8(bytes) {
             Ok(s) => s,
-            Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "unicode fail"))
+            Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "unicode fail")),
        };
 
         Ok(Some(Entry {
-            term: term,
-            df: df,
-            offset: offset,
-            nbytes: nbytes
+            term,
+            df,
+            offset,
+            nbytes,
         }))
     }
 
@@ -129,35 +137,41 @@ impl IndexFileReader {
     /// (Since we always read ahead one entry, this method can't fail.)
     ///
     /// Returns `None` if we've reached the end of the file.
-    pub fn peek(&self) -> Option<&Entry> { self.next.as_ref() }
+    pub fn peek(&self) -> Option<&Entry> {
+        self.next.as_ref()
+    }
 
     /// True if the next entry is for the given term.
     pub fn is_at(&self, term: &str) -> bool {
         match self.next {
             Some(ref e) => e.term == term,
-            None => false
+            None => false,
         }
     }
 
     /// Copy the current entry to the specified output stream, then read the
     /// header for the next entry.
-    pub fn move_entry_to(&mut self, out: &mut IndexFileWriter) -> io::Result<()> {
+    pub fn move_entry_to(&mut self, out: &mut IndexFileWriter) -> FingertipsResult<()> {
         // This block limits the scope of borrowing `self.next` (for `e`),
         // because after this block is over we'll want to assign to `self.next`.
         {
-            let e = self.next.as_ref().expect("no entry to move");
+            let e = self
+                .next
+                .as_ref()
+                .ok_or(FingertipsErrorKind::NoEntryToMove)?;
             if e.nbytes > usize::max_value() as u64 {
                 // This can only happen on 32-bit platforms.
-                return Err(io::Error::new(io::ErrorKind::Other,
-                                          "computer not big enough to hold index entry"));
+                return Err(FingertipsErrorKind::PlatformLimitExceeded.into());
            }
-            let mut buf = Vec::with_capacity(e.nbytes as usize);
-            buf.resize(e.nbytes as usize, 0);
-            self.main.read_exact(&mut buf)?;
-            out.write_main(&buf)?;
+            let mut buf = vec![0; e.nbytes as usize];
+            self.main
+                .read_exact(&mut buf)
+                .map_err(FingertipsErrorKind::Io)?;
+            out.write_main(&buf).map_err(FingertipsErrorKind::Io)?;
         }
 
-        self.next = Self::read_entry(&mut self.contents)?;
+        self.next = Self::read_entry(&mut self.contents).map_err(FingertipsErrorKind::Io)?;
+
         Ok(())
     }
 }
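For reference while reading `read_entry`: a table-of-contents entry is five little-endian fields back-to-back. A standalone round-trip of one entry in that layout (`toc_roundtrip` is a hypothetical sketch with made-up values):

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Cursor, Read};

fn toc_roundtrip() -> std::io::Result<()> {
    // Write the fields in the order `read_entry` expects:
    // offset u64, nbytes u64, df u32, term length u32, term bytes.
    let mut buf = Vec::new();
    buf.write_u64::<LittleEndian>(8)?; // offset of this term's hit data
    buf.write_u64::<LittleEndian>(12)?; // nbytes: length of that data
    buf.write_u32::<LittleEndian>(1)?; // df: documents containing the term
    buf.write_u32::<LittleEndian>(5)?; // term length in bytes
    buf.extend_from_slice(b"hello"); // the term itself

    // Read them back the way `read_entry` does.
    let mut r = Cursor::new(buf);
    assert_eq!(r.read_u64::<LittleEndian>()?, 8);
    assert_eq!(r.read_u64::<LittleEndian>()?, 12);
    assert_eq!(r.read_u32::<LittleEndian>()?, 1);
    let term_len = r.read_u32::<LittleEndian>()? as usize;
    let mut term = vec![0; term_len];
    r.read_exact(&mut term)?;
    assert_eq!(term, b"hello");
    Ok(())
}
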
diff --git a/src/tmp.rs b/src/tmp.rs
index 54d4b97..6de1498 100644
--- a/src/tmp.rs
+++ b/src/tmp.rs
@@ -1,39 +1,41 @@
-use std::io::{self, BufWriter};
 use std::fs::{self, File};
+use std::io::{self, BufWriter};
 use std::path::{Path, PathBuf};
 
-#[derive(Clone)]
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
 pub struct TmpDir {
     dir: PathBuf,
-    n: usize
+    n: usize,
 }
 
 impl TmpDir {
-    pub fn new<P: AsRef<Path>>(dir: P) -> TmpDir {
-        TmpDir {
+    pub fn new<P: AsRef<Path>>(dir: P) -> Self {
+        Self {
             dir: dir.as_ref().to_owned(),
-            n: 1
+            n: 1,
         }
     }
 
     pub fn create(&mut self) -> io::Result<(PathBuf, BufWriter<File>)> {
         let mut r#try = 1;
         loop {
-            let filename = self.dir.join(PathBuf::from(format!("tmp{:08x}.dat", self.n)));
+            let filename = self
+                .dir
+                .join(PathBuf::from(format!("tmp{:08x}.dat", self.n)));
             self.n += 1;
             match fs::OpenOptions::new()
-                    .write(true)
-                    .create_new(true)
-                    .open(&filename)
+                .write(true)
+                .create_new(true)
+                .open(&filename)
             {
-                Ok(f) =>
-                    return Ok((filename, BufWriter::new(f))),
-                Err(exc) =>
+                Ok(f) => return Ok((filename, BufWriter::new(f))),
+                Err(exc) => {
                     if r#try < 999 && exc.kind() == io::ErrorKind::AlreadyExists {
                         // keep going
                     } else {
                         return Err(exc);
                     }
+                }
             }
             r#try += 1;
         }
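`TmpDir::create` leans on `OpenOptions::create_new`, which makes `open` fail with `AlreadyExists` rather than clobber an existing file, so probing successive names is race-free. The same pattern in isolation (`create_fresh` is a hypothetical sketch, not patch code):

use std::fs::{File, OpenOptions};
use std::io;

// Try each candidate name until one can be created atomically.
fn create_fresh(candidates: &[&str]) -> io::Result<File> {
    for name in candidates {
        match OpenOptions::new().write(true).create_new(true).open(name) {
            Ok(f) => return Ok(f),
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => continue,
            Err(e) => return Err(e),
        }
    }
    Err(io::Error::new(io::ErrorKind::Other, "no free filename"))
}
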
diff --git a/src/write.rs b/src/write.rs
index b42f384..4f10f1e 100644
--- a/src/write.rs
+++ b/src/write.rs
@@ -1,10 +1,10 @@
-use std::fs::File;
-use std::io::{self, BufWriter, SeekFrom};
-use std::io::prelude::*;
-use std::path::PathBuf;
 use crate::index::InMemoryIndex;
 use crate::tmp::TmpDir;
 use byteorder::{LittleEndian, WriteBytesExt};
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::{self, BufWriter, SeekFrom};
+use std::path::PathBuf;
 
 /// Writer for saving an index to a binary file.
 ///
@@ -15,6 +15,7 @@ use byteorder::{LittleEndian, WriteBytesExt};
 /// An index file has two parts. The main part of the file is a sequence of
 /// entries, stored back-to-back; the
+#[derive(Debug)]
 pub struct IndexFileWriter {
     /// The number of bytes written so far.
     offset: u64,
@@ -23,17 +24,17 @@ pub struct IndexFileWriter {
     writer: BufWriter<File>,
 
     /// The table of contents for this file.
-    contents_buf: Vec<u8>
+    contents_buf: Vec<u8>,
 }
 
 impl IndexFileWriter {
-    pub fn new(mut f: BufWriter<File>) -> io::Result<IndexFileWriter> {
+    pub fn new(mut f: BufWriter<File>) -> io::Result<Self> {
         const HEADER_SIZE: u64 = 8;
         f.write_u64::<LittleEndian>(0)?;
-        Ok(IndexFileWriter {
+        Ok(Self {
             offset: HEADER_SIZE,
             writer: f,
-            contents_buf: vec![]
+            contents_buf: vec![],
         })
     }
 
@@ -43,21 +44,34 @@ impl IndexFileWriter {
         Ok(())
     }
 
-    pub fn write_contents_entry(&mut self, term: String, df: u32, offset: u64, nbytes: u64) {
-        self.contents_buf.write_u64::<LittleEndian>(offset).unwrap();
-        self.contents_buf.write_u64::<LittleEndian>(nbytes).unwrap();
-        self.contents_buf.write_u32::<LittleEndian>(df).unwrap();
+    pub fn write_contents_entry(
+        &mut self,
+        term: String,
+        df: u32,
+        offset: u64,
+        nbytes: u64,
+    ) -> io::Result<()> {
+        self.contents_buf.write_u64::<LittleEndian>(offset)?;
+        self.contents_buf.write_u64::<LittleEndian>(nbytes)?;
+        self.contents_buf.write_u32::<LittleEndian>(df)?;
         let bytes = term.bytes();
-        self.contents_buf.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
+        self.contents_buf
+            .write_u32::<LittleEndian>(bytes.len() as u32)?;
         self.contents_buf.extend(bytes);
+
+        Ok(())
     }
 
     /// Finish writing the index file and close it.
     pub fn finish(mut self) -> io::Result<()> {
         let contents_start = self.offset;
         self.writer.write_all(&self.contents_buf)?;
-        println!("{} bytes main, {} bytes total", contents_start, contents_start + self.contents_buf.len() as u64);
-        self.writer.seek(SeekFrom::Start(0))?;
+        println!(
+            "{} bytes main, {} bytes total",
+            contents_start,
+            contents_start + self.contents_buf.len() as u64
+        );
+        let _start = self.writer.seek(SeekFrom::Start(0))?;
         self.writer.write_u64::<LittleEndian>(contents_start)?;
         Ok(())
     }
@@ -70,7 +84,7 @@ pub fn write_index_to_tmp_file(index: InMemoryIndex, tmp_dir: &mut TmpDir) -> io::Result<PathBuf>
     // The merge algorithm requires the entries within each file to be sorted by term.
     // Sort before writing anything.
     let mut index_as_vec: Vec<_> = index.map.into_iter().collect();
-    index_as_vec.sort_by(|&(ref a, _), &(ref b, _)| a.cmp(b));
+    index_as_vec.sort_by(|(a, _), (b, _)| a.cmp(b));
 
     for (term, hits) in index_as_vec {
         let df = hits.len() as u32;
@@ -79,10 +93,11 @@ pub fn write_index_to_tmp_file(index: InMemoryIndex, tmp_dir: &mut TmpDir) -> io::Result<PathBuf>
             writer.write_main(&buffer)?;
         }
         let stop = writer.offset;
-        writer.write_contents_entry(term, df, start, stop - start);
+        writer.write_contents_entry(term, df, start, stop - start)?;
     }
 
     writer.finish()?;
-    println!("wrote file {:?}", filename);
+    println!("wrote file {filename:?}");
+
     Ok(filename)
 }
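With `run` now public, the library can be driven directly; the `fingertips` binary above is just argparse glue around this call. A minimal consumer (sketch assuming a `docs/` directory of .txt files exists):

use fingertips::run;

fn main() {
    // Index every .txt file directly under ./docs, writing index.dat to the
    // current directory; `true` selects the single-threaded path, `false`
    // the five-stage pipeline.
    if let Err(err) = run(vec!["docs".to_string()], true) {
        eprintln!("error: {err}");
    }
}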