Commit 179bc52: Static
qarmin committed Feb 24, 2025
1 parent fd797d2 commit 179bc52
Showing 10 changed files with 142 additions and 46 deletions.
27 changes: 14 additions & 13 deletions Cargo.lock


1 change: 1 addition & 0 deletions czkawka_core/Cargo.toml
@@ -86,6 +86,7 @@ log = "0.4.22"
handsome_logger = "0.8"
fun_time = { version = "0.3", features = ["log"] }
itertools = "0.14"
static_assertions = "1.1.0"

# Don't update anymore! This crate has a bug. I've submitted a patch upstream, but the change is breaking. The current code relies on the bug to work correctly!
# Warning by CalunVier 2024.7.15
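
The new static_assertions dependency backs a compile-time size check added to duplicate.rs below: a const_assert! ties the prehash read limit to the per-thread buffer size, so the slice taken in hash_calculation_limit never needs a runtime bounds check. A minimal standalone sketch of the pattern, reusing the constants from this commit:

use static_assertions::const_assert;

pub const PREHASHING_BUFFER_SIZE: u64 = 1024 * 32;
pub const THREAD_BUFFER_SIZE: usize = 2 * 1024 * 1024;

// Compilation fails if the prehash limit ever outgrows the per-thread buffer.
const_assert!(PREHASHING_BUFFER_SIZE <= THREAD_BUFFER_SIZE as u64);
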
5 changes: 3 additions & 2 deletions czkawka_core/benches/hash_calculation_benchmark.rs
@@ -2,6 +2,7 @@ use std::env::temp_dir;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;

use criterion::{Criterion, black_box, criterion_group, criterion_main};
use czkawka_core::duplicate::{DuplicateEntry, HashType, hash_calculation};
@@ -31,7 +32,7 @@ fn benchmark_hash_calculation_vec<const FILE_SIZE: u64, const BUFFER_SIZE: usize
c.bench_function(&function_name, |b| {
b.iter(|| {
let mut buffer = vec![0u8; BUFFER_SIZE];
hash_calculation(black_box(&mut buffer), black_box(&file_entry), black_box(HashType::Blake3), black_box(u64::MAX)).expect("Failed to calculate hash");
hash_calculation(black_box(&mut buffer), black_box(&file_entry), black_box(HashType::Blake3), &Arc::default(), None).expect("Failed to calculate hash");
});
});
}
@@ -43,7 +44,7 @@ fn benchmark_hash_calculation_arr<const FILE_SIZE: u64, const BUFFER_SIZE: usize
c.bench_function(&function_name, |b| {
b.iter(|| {
let mut buffer = [0u8; BUFFER_SIZE];
hash_calculation(black_box(&mut buffer), black_box(&file_entry), black_box(HashType::Blake3), black_box(u64::MAX)).expect("Failed to calculate hash");
hash_calculation(black_box(&mut buffer), black_box(&file_entry), black_box(HashType::Blake3), &Arc::default(), None).expect("Failed to calculate hash");
});
});
}
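
The benchmark now targets the new hash_calculation signature: the former byte-limit argument is gone (prehashing uses the separate hash_calculation_limit instead), and callers pass a shared AtomicU64 byte counter plus an optional stop receiver, getting back Result<Option<String>, String>, where Ok(None) means the scan was interrupted. A rough usage sketch, not part of this commit, assuming DuplicateEntry keeps its public fields and Default impl:

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use czkawka_core::duplicate::{DuplicateEntry, HashType, hash_calculation};

fn main() {
    let file_entry = DuplicateEntry { path: PathBuf::from("Cargo.toml"), ..Default::default() };
    let size_counter = Arc::new(AtomicU64::new(0));
    let mut buffer = vec![0u8; 16 * 1024];
    // With no stop receiver the call cannot be interrupted, so Ok(None) cannot occur here.
    match hash_calculation(&mut buffer, &file_entry, HashType::Blake3, &size_counter, None) {
        Ok(Some(hash)) => println!("hash: {hash}, bytes read: {}", size_counter.load(Ordering::Relaxed)),
        Ok(None) => println!("stopped before the file was fully read"),
        Err(err) => eprintln!("{err}"),
    }
}
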
112 changes: 87 additions & 25 deletions czkawka_core/src/duplicate.rs
@@ -8,7 +8,8 @@ use std::io::{self, Error, ErrorKind};
#[cfg(target_family = "unix")]
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{fs, mem};

use crossbeam_channel::{Receiver, Sender};
@@ -17,6 +18,7 @@ use humansize::{BINARY, format_size};
use log::debug;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use xxhash_rust::xxh3::Xxh3;

use crate::common::{WorkContinueStatus, check_if_stop_received, delete_files_custom, prepare_thread_handler_common, send_info_and_wait_for_ending_all_threads};
@@ -28,8 +30,11 @@ use crate::progress_data::{CurrentStage, ProgressData};

const TEMP_HARDLINK_FILE: &str = "rzeczek.rxrxrxl";

pub const PREHASHING_BUFFER_SIZE: u64 = 1024 * 32;
pub const THREAD_BUFFER_SIZE: usize = 2 * 1024 * 1024;

thread_local! {
static THREAD_BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0u8; 1024 * 2024]);
static THREAD_BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0u8; THREAD_BUFFER_SIZE]);
}

#[derive(PartialEq, Eq, Clone, Debug, Copy, Default)]
@@ -565,12 +570,15 @@ impl DuplicateFinder {
if check_if_stop_received(stop_receiver) {
return WorkContinueStatus::Stop;
}
let (progress_thread_handle, progress_thread_run, items_counter, check_was_stopped, _size_counter) = prepare_thread_handler_common(
let (progress_thread_handle, progress_thread_run, items_counter, check_was_stopped, size_counter) = prepare_thread_handler_common(
progress_sender,
CurrentStage::DuplicatePreHashing,
non_cached_files_to_check.values().map(Vec::len).sum(),
self.get_test_type(),
0,
non_cached_files_to_check
.iter()
.map(|(size, items)| items.len() as u64 * PREHASHING_BUFFER_SIZE.min(*size))
.sum::<u64>(),
);
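// Worked example with hypothetical numbers (not from this commit): a group of three
// 10-byte files adds 3 * min(32 KiB, 10 B) = 30 bytes to the prehash byte budget,
// while a group of two 1 MiB files adds 2 * 32 KiB, because prehashing reads at most
// PREHASHING_BUFFER_SIZE bytes from each file.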

debug!("Starting calculating prehash");
@@ -588,20 +596,25 @@
}
THREAD_BUFFER.with_borrow_mut(|buffer| {
for mut file_entry in vec_file_entry {
match hash_calculation(buffer, &file_entry, check_type, 1024 * 32) {
match hash_calculation_limit(buffer, &file_entry, check_type, PREHASHING_BUFFER_SIZE, &size_counter) {
Ok(hash_string) => {
file_entry.hash = hash_string.clone();
hashmap_with_hash.entry(hash_string).or_default().push(file_entry);
}
Err(s) => errors.push(s),
}
if check_if_stop_received(stop_receiver) {
check_was_stopped.store(true, Ordering::Relaxed);
return;
}
}
});

Some((size, hashmap_with_hash, errors))
})
.while_some()
.collect();

debug!("Completed calculating prehash");

send_info_and_wait_for_ending_all_threads(&progress_thread_run, progress_thread_handle);
@@ -780,19 +793,20 @@ impl DuplicateFinder {
return WorkContinueStatus::Stop;
}

let (progress_thread_handle, progress_thread_run, items_counter, check_was_stopped, _size_counter) = prepare_thread_handler_common(
let (progress_thread_handle, progress_thread_run, items_counter, check_was_stopped, size_counter) = prepare_thread_handler_common(
progress_sender,
CurrentStage::DuplicateFullHashing,
non_cached_files_to_check.values().map(Vec::len).sum(),
self.get_test_type(),
0,
non_cached_files_to_check.iter().map(|(size, items)| (*size) * items.len() as u64).sum::<u64>(),
);
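// Unlike prehashing, the full-hash byte budget counts whole files: hypothetically,
// two 1 MiB files in the same size group add 2 MiB here, and the shared size_counter
// is advanced by every read() chunk inside hash_calculation, so progress is byte-accurate.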

let check_type = self.get_params().hash_type;
debug!("Starting full hashing of {} files", non_cached_files_to_check.values().map(Vec::len).sum::<usize>());
let mut full_hash_results: Vec<(u64, BTreeMap<String, Vec<DuplicateEntry>>, Vec<String>)> = non_cached_files_to_check
.into_par_iter()
.map(|(size, vec_file_entry)| {
let size_counter = size_counter.clone();
let mut hashmap_with_hash: BTreeMap<String, Vec<DuplicateEntry>> = Default::default();
let mut errors: Vec<String> = Vec::new();
let mut exam_stopped = false;
@@ -806,10 +820,15 @@
break;
}

match hash_calculation(buffer, &file_entry, check_type, u64::MAX) {
match hash_calculation(buffer, &file_entry, check_type, &size_counter, stop_receiver) {
Ok(hash_string) => {
file_entry.hash = hash_string.clone();
hashmap_with_hash.entry(hash_string.clone()).or_default().push(file_entry);
if let Some(hash_string) = hash_string {
file_entry.hash = hash_string.clone();
hashmap_with_hash.entry(hash_string).or_default().push(file_entry);
} else {
exam_stopped = true;
break;
}
}
Err(s) => errors.push(s),
}
@@ -1302,28 +1321,59 @@ pub trait MyHasher {
fn finalize(&self) -> String;
}

pub fn hash_calculation(buffer: &mut [u8], file_entry: &DuplicateEntry, hash_type: HashType, limit: u64) -> Result<String, String> {
pub fn hash_calculation_limit(buffer: &mut [u8], file_entry: &DuplicateEntry, hash_type: HashType, limit: u64, size_counter: &Arc<AtomicU64>) -> Result<String, String> {
// This function is used only to calculate the hash of at most `limit` bytes of a file.
// The buffer must be large enough to hold `limit` bytes; the assertion below verifies
// that once at compile time, so it does not have to be checked on every call.
const_assert!(PREHASHING_BUFFER_SIZE <= THREAD_BUFFER_SIZE as u64);

let mut file_handler = match File::open(&file_entry.path) {
Ok(t) => t,
Err(e) => return Err(format!("Unable to check hash of file {:?}, reason {e}", file_entry.path)),
Err(e) => {
size_counter.fetch_add(limit, Ordering::Relaxed);
return Err(format!("Unable to check hash of file {:?}, reason {e}", file_entry.path));
}
};
let hasher = &mut *hash_type.hasher();
let n = match file_handler.read(&mut buffer[..limit as usize]) {
Ok(t) => t,
Err(e) => return Err(format!("Error happened when checking hash of file {:?}, reason {}", file_entry.path, e)),
};

hasher.update(&buffer[..n]);
size_counter.fetch_add(n as u64, Ordering::Relaxed);
Ok(hasher.finalize())
}

pub fn hash_calculation(
buffer: &mut [u8],
file_entry: &DuplicateEntry,
hash_type: HashType,
size_counter: &Arc<AtomicU64>,
stop_receiver: Option<&Receiver<()>>,
) -> Result<Option<String>, String> {
let mut file_handler = match File::open(&file_entry.path) {
Ok(t) => t,
Err(e) => {
size_counter.fetch_add(file_entry.size, Ordering::Relaxed);
return Err(format!("Unable to check hash of file {:?}, reason {e}", file_entry.path));
}
};
let hasher = &mut *hash_type.hasher();
let mut current_file_read_bytes: u64 = 0;
loop {
let n = match file_handler.read(buffer) {
Ok(0) => break,
Ok(t) => t,
Err(e) => return Err(format!("Error happened when checking hash of file {:?}, reason {}", file_entry.path, e)),
};

current_file_read_bytes += n as u64;
hasher.update(&buffer[..n]);

if current_file_read_bytes >= limit {
break;
size_counter.fetch_add(n as u64, Ordering::Relaxed);
if check_if_stop_received(stop_receiver) {
return Ok(None);
}
}
Ok(hasher.finalize())
Ok(Some(hasher.finalize()))
}
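// Illustration only (not part of this commit): with a stop channel wired in, a signal
// makes hash_calculation bail out at the next buffer boundary and report Ok(None)
// instead of a finished hash. Sketch, assuming a non-empty file and the same
// crossbeam_channel crate already used by this module:
//
// let (stop_tx, stop_rx) = crossbeam_channel::unbounded::<()>();
// stop_tx.send(()).expect("receiver still alive");
// let interrupted = hash_calculation(&mut buffer, &file_entry, HashType::Blake3, &size_counter, Some(&stop_rx));
// assert!(matches!(interrupted, Ok(None)));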

impl MyHasher for blake3::Hasher {
@@ -1464,26 +1514,38 @@ mod tests {
let mut buf = [0u8; 1 << 10];
let src = dir.path().join("a");
let mut file = File::create(&src)?;
file.write_all(b"aa")?;
file.write_all(b"aaAAAAAAAAAAAAAAFFFFFFFFFFFFFFFFFFFFGGGGGGGGG")?;
let e = DuplicateEntry { path: src, ..Default::default() };
let r = hash_calculation(&mut buf, &e, HashType::Blake3, 0).expect("hash_calculation failed");
let size_counter = Arc::new(AtomicU64::new(0));
let r = hash_calculation(&mut buf, &e, HashType::Blake3, &size_counter, None)
.expect("hash_calculation failed")
.expect("hash_calculation returned None");
assert!(!r.is_empty());
assert_eq!(size_counter.load(Ordering::Relaxed), 45);
Ok(())
}

#[test]
fn test_hash_calculation_limit() -> io::Result<()> {
let dir = tempfile::Builder::new().tempdir()?;
let mut buf = [0u8; 1];
let mut buf = [0u8; 1000];
let src = dir.path().join("a");
let mut file = File::create(&src)?;
file.write_all(b"aa")?;
let e = DuplicateEntry { path: src, ..Default::default() };
let r1 = hash_calculation(&mut buf, &e, HashType::Blake3, 1).expect("hash_calculation failed");
let r2 = hash_calculation(&mut buf, &e, HashType::Blake3, 2).expect("hash_calculation failed");
let r3 = hash_calculation(&mut buf, &e, HashType::Blake3, u64::MAX).expect("hash_calculation failed");
let size_counter_1 = Arc::new(AtomicU64::new(0));
let size_counter_2 = Arc::new(AtomicU64::new(0));
let size_counter_3 = Arc::new(AtomicU64::new(0));
let r1 = hash_calculation_limit(&mut buf, &e, HashType::Blake3, 1, &size_counter_1).expect("hash_calculation failed");
let r2 = hash_calculation_limit(&mut buf, &e, HashType::Blake3, 2, &size_counter_2).expect("hash_calculation failed");
let r3 = hash_calculation_limit(&mut buf, &e, HashType::Blake3, 1000, &size_counter_3).expect("hash_calculation failed");
assert_ne!(r1, r2);
assert_eq!(r2, r3);

assert_eq!(1, size_counter_1.load(Ordering::Relaxed));
assert_eq!(2, size_counter_2.load(Ordering::Relaxed));
assert_eq!(2, size_counter_3.load(Ordering::Relaxed));

Ok(())
}

@@ -1493,7 +1555,7 @@
let mut buf = [0u8; 1 << 10];
let src = dir.path().join("a");
let e = DuplicateEntry { path: src, ..Default::default() };
let r = hash_calculation(&mut buf, &e, HashType::Blake3, 0).expect_err("hash_calculation succeeded");
let r = hash_calculation(&mut buf, &e, HashType::Blake3, &Arc::default(), None).expect_err("hash_calculation succeeded");
assert!(!r.is_empty());
Ok(())
}
1 change: 1 addition & 0 deletions czkawka_core/src/lib.rs
@@ -6,6 +6,7 @@

#[macro_use]
extern crate bitflags;
extern crate core;

pub mod big_file;
pub mod broken_files;
10 changes: 10 additions & 0 deletions czkawka_core/src/progress_data.rs
@@ -1,3 +1,5 @@
use log::error;

use crate::common_dir_traversal::{CheckingMethod, ToolType};
// Empty files
// 0 - Collecting files
@@ -123,6 +125,14 @@ impl ProgressData {
self.sstage
);
}

// This could be an assert, but in the duplicate finder a file may grow between being
// collected for scanning and actually being hashed, so bytes_checked can legitimately
// exceed bytes_to_check. It is better to just log it.
if self.bytes_checked > self.bytes_to_check {
error!("Bytes checked: {}, bytes to check: {}, stage {:?}", self.bytes_checked, self.bytes_to_check, self.sstage);
}

let tool_type_checking_method: Option<ToolType> = match self.checking_method {
CheckingMethod::AudioTags | CheckingMethod::AudioContent => Some(ToolType::SameMusic),
CheckingMethod::Name | CheckingMethod::SizeName | CheckingMethod::Size | CheckingMethod::Hash => Some(ToolType::Duplicate),
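
The bytes_checked / bytes_to_check pair reported above gives byte-based progress for the hashing stages, which stays smooth even when file sizes within a group vary wildly. A minimal sketch of how a consumer might turn the two counters into a percentage (hypothetical helper, not part of this commit):

fn bytes_progress_percent(bytes_checked: u64, bytes_to_check: u64) -> f64 {
    if bytes_to_check == 0 {
        return 0.0;
    }
    // Clamp to 100% because a file may grow between collection and hashing,
    // the same situation the error log above tolerates.
    ((bytes_checked as f64 / bytes_to_check as f64) * 100.0).min(100.0)
}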