Skip to content

Commit

Permalink
progress messages when starting workers
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Nov 27, 2024
1 parent 9d507de commit 761eba7
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl RemoteWorker for RemoteApproxCentralityWorker {
}

pub fn run(config: ApproxHarmonicWorkerConfig) -> Result<()> {
tracing::info!("starting worker");
let tokio_conf = config.clone();

let graph = Webgraph::builder(config.graph_path, config.shard).open()?;
Expand All @@ -138,6 +139,7 @@ pub fn run(config: ApproxHarmonicWorkerConfig) -> Result<()> {
};
crate::start_gossip_cluster_thread(tokio_conf.gossip, Some(service));

tracing::info!("worker ready");
worker.run(config.host)?;

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,16 @@ pub fn run(config: HarmonicCoordinatorConfig) -> Result<()> {
.build()?
.block_on(setup_gossip(tokio_conf))?;

let jobs = cluster
let jobs: Vec<_> = cluster
.workers
.iter()
.map(|worker| CentralityJob {
shard: worker.shard(),
})
.collect();

tracing::info!("starting {} jobs", jobs.len());

let coordinator = build(&cluster.dht, cluster.workers.clone());
let res = coordinator.run(jobs, CentralityFinish)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct CentralityWorker {
changed_nodes: Arc<Mutex<U64BloomFilter>>,
round: AtomicU64,
has_updated_meta_for_round: AtomicBool,
num_nodes: u64,
}

impl CentralityWorker {
Expand All @@ -52,6 +53,7 @@ impl CentralityWorker {
changed_nodes.fill();

Self {
num_nodes,
shard,
graph,
changed_nodes: Arc::new(Mutex::new(changed_nodes)),
Expand Down Expand Up @@ -107,7 +109,7 @@ impl Message<CentralityWorker> for NumNodes {
type Response = u64;

fn handle(self, worker: &CentralityWorker) -> Self::Response {
worker.graph.host_nodes().len() as u64
worker.num_nodes
}
}

Expand Down Expand Up @@ -173,6 +175,7 @@ impl RemoteWorker for RemoteCentralityWorker {
}

pub fn run(config: HarmonicWorkerConfig) -> Result<()> {
tracing::info!("starting worker");
let tokio_conf = config.clone();

let graph = Webgraph::builder(config.graph_path, config.shard).open()?;
Expand All @@ -183,6 +186,7 @@ pub fn run(config: HarmonicWorkerConfig) -> Result<()> {
};
crate::start_gossip_cluster_thread(tokio_conf.gossip, Some(service));

tracing::info!("worker ready");
worker.run(config.host)?;

Ok(())
Expand Down

0 comments on commit 761eba7

Please sign in to comment.