Improve code docs #246

Merged · 6 commits · Dec 5, 2024
2 changes: 2 additions & 0 deletions crates/core/src/ampc/coordinator.rs
@@ -24,6 +24,8 @@ use super::{DhtConn, Finisher, Job, JobScheduled, RemoteWorker, Setup, Worker, W
use crate::{distributed::retry_strategy::ExponentialBackoff, Result};
use anyhow::anyhow;

+/// A coordinator is responsible for scheduling jobs on workers and coordinating
+/// between rounds of computation.
pub struct Coordinator<J>
where
    J: Job,

2 changes: 1 addition & 1 deletion crates/core/src/ampc/dht/mod.rs
@@ -21,7 +21,7 @@
//! with multiple shards. Each shard cluster
//! is a Raft cluster, and each key is then routed to the correct
//! cluster based on hash(key) % number_of_shards. The keys
-//! are currently *not* rebalanced if the number of shards change, so
+//! are currently *not* re-balanced if the number of shards change, so
//! if an entire shard becomes unavailable or a new shard is added, all
//! keys in the entire DHT is essentially lost as the
//! keys might hash incorrectly.

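For intuition, the routing rule described in that doc comment boils down to something like the following (a minimal sketch; the hasher choice and the `num_shards` parameter are illustrative, not this crate's actual implementation):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route a key to a shard cluster via hash(key) % number_of_shards.
/// As the doc comment above warns, changing `num_shards` re-routes
/// existing keys, so the data already stored is effectively lost.
fn shard_for_key<K: Hash>(key: &K, num_shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_shards
}
```
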
1 change: 1 addition & 0 deletions crates/core/src/ampc/dht/store.rs
@@ -13,6 +13,7 @@
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>
+
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::io::Cursor;

2 changes: 2 additions & 0 deletions crates/core/src/ampc/finisher.rs
@@ -16,6 +16,8 @@

use super::prelude::Job;

+/// A finisher is responsible for determining if the computation is finished
+/// or if another round of computation is needed.
pub trait Finisher {
    type Job: Job;

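The hunk doesn't show the trait's methods, but a typical finisher just caps the number of rounds. A toy version of that idea (the `is_finished` name is an assumption, not the crate's actual signature):

```rust
/// Illustrative round-capped finisher; not the real `Finisher` impl.
struct FixedRounds {
    max_rounds: u64,
    completed_rounds: u64,
}

impl FixedRounds {
    /// Declare the computation done once the configured number of
    /// rounds has run.
    fn is_finished(&self) -> bool {
        self.completed_rounds >= self.max_rounds
    }
}
```
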
1 change: 1 addition & 0 deletions crates/core/src/ampc/mapper.rs
@@ -16,6 +16,7 @@

use super::{prelude::Job, DhtConn};

+/// A mapper is the specific computation to be run on the graph.
pub trait Mapper: bincode::Encode + bincode::Decode + Send + Sync + Clone {
    type Job: Job<Mapper = Self>;

27 changes: 27 additions & 0 deletions crates/core/src/ampc/mod.rs
@@ -14,6 +14,33 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

+//! # Framework for Adaptive Massively Parallel Computation (AMPC).
+//!
+//! AMPC is a system for implementing large-scale distributed graph algorithms efficiently.
+//! It provides a framework for parallel computation across clusters of machines.
+//!
+//! While similar in concept to MapReduce, AMPC uses a distributed hash table (DHT) as its
+//! underlying data structure rather than the traditional map and reduce phases. This key
+//! architectural difference enables more flexible and efficient computation patterns.
+//!
+//! The main advantage over MapReduce is that workers can dynamically access any keys in
+//! the DHT during computation. This is in contrast to MapReduce where the keyspace must
+//! be statically partitioned between reducers before computation begins. The dynamic
+//! access pattern allows for more natural expression of graph algorithms in a distributed
+//! setting.
+//!
+//! This is roughly inspired by
+//! [Massively Parallel Graph Computation: From Theory to Practice](https://research.google/blog/massively-parallel-graph-computation-from-theory-to-practice/)
+//!
+//! ## Key concepts
+//!
+//! * **DHT**: A distributed hash table is used to store the result of the computation for
+//!   each round.
+//! * **Worker**: A worker owns a subset of the overall graph and is responsible for
+//!   executing mappers on its portion of the graph and sending results to the DHT.
+//! * **Mapper**: A mapper is the specific computation to be run on the graph.
+//! * **Coordinator**: The coordinator is responsible for scheduling the jobs on the workers.
+
use self::{job::Job, worker::WorkerRef};
use crate::distributed::sonic;

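To make the round-based control flow concrete, here is a self-contained sketch of how these pieces interact. The `Dht` type and the trait signatures below are simplified stand-ins for the crate's real `Setup`/`Finisher`/`DhtConn` types, and the call order follows the `setup_round`/`setup_first_round` docs added later in this PR:

```rust
/// Stand-in for the DHT tables that each round reads and writes.
struct Dht;

trait Setup {
    fn init_dht(&self) -> Dht;
    fn setup_round(&self, _dht: &Dht) {}
    fn setup_first_round(&self, dht: &Dht) {
        self.setup_round(dht);
    }
}

trait Finisher {
    fn is_finished(&self, dht: &Dht) -> bool;
}

/// Coordinator-side loop: initialize the DHT, then alternate
/// "run the mapper on every worker" with "check whether we are done".
fn coordinate<S: Setup, F: Finisher>(setup: &S, finisher: &F, run_mappers: impl Fn(&Dht)) {
    let dht = setup.init_dht();
    setup.setup_first_round(&dht);

    loop {
        setup.setup_round(&dht); // also runs in the first round, per the docs

        // Each worker executes the mapper on its portion of the graph
        // and writes results to the DHT.
        run_mappers(&dht);

        if finisher.is_finished(&dht) {
            break;
        }
    }
}
```

A real deployment would schedule jobs on remote workers with retries (note the `ExponentialBackoff` import in coordinator.rs above), but the loop shape stays the same.
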
12 changes: 12 additions & 0 deletions crates/core/src/ampc/setup.rs
@@ -16,12 +16,24 @@

use super::DhtConn;

+/// A setup is responsible for initializing the DHT before each round of computation.
pub trait Setup {
    type DhtTables;

+    /// Setup initial state of the DHT.
    fn init_dht(&self) -> DhtConn<Self::DhtTables>;

+    /// Setup state for a new round.
+    ///
+    /// This is called once for each round of computation.
+    /// The first round will run `setup_first_round` first
+    /// but will still call `setup_round` after that.
+    #[allow(unused_variables)] // reason = "dht might be used by implementors"
    fn setup_round(&self, dht: &Self::DhtTables) {}

+    /// Setup state for the first round.
+    ///
+    /// This is called once before the first round of computation.
    fn setup_first_round(&self, dht: &Self::DhtTables) {
        self.setup_round(dht);
    }

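Continuing the simplified stand-ins from the sketch under ampc/mod.rs above, an implementor would typically seed state once and reset per-round accumulators each round. An illustrative sketch, not the crate's API (remember that the framework still calls `setup_round` after `setup_first_round` on the first round):

```rust
/// Toy implementor of the simplified `Setup` stand-in from earlier.
struct CentralitySetup;

impl Setup for CentralitySetup {
    fn init_dht(&self) -> Dht {
        Dht // allocate score/counter tables here
    }

    /// Runs before every round: reset per-round accumulators.
    fn setup_round(&self, _dht: &Dht) {
        // e.g. zero out this round's delta table in the DHT
    }

    /// Runs once, before the first round: seed initial values.
    /// `setup_round` is still invoked afterwards by the framework.
    fn setup_first_round(&self, _dht: &Dht) {
        // e.g. write uniform initial scores into the DHT
    }
}
```
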
2 changes: 2 additions & 0 deletions crates/core/src/ampc/worker.rs
@@ -23,6 +23,8 @@ use crate::Result;
use anyhow::anyhow;
use tokio::net::ToSocketAddrs;

+/// A worker is responsible for executing a mapper on its portion of the graph and
+/// sending results to the DHT.
pub trait Worker: Send + Sync {
    type Remote: RemoteWorker<Job = Self::Job>;

13 changes: 11 additions & 2 deletions crates/core/src/crawler/mod.rs
@@ -14,6 +14,15 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

+//! # Crawler
+//!
+//! The crawler is responsible for fetching webpages and storing them in WARC files
+//! for later processing.
+//!
+//! Before starting a crawl, a plan needs to be created. This plan is then used by
+//! the crawler coordinator to assign sites to crawl to different workers.
+//! A site is only assigned to one worker at a time for politeness.
+
use std::{collections::VecDeque, future::Future, net::SocketAddr, sync::Arc};

type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
@@ -35,7 +44,7 @@ pub use router::Router;
mod file_queue;
pub mod planner;
pub mod robot_client;
-mod wander_prirotiser;
+mod wander_prioritiser;
mod warc_writer;
mod worker;

@@ -304,7 +313,7 @@ impl Crawler {
    }
}

-pub trait DatumStream: Send + Sync {
+pub trait DatumSink: Send + Sync {
    fn write(&self, crawl_datum: CrawlDatum) -> impl Future<Output = Result<()>> + Send;
    fn finish(&self) -> impl Future<Output = Result<()>> + Send;
}

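Since the renamed `DatumSink` trait is shown in full here, a toy implementor is easy to sketch. This version collects datums in memory, which can be handy as a test double. `CrawlDatum` and `Result` are stand-ins so the sketch is self-contained (the real types live in this module), it leans on the anyhow crate already used by this code, and `async fn` implementing an `-> impl Future` trait method needs Rust 1.75+:

```rust
use std::future::Future;
use std::sync::Mutex;

// Stand-ins so the sketch compiles on its own.
struct CrawlDatum {
    url: String,
}
type Result<T> = anyhow::Result<T>;

trait DatumSink: Send + Sync {
    fn write(&self, crawl_datum: CrawlDatum) -> impl Future<Output = Result<()>> + Send;
    fn finish(&self) -> impl Future<Output = Result<()>> + Send;
}

/// Collects crawl datums in memory instead of writing WARC files.
struct MemorySink {
    datums: Mutex<Vec<CrawlDatum>>,
}

impl DatumSink for MemorySink {
    async fn write(&self, crawl_datum: CrawlDatum) -> Result<()> {
        self.datums.lock().unwrap().push(crawl_datum);
        Ok(())
    }

    async fn finish(&self) -> Result<()> {
        Ok(()) // nothing to flush for the in-memory sink
    }
}
```
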
3 changes: 3 additions & 0 deletions crates/core/src/crawler/planner.rs
@@ -13,6 +13,7 @@
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
use anyhow::{anyhow, Result};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
@@ -71,6 +72,7 @@ impl From<StoredUrl> for Url {
    }
}

+/// Store urls in groups on disk based on their harmonic rank.
struct UrlGrouper {
    groups: Vec<speedy_kv::Db<StoredUrl, ()>>,
    folder: std::path::PathBuf,
@@ -169,6 +171,7 @@ struct Budget {
    remaining_schedulable: u64,
}

+/// Create a crawl plan based on the harmonic rank of the hosts.
pub struct CrawlPlanner {
    host_centrality: Arc<speedy_kv::Db<NodeID, f64>>,
    host_centrality_rank: Arc<speedy_kv::Db<NodeID, u64>>,

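The grouping logic itself isn't part of this diff, but the idea of bucketing by harmonic rank can be illustrated with a simple order-of-magnitude scheme (invented for the example; the real `UrlGrouper` may well differ):

```rust
/// Bucket a host by the order of magnitude of its harmonic-rank
/// position: group 0 holds the top-ranked hosts, group 1 the next
/// magnitude, and so on. Purely illustrative.
fn group_for_rank(rank: u64, num_groups: usize) -> usize {
    (((rank + 1) as f64).log10() as usize).min(num_groups - 1)
}
```
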
1 change: 1 addition & 0 deletions crates/core/src/crawler/robot_client.rs
@@ -45,6 +45,7 @@ pub(super) fn reqwest_client(config: &CrawlerConfig) -> Result<reqwest::Client>
        .map_err(|e| Error::from(anyhow!(e)))
}

+/// Reqwest client that respects robots.txt for each request.
#[derive(Clone)]
pub struct RobotClient {
    robots_txt_manager: RobotsTxtManager,

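In other words, every fetch is gated on a robots.txt verdict. A minimal sketch of that gate, with closures standing in for `RobotsTxtManager` and the HTTP client (neither of which is shown in this diff):

```rust
/// Fetch `url` only if the robots.txt rules allow it.
fn fetch_if_allowed(
    url: &str,
    is_allowed: impl Fn(&str) -> bool, // stand-in for the robots.txt check
    fetch: impl Fn(&str) -> String,    // stand-in for the HTTP client
) -> Option<String> {
    if is_allowed(url) {
        Some(fetch(url))
    } else {
        None // politely skip disallowed urls
    }
}
```
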
4 changes: 2 additions & 2 deletions crates/core/src/crawler/warc_writer.rs
@@ -21,7 +21,7 @@ use crate::{
    warc,
};

-use super::{CrawlDatum, DatumStream, Error, Result};
+use super::{CrawlDatum, DatumSink, Error, Result};
use anyhow::anyhow;

/// The WarcWriter is responsible for storing the crawl datums
@@ -30,7 +30,7 @@ pub struct WarcWriter {
    tx: tokio::sync::mpsc::Sender<WarcWriterMessage>,
}

-impl DatumStream for WarcWriter {
+impl DatumSink for WarcWriter {
    async fn write(&self, crawl_datum: CrawlDatum) -> Result<()> {
        self.tx
            .send(WarcWriterMessage::Crawl(crawl_datum))

9 changes: 5 additions & 4 deletions crates/core/src/crawler/worker.rs
@@ -39,8 +39,8 @@ use crate::{
};

use super::{
-    encoded_body, robot_client::RobotClient, wander_prirotiser::WanderPrioritiser, CrawlDatum,
-    DatumStream, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
+    encoded_body, robot_client::RobotClient, wander_prioritiser::WanderPrioritiser, CrawlDatum,
+    DatumSink, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
    MAX_CONTENT_LENGTH, MAX_OUTGOING_URLS_PER_PAGE,
};

@@ -126,7 +126,8 @@ impl WorkerThread {
    }
}

-pub struct JobExecutor<S: DatumStream> {
+/// JobExecutor receives a job from the coordinator and crawls the urls in the job.
+pub struct JobExecutor<S: DatumSink> {
    writer: Arc<S>,
    client: RobotClient,
    has_gotten_429_response: bool,
@@ -144,7 +145,7 @@ pub struct JobExecutor<S: DatumStream> {
    job: WorkerJob,
}

-impl<S: DatumStream> JobExecutor<S> {
+impl<S: DatumSink> JobExecutor<S> {
    pub fn new(
        job: WorkerJob,
        config: Arc<CrawlerConfig>,

11 changes: 6 additions & 5 deletions crates/core/src/entrypoint/configure.rs
@@ -17,7 +17,7 @@
use tokio::fs::File;
use tokio::io;
use tokio_stream::StreamExt;
-use tracing::{debug, info};
+use tracing::info;

use crate::config::{
    defaults, IndexerConfig, IndexerDualEncoderConfig, IndexerGraphConfig, LocalConfig,
@@ -73,7 +73,7 @@ fn download_files() {
}

fn build_spellchecker() -> Result<()> {
-    debug!("Building spellchecker");
+    info!("Building spellchecker");
    let spellchecker_path = Path::new(DATA_PATH).join("web_spell");

    if !spellchecker_path.exists() {
@@ -97,7 +97,7 @@ fn build_spellchecker() -> Result<()> {
}

fn create_webgraph() -> Result<()> {
-    debug!("Creating webgraph");
+    info!("Creating webgraph");
    let out_path = Path::new(DATA_PATH).join("webgraph");

    if out_path.exists() {
@@ -128,7 +128,7 @@ fn create_webgraph() -> Result<()> {
}

fn calculate_centrality() {
-    debug!("Calculating centrality");
+    info!("Calculating centrality");
    let webgraph_path = Path::new(DATA_PATH).join("webgraph");
    let out_path = Path::new(DATA_PATH).join("centrality");

@@ -144,7 +144,7 @@ fn calculate_centrality() {
}

fn create_inverted_index() -> Result<()> {
-    debug!("Creating inverted index");
+    info!("Creating inverted index");
    let out_path = Path::new(DATA_PATH).join("index");

    if out_path.exists() {
@@ -209,6 +209,7 @@ fn create_inverted_index() -> Result<()> {
}

fn create_entity_index() -> Result<()> {
+    info!("Creating entity index");
    let out_path = Path::new(DATA_PATH).join("entity");
    if out_path.exists() {
        std::fs::remove_dir_all(&out_path)?;

2 changes: 1 addition & 1 deletion crates/core/src/entrypoint/search_server.rs
@@ -137,7 +137,7 @@ impl_search!([
]);

pub struct SearchService {
-    local_searcher: LocalSearcher<Arc<RwLock<Index>>>,
+    local_searcher: LocalSearcher,
    // dropping the handle leaves the cluster
    #[allow(unused)]
    cluster_handle: Arc<Cluster>,

2 changes: 1 addition & 1 deletion crates/core/src/generic_query/get_homepage.rs
@@ -72,7 +72,7 @@ impl GenericQuery for GetHomepageQuery {
        FirstDocCollector::with_shard_id(ctx.shard_id)
    }

-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
        FirstDocCollector::without_shard_id()
    }

2 changes: 1 addition & 1 deletion crates/core/src/generic_query/get_site_urls.rs
@@ -65,7 +65,7 @@ impl GenericQuery for GetSiteUrlsQuery {
            .disable_offset()
    }

-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
        Self::Collector::new()
            .with_limit(self.limit as usize)
            .with_offset(self.offset.unwrap_or(0) as usize)

2 changes: 1 addition & 1 deletion crates/core/src/generic_query/get_webpage.rs
@@ -68,7 +68,7 @@ impl GenericQuery for GetWebpageQuery {
        FirstDocCollector::with_shard_id(ctx.shard_id)
    }

-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
        FirstDocCollector::without_shard_id()
    }

24 changes: 23 additions & 1 deletion crates/core/src/generic_query/mod.rs
@@ -14,6 +14,26 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

+//! # Main flow
+//! ```md
+//! `coordinator` <------> `searcher`
+//! -----------------------------------
+//! send query to searcher
+//!                        search index
+//!                        collect fruits
+//!                        send fruits to coordinator
+//! merge fruits
+//! filter fruits
+//! for each shard
+//! send fruits to searchers
+//!                        construct intermediate output
+//!                        from fruits
+//!                        send intermediate output to coordinator
+//! merge intermediate outputs
+//! return final output
+//! ---------------------------------------------------
+//! ```
+
use crate::{inverted_index::ShardId, search_ctx, Result};

pub mod top_key_phrases;
@@ -34,6 +54,8 @@ pub use get_site_urls::GetSiteUrlsQuery;
pub mod collector;
pub use collector::Collector;

+/// A generic query that can be executed on a searcher
+/// against an index.
pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone {
    type Collector: Collector;
    type TantivyQuery: tantivy::query::Query;
@@ -42,7 +64,7 @@ pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone

    fn tantivy_query(&self, ctx: &search_ctx::Ctx) -> Self::TantivyQuery;
    fn collector(&self, ctx: &search_ctx::Ctx) -> Self::Collector;
-    fn remote_collector(&self) -> Self::Collector;
+    fn coordinator_collector(&self) -> Self::Collector;

    fn filter_fruit_shards(
        &self,

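The flow chart above maps onto the trait methods fairly directly: `collector()` runs on each searcher, `coordinator_collector()` (renamed from `remote_collector` in this PR) merges on the coordinator, and `filter_fruit_shards()` splits the merged fruits back out per shard. A coordinator-side sketch with closures standing in for the searcher round-trips (illustrative only, not the crate's actual plumbing):

```rust
/// Two-phase execution as described in the module docs. `Fruit`,
/// `Inter`, and `Output` are placeholders for the collector's fruit,
/// the per-shard intermediate output, and the final result.
fn execute<Fruit: Clone, Inter, Output>(
    shards: &[u64],
    search_shard: impl Fn(u64) -> Fruit, // searcher: search index, collect fruits
    merge_fruits: impl Fn(Vec<Fruit>) -> Fruit, // coordinator_collector() merge
    filter_for_shard: impl Fn(&Fruit, u64) -> Fruit, // filter_fruit_shards()
    construct_intermediate: impl Fn(u64, Fruit) -> Inter, // searcher, phase 2
    merge_outputs: impl Fn(Vec<Inter>) -> Output, // final merge on the coordinator
) -> Output {
    // Phase 1: every searcher runs the query and returns its fruits.
    let fruits: Vec<Fruit> = shards.iter().map(|&s| search_shard(s)).collect();
    let merged = merge_fruits(fruits);

    // Phase 2: each searcher builds an intermediate output from the
    // fruits that belong to its shard; the coordinator merges them.
    let intermediates: Vec<Inter> = shards
        .iter()
        .map(|&s| construct_intermediate(s, filter_for_shard(&merged, s)))
        .collect();

    merge_outputs(intermediates)
}
```
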
2 changes: 1 addition & 1 deletion crates/core/src/generic_query/size.rs
@@ -70,7 +70,7 @@ impl GenericQuery for SizeQuery {
        SizeCollector::new().with_shard_id(ctx.shard_id)
    }

-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
        SizeCollector::new()
    }

2 changes: 1 addition & 1 deletion crates/core/src/generic_query/top_key_phrases.rs
@@ -47,7 +47,7 @@ impl GenericQuery for TopKeyPhrasesQuery {
        TopKeyPhrasesCollector::new(self.top_n).with_shard_id(ctx.shard_id)
    }

-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
        TopKeyPhrasesCollector::new(self.top_n)
    }

8 changes: 8 additions & 0 deletions crates/core/src/index.rs
@@ -51,6 +51,14 @@ impl Index {
        })
    }

+    pub fn inverted_index(&self) -> &InvertedIndex {
+        &self.inverted_index
+    }
+
+    pub fn region_count(&self) -> &Mutex<RegionCount> {
+        &self.region_count
+    }
+
    pub fn path(&self) -> PathBuf {
        PathBuf::from(&self.path)
    }