Skip to content

Commit

Permalink
document the different searchers
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Dec 5, 2024
1 parent 9a2e98f commit 7e30129
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 96 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/entrypoint/search_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl_search!([
]);

pub struct SearchService {
local_searcher: LocalSearcher<Arc<RwLock<Index>>>,
local_searcher: LocalSearcher,
// dropping the handle leaves the cluster
#[allow(unused)]
cluster_handle: Arc<Cluster>,
Expand Down
8 changes: 8 additions & 0 deletions crates/core/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ impl Index {
})
}

/// Returns a shared reference to the [`InvertedIndex`] held by this index.
pub fn inverted_index(&self) -> &InvertedIndex {
&self.inverted_index
}

/// Returns a reference to the mutex that wraps this index's [`RegionCount`].
///
/// The mutex itself is handed out, so callers lock it themselves to access
/// the value.
pub fn region_count(&self) -> &Mutex<RegionCount> {
&self.region_count
}

/// Returns the stored `path` of this index as a newly built, owned [`PathBuf`].
// NOTE(review): presumably the on-disk location of the index — confirm.
pub fn path(&self) -> PathBuf {
PathBuf::from(&self.path)
}
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/searcher/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::{
entity_search_server, live_index::LiveIndexService, search_server::{self, RetrieveReq, SearchService}
},
generic_query::{self, Collector},
index::Index,
inverted_index::{RetrievedWebpage, ShardId, WebpagePointer},
ranking::pipeline::{PrecisionRankingWebpage, RecallRankingWebpage},
Result,
Expand All @@ -44,7 +43,7 @@ use futures::{future::join_all, stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use std::future::Future;
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::Mutex;

use super::{InitialWebsiteResult, LocalSearcher, SearchQuery};

Expand Down Expand Up @@ -284,6 +283,7 @@ impl ReusableClientManager for LiveIndexService {
}
}

/// A searcher that runs the search on a remote cluster.
///
/// It holds a single sharded client for [`SearchService`], wrapped in an async
/// (`tokio`) [`Mutex`].
pub struct DistributedSearcher {
// Sharded client for the search service, guarded by the async mutex.
client: Mutex<ReusableShardedClient<SearchService>>,
}
Expand Down Expand Up @@ -584,9 +584,9 @@ impl SearchClient for DistributedSearcher {
}

/// This should only be used for testing and benchmarks.
pub struct LocalSearchClient(LocalSearcher<Arc<RwLock<Index>>>);
impl From<LocalSearcher<Arc<RwLock<Index>>>> for LocalSearchClient {
fn from(searcher: LocalSearcher<Arc<RwLock<Index>>>) -> Self {
pub struct LocalSearchClient(LocalSearcher);
impl From<LocalSearcher> for LocalSearchClient {
fn from(searcher: LocalSearcher) -> Self {
Self(searcher)
}
}
Expand Down
33 changes: 0 additions & 33 deletions crates/core/src/searcher/local/guard.rs

This file was deleted.

44 changes: 21 additions & 23 deletions crates/core/src/searcher/local/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,31 @@

use crate::{
generic_query::{self, GenericQuery},
index::Index,
inverted_index,
ranking::{LocalRanker, SignalComputer},
searcher::InitialWebsiteResult,
Result,
};
use std::sync::Arc;
use tokio::sync::{OwnedRwLockReadGuard, RwLock};

use crate::{
config::CollectorConfig, models::dual_encoder::DualEncoder, query::Query,
ranking::models::linear::LinearRegression, search_ctx::Ctx, searcher::SearchQuery,
};

use super::{InvertedIndexResult, ReadGuard, SearchableIndex};
use super::InvertedIndexResult;

pub struct InnerLocalSearcher<I: SearchableIndex> {
index: I,
/// State backing a local searcher: the shared index, optional ranking models
/// and the collector configuration.
pub struct InnerLocalSearcher {
// Shared index behind an async `RwLock`; read access goes through a guard.
index: Arc<RwLock<Index>>,
// Optional linear regression model; `None` until one is set.
linear_regression: Option<Arc<LinearRegression>>,
// Optional dual encoder model.
dual_encoder: Option<Arc<DualEncoder>>,
// Configuration for the collector; can be replaced through its setter.
collector_config: CollectorConfig,
}

impl<I> InnerLocalSearcher<I>
where
I: SearchableIndex,
{
pub fn new(index: I) -> Self {
impl InnerLocalSearcher {
pub fn new(index: Arc<RwLock<Index>>) -> Self {
Self {
index,
linear_regression: None,
Expand All @@ -50,8 +49,8 @@ where
}
}

pub async fn guard(&self) -> I::ReadGuard {
self.index.read_guard().await
/// Waits for and returns an owned read guard on the shared index.
///
/// The `Arc` is cloned first because `read_owned` consumes an `Arc` handle;
/// the returned guard therefore owns its handle to the lock instead of
/// borrowing from `self`.
pub async fn guard(&self) -> OwnedRwLockReadGuard<Index> {
self.index.clone().read_owned().await
}

pub fn set_linear_model(&mut self, model: LinearRegression) {
Expand All @@ -66,19 +65,19 @@ where
self.collector_config = config;
}

fn parse_query<G: ReadGuard>(
fn parse_query(
&self,
ctx: &Ctx,
guard: &G,
guard: &OwnedRwLockReadGuard<Index>,
query: &SearchQuery,
) -> Result<Query> {
Query::parse(ctx, query, guard.inverted_index())
}

fn ranker<G: ReadGuard>(
fn ranker(
&self,
query: &Query,
guard: &G,
guard: &OwnedRwLockReadGuard<Index>,
de_rank_similar: bool,
computer: SignalComputer,
) -> Result<LocalRanker> {
Expand All @@ -99,10 +98,10 @@ where
.with_offset(query.offset()))
}

fn search_inverted_index<G: ReadGuard>(
fn search_inverted_index(
&self,
ctx: &Ctx,
guard: &G,
guard: &OwnedRwLockReadGuard<Index>,
query: &SearchQuery,
de_rank_similar: bool,
) -> Result<InvertedIndexResult> {
Expand All @@ -112,8 +111,7 @@ where

computer.set_region_count(
guard
.search_index()
.region_count
.region_count()
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone(),
Expand Down Expand Up @@ -149,7 +147,7 @@ where
pub fn search_initial(
&self,
query: &SearchQuery,
guard: &I::ReadGuard,
guard: &OwnedRwLockReadGuard<Index>,
de_rank_similar: bool,
) -> Result<InitialWebsiteResult> {
let query = query.clone();
Expand All @@ -168,7 +166,7 @@ where
&self,
websites: &[inverted_index::WebpagePointer],
query: &str,
guard: &I::ReadGuard,
guard: &OwnedRwLockReadGuard<Index>,
) -> Result<Vec<inverted_index::RetrievedWebpage>> {
let ctx = guard.inverted_index().local_search_ctx();
let query = SearchQuery {
Expand All @@ -183,7 +181,7 @@ where
pub fn search_initial_generic<Q: GenericQuery>(
&self,
query: &Q,
guard: &I::ReadGuard,
guard: &OwnedRwLockReadGuard<Index>,
) -> Result<<Q::Collector as generic_query::Collector>::Fruit> {
guard.inverted_index().search_initial_generic(query)
}
Expand All @@ -192,15 +190,15 @@ where
&self,
query: &Q,
fruit: <Q::Collector as generic_query::Collector>::Fruit,
guard: &I::ReadGuard,
guard: &OwnedRwLockReadGuard<Index>,
) -> Result<Q::IntermediateOutput> {
guard.inverted_index().retrieve_generic(query, fruit)
}

pub fn search_generic<Q: GenericQuery>(
&self,
query: Q,
guard: &I::ReadGuard,
guard: &OwnedRwLockReadGuard<Index>,
) -> Result<Q::Output> {
let fruit = self.search_initial_generic(&query, guard)?;
Ok(Q::merge_results(vec![
Expand Down
46 changes: 12 additions & 34 deletions crates/core/src/searcher/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

mod guard;
use guard::ReadGuard;
//! The local searcher runs the search against a local index.
mod inner;
use inner::InnerLocalSearcher;
use tokio::sync::{OwnedRwLockReadGuard, RwLock};
use tokio::sync::RwLock;

use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use itertools::Itertools;
Expand All @@ -44,29 +42,12 @@ use crate::{inverted_index, Result};
use super::WebsitesResult;
use super::{InitialWebsiteResult, SearchQuery};

pub trait SearchableIndex: Send + Sync + 'static {
type ReadGuard: ReadGuard;

fn read_guard(&self) -> impl Future<Output = Self::ReadGuard>;
}

impl SearchableIndex for Arc<RwLock<Index>> {
type ReadGuard = OwnedRwLockReadGuard<Index>;

async fn read_guard(&self) -> Self::ReadGuard {
self.clone().read_owned().await
}
}

pub struct LocalSearcherBuilder<I: SearchableIndex> {
inner: InnerLocalSearcher<I>,
/// Builder for [`LocalSearcher`].
///
/// Owns the [`InnerLocalSearcher`] being configured until `build` is called.
pub struct LocalSearcherBuilder {
// The searcher state being configured.
inner: InnerLocalSearcher,
}

impl<I> LocalSearcherBuilder<I>
where
I: SearchableIndex,
{
pub fn new(index: I) -> Self {
impl LocalSearcherBuilder {
pub fn new(index: Arc<RwLock<Index>>) -> Self {
Self {
inner: InnerLocalSearcher::new(index),
}
Expand All @@ -87,22 +68,19 @@ where
self
}

pub fn build(self) -> LocalSearcher<I> {
/// Consumes the builder and returns a [`LocalSearcher`] whose inner state is
/// the configured searcher wrapped in an [`Arc`].
pub fn build(self) -> LocalSearcher {
LocalSearcher {
inner: Arc::new(self.inner),
}
}
}

pub struct LocalSearcher<I: SearchableIndex> {
inner: Arc<InnerLocalSearcher<I>>,
/// A searcher that runs the search against a local index.
///
/// Instances are created through [`LocalSearcherBuilder`].
pub struct LocalSearcher {
// Reference-counted handle to the configured searcher state.
inner: Arc<InnerLocalSearcher>,
}

impl<I> LocalSearcher<I>
where
I: SearchableIndex,
{
pub fn builder(index: I) -> LocalSearcherBuilder<I> {
impl LocalSearcher {
/// Creates a [`LocalSearcherBuilder`] for the given shared index.
///
/// Equivalent to calling [`LocalSearcherBuilder::new`] with the same index.
pub fn builder(index: Arc<RwLock<Index>>) -> LocalSearcherBuilder {
LocalSearcherBuilder::new(index)
}

Expand Down Expand Up @@ -203,7 +181,7 @@ where
})
}

/// This function is mainly used for tests and benchmarks
/// This function is only used for tests and benchmarks
pub fn search_sync(&self, query: &SearchQuery) -> Result<WebsitesResult> {
// Hands the async `search` future to the crate's `block_on` helper and
// returns whatever it produces.
crate::block_on(self.search(query))
}
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/searcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Searchers are responsible for executing search queries against an index.
//! There are two types of searchers:
//! - [`local::LocalSearcher`] which runs the search on the local machine.
//! - [`distributed::DistributedSearcher`] which runs the search on a remote cluster. Each node
//!   runs a local searcher, and the results are then merged on the coordinator node.
pub mod api;
pub mod distributed;
pub mod local;
Expand Down

0 comments on commit 7e30129

Please sign in to comment.