From eb3f68d92661ab02199adb940e86cf0d153779b7 Mon Sep 17 00:00:00 2001
From: dafnapension
Date: Sat, 8 Feb 2025 17:57:23 +0200
Subject: [PATCH 1/5] try with lazy loader, although encountered loading
 problems also on vanilla (identical to main, just different printouts in
 compare bluebench results) from github, even after the bluebench profile ran
 OK on my dev env

Signed-off-by: dafnapension
---
 .../compare_benchmark_performance_results.py |   4 +-
 src/unitxt/loaders.py                        | 314 +++++++++++-------
 utils/.secrets.baseline                      |   4 +-
 3 files changed, 198 insertions(+), 124 deletions(-)

diff --git a/performance/compare_benchmark_performance_results.py b/performance/compare_benchmark_performance_results.py
index d2e33495e2..5765727d4e 100644
--- a/performance/compare_benchmark_performance_results.py
+++ b/performance/compare_benchmark_performance_results.py
@@ -70,8 +70,8 @@
         "\n**Warning**: Performance degradation in Dataset Generation and/or Evaluation exceeds 5%!"
     )
     print(
-        "Explore branch performance via 'python performance/bluebench_profiler.py --output_file=',"
-        "followed by 'snakeviz '."
+        "Explore branch performance via 'python performance/bluebench_profiler.py --output_file=``path to json file``',"
+        "followed by 'snakeviz ``the performance.prof file specified in the output json file``'."
    )
    sys.exit(1)

diff --git a/src/unitxt/loaders.py b/src/unitxt/loaders.py
index 4f570be75b..86740d225a 100644
--- a/src/unitxt/loaders.py
+++ b/src/unitxt/loaders.py
@@ -43,6 +43,7 @@
 from typing import (
     Any,
     Dict,
+    Generator,
     Iterable,
     List,
     Literal,
@@ -54,7 +55,13 @@
 
 import pandas as pd
 import requests
-from datasets import DatasetDict, DownloadConfig, IterableDatasetDict
+from datasets import (
+    Dataset,
+    DatasetDict,
+    IterableDataset,
+    IterableDatasetDict,
+    load_dataset_builder,
+)
 from datasets import load_dataset as hf_load_dataset
 from huggingface_hub import HfApi
 from tqdm import tqdm
@@ -62,6 +69,7 @@
 from .dataclass import OptionalField
 from .error_utils import UnitxtError
 from .fusion import FixedFusion
+from .generator_utils import ReusableGenerator
 from .logging_utils import get_logger
 from .operator import SourceOperator
 from .operators import Set
@@ -90,11 +98,13 @@ class Loader(SourceOperator):
         loader_limit: Optional integer to specify a limit on the number of records to load.
         streaming: Bool indicating if streaming should be used.
         num_proc: Optional integer to specify the number of processes to use for parallel dataset loading. Adjust the value according to the number of CPU cores available and the specific needs of your processing task.
+        splits: if not None, specifies the names of all the splits in the dataset. Some (or all) of these splits will actually be loaded, as determined by the application's needs.
""" loader_limit: int = None streaming: bool = False num_proc: int = None + splits: Optional[List[str]] = None # class level shared cache: _loader_cache = LRUCache(max_size=settings.loader_cache_size) @@ -166,6 +176,8 @@ def load_data(self) -> MultiStream: raise UnitxtError(f"Error in loader:\n{self}") from e self.__class__._loader_cache.max_size = settings.loader_cache_size self.__class__._loader_cache[str(self)] = iterables + if isoftype(iterables, Dict[str, ReusableGenerator]): + return MultiStream.from_generators(iterables, copying=True) return MultiStream.from_iterables(iterables, copying=True) def process(self) -> MultiStream: @@ -226,83 +238,113 @@ def verify(self): self._requirements_list.append(requirement) super().verify() - def filter_load(self, dataset: DatasetDict): + def log_filter_load(self): if not settings.allow_unverified_code: raise ValueError( f"{self.__class__.__name__} cannot run use filtering_lambda expression without setting unitxt.settings.allow_unverified_code=True or by setting environment variable: UNITXT_ALLOW_UNVERIFIED_CODE=True." ) logger.info(f"\nLoading filtered by: {self.filtering_lambda};") - return dataset.filter(eval(self.filtering_lambda)) def is_streaming(self) -> bool: if self.streaming is None: return settings.stream_hf_datasets_by_default return self.streaming - def stream_dataset(self): - with tempfile.TemporaryDirectory() as dir_to_be_deleted: - if settings.disable_hf_datasets_cache and not self.is_streaming(): - cache_dir = dir_to_be_deleted - else: - cache_dir = None + def limit_dataset( + self, + limit: int, + dataset: Union[Dataset, IterableDataset, DatasetDict, IterableDatasetDict], + ) -> Union[Dataset, IterableDataset, DatasetDict, IterableDatasetDict]: + if isinstance(dataset, (DatasetDict, IterableDatasetDict)): + for split_name in dataset: + try: + split_limit = min(limit, len(dataset[split_name])) + except: + split_limit = limit + dataset[split_name] = dataset[split_name].take(split_limit) + else: try: - dataset = hf_load_dataset( - self.path, - name=self.name, - data_dir=self.data_dir, - data_files=self.data_files, - revision=self.revision, - streaming=self.is_streaming(), - cache_dir=cache_dir, - split=self.split, - trust_remote_code=settings.allow_unverified_code, - num_proc=self.num_proc, - download_config=DownloadConfig( - max_retries=settings.loaders_max_retries - ), - ) - except ValueError as e: - if "trust_remote_code" in str(e): - raise ValueError( - f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment variable: UNITXT_ALLOW_UNVERIFIED_CODE." 
-                    ) from e
-            raise e
-
-        if self.split is not None:
-            dataset = {self.split: dataset}
-
-        if self.filtering_lambda is not None:
-            dataset = self.filter_load(dataset)
+                split_limit = min(limit, len(dataset))
+            except:
+                split_limit = limit
+            dataset = dataset.take(split_limit)
 
         return dataset
 
-    def load_dataset(self):
-        with tempfile.TemporaryDirectory() as dir_to_be_deleted:
-            if settings.disable_hf_datasets_cache:
-                cache_dir = dir_to_be_deleted
-            else:
-                cache_dir = None
-            try:
-                dataset = hf_load_dataset(
-                    self.path,
-                    name=self.name,
-                    data_dir=self.data_dir,
-                    data_files=self.data_files,
-                    streaming=False,
-                    keep_in_memory=True,
-                    cache_dir=cache_dir,
-                    split=self.split,
-                    trust_remote_code=settings.allow_unverified_code,
-                    num_proc=self.num_proc,
-                )
-            except ValueError as e:
-                if "trust_remote_code" in str(e):
-                    raise ValueError(
-                        f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment variable: UNITXT_ALLOW_UNVERIFIED_CODE."
-                    ) from e
 
-        if self.split is not None:
-            dataset = {self.split: dataset}
+    # returns a Dict when split names are not known in advance, and just the single split dataset - if known
+    # flake8: noqa: C901
+    def load_dataset(
+        self, split: str, streaming=None, disable_memory_caching=False
+    ) -> Union[IterableDatasetDict, IterableDataset, Dataset, DatasetDict]:
+        dataset = self.__class__._loader_cache.get(str(self) + "_" + str(split), None)
+        if dataset is None:
+            if streaming is None:
+                streaming = self.is_streaming()
+
+            # try to optimize when not too dangerous
+            if self.get_limit() <= 100:
+                streaming = True
+
+            with tempfile.TemporaryDirectory() as dir_to_be_deleted:
+                if settings.disable_hf_datasets_cache:
+                    cache_dir = dir_to_be_deleted
+                else:
+                    cache_dir = None
+                kwargs = {
+                    "path": self.path,
+                    "name": self.name,
+                    "data_dir": self.data_dir,
+                    "data_files": self.data_files,
+                    "revision": self.revision,
+                    "streaming": streaming,
+                    "keep_in_memory": True,
+                    "cache_dir": cache_dir,
+                    "verification_mode": "no_checks",
+                    "split": split,
+                    "trust_remote_code": settings.allow_unverified_code,
+                    "num_proc": self.num_proc,
+                }
+                try:
+                    # load the dataset and verify that it is useful
+                    dataset = hf_load_dataset(**kwargs)
+                    if isinstance(dataset, (Dataset, IterableDataset)):
+                        next(iter(dataset))
+                    else:
+                        for k in dataset.keys():
+                            next(iter(dataset[k]))
+                            break
 
+                except:
+                    try:
+                        current_streaming = kwargs["streaming"]
+                        logger.info(
+                            f"needed to swap streaming from {current_streaming} to {not current_streaming} for path {self.path}"
+                        )
+                        # try the opposite way of streaming
+                        kwargs["streaming"] = not kwargs["streaming"]
+                        dataset = hf_load_dataset(**kwargs)
+                        if isinstance(dataset, (Dataset, IterableDataset)):
+                            next(iter(dataset))
+                        else:
+                            for k in dataset.keys():
+                                next(iter(dataset[k]))
+                                break
+
+                    except ValueError as e:
+                        if "trust_remote_code" in str(e):
+                            raise ValueError(
+                                f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment variable: UNITXT_ALLOW_UNVERIFIED_CODE."
+ ) from e + + if self.filtering_lambda is not None: + dataset = dataset.filter(eval(self.filtering_lambda)) + + if self.get_limit(): + dataset = self.limit_dataset(self.get_limit(), dataset) + + if not disable_memory_caching: + self.__class__._loader_cache.max_size = settings.loader_cache_size + self.__class__._loader_cache[str(self) + "_" + str(split)] = dataset return dataset @@ -317,31 +359,54 @@ def _maybe_set_classification_policy(self): None, # No warning when loading from public hub ) - def load_iterables(self) -> IterableDatasetDict: + def get_splits(self) -> List[str]: + if self.splits is not None: + return self.splits + # do a minimal effort to get the split names. Less than downloading the whole dataset, which will + # be done anyhow, when splits are not known. try: - dataset = self.stream_dataset() - except ( - NotImplementedError - ): # streaming is not supported for zipped files so we load without streaming - dataset = self.load_dataset() - + ds_builder = load_dataset_builder( + self.path, self.name, trust_remote_code=settings.allow_unverified_code + ) + dataset_info = ds_builder.info + if dataset_info.splits is not None: + # split names are known before the split themselves are pulled from HF, + # and we can postpone that pulling of the splits until actually demanded + return list(dataset_info.splits.keys()) + return None + except: + return None + + def load_iterables( + self + ) -> Union[Dict[str, ReusableGenerator], IterableDatasetDict, DatasetDict]: + # log once for all splits, as they are limited the same + if self.get_limit() is not None: + self.log_limited_loading() if self.filtering_lambda is not None: - dataset = self.filter_load(dataset) + self.log_filter_load() - limit = self.get_limit() - if limit is not None: - self.log_limited_loading() - result = {} - for split_name in dataset: - try: - split_limit = min(limit, len(dataset[split_name])) - except: - split_limit = limit - result[split_name] = dataset[split_name].take(split_limit) + if self.split is not None: + splits = [self.split] + elif self.splits is not None: + splits = self.splits + else: + splits = self.get_splits() - return result + if splits is not None: + return { + split: ReusableGenerator( + self.split_generator, gen_kwargs={"split": split} + ) + for split in splits + } - return dataset + # split names are unknown, have to download all the splits of the dataset + return self.load_dataset(split=None) + + def split_generator(self, split: str) -> Generator: + dataset = self.load_dataset(split=split) + yield from dataset class LoadCSV(Loader): @@ -366,7 +431,6 @@ class LoadCSV(Loader): files: Dict[str, str] chunksize: int = 1000 - loader_limit: Optional[int] = None streaming: bool = True sep: str = "," compression: Optional[str] = None @@ -399,33 +463,25 @@ def get_args(self): return args def load_iterables(self): - for attempt in range(settings.loaders_max_retries): - try: - iterables = {} - import fsspec + # log once for the whole data + if self.get_limit() is not None: + self.log_limited_loading() + iterables = {} + for split_name in self.files.keys(): + iterables[split_name] = ReusableGenerator( + self.split_generator, gen_kwargs={"split": split_name} + ) + return iterables - for split_name, file_path in self.files.items(): - reader = self.get_reader() - if self.get_limit() is not None: - self.log_limited_loading() + def split_generator(self, split: str) -> Generator: + dataset = self.__class__._loader_cache.get(str(self) + "_" + split, None) + if dataset is None: + reader = self.get_reader() + 
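
The get_splits method above leans on the fact that a dataset builder can report split names from metadata alone, which is much cheaper than materializing the data. A hedged sketch of the same probe; "glue"/"cola" are used here only as a familiar example configuration:

from datasets import load_dataset_builder

# fetch only the builder and its info; no split is downloaded here
builder = load_dataset_builder("glue", "cola")
if builder.info.splits is not None:
    print(list(builder.info.splits.keys()))  # e.g. ['train', 'validation', 'test']
else:
    print("split names stay unknown until the dataset is actually loaded")

When the metadata does not carry split names, the loader falls back to downloading the whole dataset, exactly as the code above does.
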
dataset = reader(self.files[split], **self.get_args()).to_dict("records") + self.__class__._loader_cache.max_size = settings.loader_cache_size + self.__class__._loader_cache[str(self) + "_" + split] = dataset - try: - iterables[split_name] = reader( - file_path, **self.get_args() - ).to_dict("records") - except ValueError: - with fsspec.open(file_path, mode="rt") as f: - iterables[split_name] = reader( - f, **self.get_args() - ).to_dict("records") - return iterables - except Exception as e: - logger.debug(f"Attempt csv load {attempt + 1} failed: {e}") - if attempt < settings.loaders_max_retries - 1: - time.sleep(2) - else: - raise e - raise RuntimeError() + yield from dataset class LoadFromSklearn(Loader): @@ -435,7 +491,6 @@ class LoadFromSklearn(Loader): Args: dataset_name: The name of the sklearn dataset to fetch. - splits: A list of data splits to load, e.g., ['train', 'test']. Example: Loading form sklearn @@ -446,12 +501,17 @@ class LoadFromSklearn(Loader): """ dataset_name: str - splits: List[str] = ["train", "test"] _requirements_list: List[str] = ["scikit-learn", "pandas"] data_classification_policy = ["public"] + splits = [ + "test", + "train", + "all", + ] # from reading the code .../sklearn/datasets/_twenty_newsgroups.py + def verify(self): super().verify() @@ -465,14 +525,24 @@ def prepare(self): self.downloader = getattr(sklearn_datatasets, f"fetch_{self.dataset_name}") def load_iterables(self): - with TemporaryDirectory() as temp_directory: - for split in self.splits: - split_data = self.downloader(subset=split) - targets = [split_data["target_names"][t] for t in split_data["target"]] - df = pd.DataFrame([split_data["data"], targets]).T - df.columns = ["data", "target"] - df.to_csv(os.path.join(temp_directory, f"{split}.csv"), index=None) - return hf_load_dataset(temp_directory, streaming=False) + return { + split_name: ReusableGenerator( + self.split_generator, gen_kwargs={"split": split_name} + ) + for split_name in self.splits + } + + def split_generator(self, split: str) -> Generator: + dataset = self.__class__._loader_cache.get(str(self) + "_" + split, None) + if dataset is None: + split_data = self.downloader(subset=split) + targets = [split_data["target_names"][t] for t in split_data["target"]] + df = pd.DataFrame([split_data["data"], targets]).T + df.columns = ["data", "target"] + dataset = df.to_dict("records") + self.__class__._loader_cache.max_size = settings.loader_cache_size + self.__class__._loader_cache[str(self) + "_" + split] = dataset + yield from dataset class MissingKaggleCredentialsError(ValueError): @@ -853,7 +923,7 @@ class LoadFromHFSpace(LoadHF): token_env: Optional[str] = None requirements_list: List[str] = ["huggingface_hub"] - streaming: bool = True + streaming = True def _get_token(self) -> Optional[Union[bool, str]]: if self.token_env: @@ -900,6 +970,8 @@ def _download_data(self) -> str: if isinstance(self.data_files, str): data_files = [self.data_files] elif isinstance(self.data_files, Mapping): + if self.splits is None: + self.splits = sorted(self.data_files.keys()) data_files = list(self.data_files.values()) else: data_files = self.data_files @@ -983,6 +1055,9 @@ def _maybe_set_classification_policy(self): def load_data(self): self._map_wildcard_path_to_full_paths() self.path = self._download_data() + if self.splits is None and isinstance(self.data_files, dict): + self.splits = sorted(self.data_files.keys()) + return super().load_data() @@ -1016,7 +1091,6 @@ class LoadFromAPI(Loader): urls: Dict[str, str] chunksize: int = 100000 - 
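
The split_generator methods above share one pattern: a class-level LRU cache whose keys concatenate the loader's string form with the split name, so a repeated request for the same loader and split reuses already-pulled records. A simplified sketch of that keying, with TinyLRU as a stand-in for unitxt's LRUCache:

from collections import OrderedDict


class TinyLRU:
    def __init__(self, max_size: int = 2):
        self.max_size = max_size
        self._data = OrderedDict()

    def get(self, key, default=None):
        if key in self._data:
            self._data.move_to_end(key)  # mark as most recently used
            return self._data[key]
        return default

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry


cache = TinyLRU(max_size=2)
cache["LoadCSV(files=...)_train"] = [{"text": "a"}]
cache["LoadCSV(files=...)_test"] = [{"text": "b"}]
print(cache.get("LoadCSV(files=...)_train"))  # cache hit: [{'text': 'a'}]
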
loader_limit: Optional[int] = None
     streaming: bool = False
     api_key_env_var: str = "SQL_API_KEY"
     headers: Optional[Dict[str, Any]] = None
diff --git a/utils/.secrets.baseline b/utils/.secrets.baseline
index a8bb505111..cd84f16a87 100644
--- a/utils/.secrets.baseline
+++ b/utils/.secrets.baseline
@@ -151,7 +151,7 @@
         "filename": "src/unitxt/loaders.py",
         "hashed_secret": "840268f77a57d5553add023cfa8a4d1535f49742",
         "is_verified": false,
-        "line_number": 560,
+        "line_number": 629,
         "is_secret": false
       }
     ],
@@ -184,5 +184,5 @@
     }
   ]
 },
-  "generated_at": "2025-02-09T13:52:43Z"
+  "generated_at": "2025-02-08T16:10:49Z"
 }

From 40943ad47995027e0233e2cfe8b8b05fa196c4f8 Mon Sep 17 00:00:00 2001
From: dafnapension
Date: Sun, 9 Feb 2025 13:21:52 +0200
Subject: [PATCH 2/5] combined with Elron's loaders.py in branch lazy_loadHF
 and followed his fix of LoadCSV

Signed-off-by: dafnapension
---
 src/unitxt/loaders.py   | 83 +++++++++++++++++++++++++----------------
 utils/.secrets.baseline |  4 +-
 2 files changed, 52 insertions(+), 35 deletions(-)

diff --git a/src/unitxt/loaders.py b/src/unitxt/loaders.py
index 86740d225a..2eaee130cd 100644
--- a/src/unitxt/loaders.py
+++ b/src/unitxt/loaders.py
@@ -67,7 +67,7 @@
 from tqdm import tqdm
 
 from .dataclass import OptionalField
-from .error_utils import UnitxtError
+from .error_utils import UnitxtError, UnitxtWarning
 from .fusion import FixedFusion
 from .generator_utils import ReusableGenerator
 from .logging_utils import get_logger
@@ -227,7 +227,7 @@ class LoadHF(Loader):
         Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
     ] = None
     revision: Optional[str] = None
-    streaming: bool = None
+    streaming = None
     filtering_lambda: Optional[str] = None
     num_proc: Optional[int] = None
     requirements_list: List[str] = OptionalField(default_factory=list)
@@ -314,27 +314,25 @@ def load_dataset(
                         next(iter(dataset[k]))
                         break
 
-            except:
-                try:
-                    current_streaming = kwargs["streaming"]
-                    logger.info(
-                        f"needed to swap streaming from {current_streaming} to {not current_streaming} for path {self.path}"
-                    )
-                    # try the opposite way of streaming
-                    kwargs["streaming"] = not kwargs["streaming"]
-                    dataset = hf_load_dataset(**kwargs)
-                    if isinstance(dataset, (Dataset, IterableDataset)):
-                        next(iter(dataset))
-                    else:
-                        for k in dataset.keys():
-                            next(iter(dataset[k]))
-                            break
-
-                except ValueError as e:
-                    if "trust_remote_code" in str(e):
-                        raise ValueError(
-                            f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment variable: UNITXT_ALLOW_UNVERIFIED_CODE."
-                        ) from e
+            except Exception as e:
+                if isinstance(e, ValueError) and "trust_remote_code" in str(e):
+                    raise ValueError(
+                        f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment variable: UNITXT_ALLOW_UNVERIFIED_CODE."
+ ) from e + + current_streaming = kwargs["streaming"] + logger.info( + f"needed to swap streaming from {current_streaming} to {not current_streaming} for path {self.path}" + ) + # try the opposite way of streaming + kwargs["streaming"] = not kwargs["streaming"] + dataset = hf_load_dataset(**kwargs) + if isinstance(dataset, (Dataset, IterableDataset)): + next(iter(dataset)) + else: + for k in dataset.keys(): + next(iter(dataset[k])) + break if self.filtering_lambda is not None: dataset = dataset.filter(eval(self.filtering_lambda)) @@ -373,6 +371,9 @@ def get_splits(self) -> List[str]: # split names are known before the split themselves are pulled from HF, # and we can postpone that pulling of the splits until actually demanded return list(dataset_info.splits.keys()) + UnitxtWarning( + f'LoadHF(path="{self.path}", name="{self.name}") could not retrieve split names without loading the dataset. Consider defining "splits" in the LoadHF definition to improve loading time.' + ) return None except: return None @@ -431,7 +432,7 @@ class LoadCSV(Loader): files: Dict[str, str] chunksize: int = 1000 - streaming: bool = True + streaming = True sep: str = "," compression: Optional[str] = None lines: Optional[bool] = None @@ -463,7 +464,7 @@ def get_args(self): return args def load_iterables(self): - # log once for the whole data + # log once for all splits of the data and all attempts to read each of them if self.get_limit() is not None: self.log_limited_loading() iterables = {} @@ -476,11 +477,29 @@ def load_iterables(self): def split_generator(self, split: str) -> Generator: dataset = self.__class__._loader_cache.get(str(self) + "_" + split, None) if dataset is None: - reader = self.get_reader() - dataset = reader(self.files[split], **self.get_args()).to_dict("records") - self.__class__._loader_cache.max_size = settings.loader_cache_size - self.__class__._loader_cache[str(self) + "_" + split] = dataset + import fsspec + reader = self.get_reader() + for attempt in range(settings.loaders_max_retries): + try: + dataset = reader(self.files[split], **self.get_args()).to_dict( + "records" + ) + next(iter(dataset)) + break + except ValueError: + with fsspec.open(self.files[split], mode="rt") as f: + dataset = reader(f, **self.get_args()).to_dict("records") + next(iter(dataset)) + break + except Exception as e: + logger.debug(f"Attempt csv load {attempt + 1} failed: {e}") + if attempt < settings.loaders_max_retries - 1: + time.sleep(2) + else: + raise e + self.__class__._loader_cache.max_size = settings.loader_cache_size + self.__class__._loader_cache[str(self) + "_" + split] = dataset yield from dataset @@ -915,9 +934,9 @@ class LoadFromHFSpace(LoadHF): ) """ + path = None space_name: str data_files: Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]] - path: Optional[str] = None revision: Optional[str] = None use_token: Optional[bool] = None token_env: Optional[str] = None @@ -1055,8 +1074,6 @@ def _maybe_set_classification_policy(self): def load_data(self): self._map_wildcard_path_to_full_paths() self.path = self._download_data() - if self.splits is None and isinstance(self.data_files, dict): - self.splits = sorted(self.data_files.keys()) return super().load_data() @@ -1091,7 +1108,7 @@ class LoadFromAPI(Loader): urls: Dict[str, str] chunksize: int = 100000 - streaming: bool = False + streaming = False api_key_env_var: str = "SQL_API_KEY" headers: Optional[Dict[str, Any]] = None data_field: str = "data" diff --git a/utils/.secrets.baseline b/utils/.secrets.baseline index 
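
Reduced to its control flow, the reworked exception handling above amounts to: verify one record, and on any failure other than a rejected trust_remote_code, retry exactly once with the opposite streaming mode. A sketch with a stand-in loader in place of hf_load_dataset:

def load_with_streaming_fallback(hf_load, **kwargs):
    try:
        dataset = hf_load(**kwargs)
        next(iter(dataset))  # pull one record to prove the dataset is usable
        return dataset
    except Exception as e:
        if isinstance(e, ValueError) and "trust_remote_code" in str(e):
            raise  # retrying cannot help: the configuration itself was rejected
        kwargs["streaming"] = not kwargs["streaming"]  # swap modes and retry once
        dataset = hf_load(**kwargs)
        next(iter(dataset))
        return dataset


def fake_load(streaming=False):
    # stand-in for hf_load_dataset: pretend this path cannot be streamed
    if streaming:
        raise OSError("streaming unsupported for this path")
    return [{"text": "hello"}]


print(load_with_streaming_fallback(fake_load, streaming=True))  # falls back, succeeds
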
cd84f16a87..2ab92dd461 100644 --- a/utils/.secrets.baseline +++ b/utils/.secrets.baseline @@ -151,7 +151,7 @@ "filename": "src/unitxt/loaders.py", "hashed_secret": "840268f77a57d5553add023cfa8a4d1535f49742", "is_verified": false, - "line_number": 629, + "line_number": 649, "is_secret": false } ], @@ -184,5 +184,5 @@ } ] }, - "generated_at": "2025-02-08T16:10:49Z" + "generated_at": "2025-02-09T15:55:58Z" } From 97b4ff6def5a8fe5127d31fa74b4234299253038 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sun, 9 Feb 2025 18:39:32 +0200 Subject: [PATCH 3/5] change the returned error on a missing file: in the lazy mode, this is identified only when trying to pull, not on generation of the loader Signed-off-by: dafnapension --- tests/library/test_loaders.py | 3 +-- utils/.secrets.baseline | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/library/test_loaders.py b/tests/library/test_loaders.py index 5ec46aeffc..9054727bd7 100644 --- a/tests/library/test_loaders.py +++ b/tests/library/test_loaders.py @@ -4,7 +4,6 @@ from unittest.mock import patch import pandas as pd -from unitxt.error_utils import UnitxtError from unitxt.loaders import ( LoadCSV, LoadFromDictionary, @@ -79,7 +78,7 @@ def test_load_csv(self): self.assertEqual(saved_instance[1].to_dict(), loaded_instance) def test_failed_load_csv(self): - with self.assertRaises(UnitxtError): + with self.assertRaises(FileNotFoundError): list(LoadCSV(files={"test": "not_exist.csv"})()["test"]) def test_load_csv_with_pandas_args(self): diff --git a/utils/.secrets.baseline b/utils/.secrets.baseline index 2ab92dd461..3a96c2bc25 100644 --- a/utils/.secrets.baseline +++ b/utils/.secrets.baseline @@ -171,7 +171,7 @@ "filename": "tests/library/test_loaders.py", "hashed_secret": "8d814baafe5d8412572dc520dcab83f60ce1375c", "is_verified": false, - "line_number": 118, + "line_number": 117, "is_secret": false }, { @@ -179,10 +179,10 @@ "filename": "tests/library/test_loaders.py", "hashed_secret": "42a472ac88cd8d43a2c5ae0bd0bdf4626cdaba31", "is_verified": false, - "line_number": 128, + "line_number": 127, "is_secret": false } ] }, - "generated_at": "2025-02-09T15:55:58Z" + "generated_at": "2025-02-09T16:35:51Z" } From 2c843f436e152df950cbd3ed6492ee3c8650579a Mon Sep 17 00:00:00 2001 From: dafnapension Date: Sun, 9 Feb 2025 20:06:47 +0200 Subject: [PATCH 4/5] add time spent in split_generator -- to load time Signed-off-by: dafnapension --- performance/bluebench_profiler.py | 10 ++++++---- .../compare_benchmark_performance_results.py | 14 +++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/performance/bluebench_profiler.py b/performance/bluebench_profiler.py index 9a04cb3221..d1a4e72080 100644 --- a/performance/bluebench_profiler.py +++ b/performance/bluebench_profiler.py @@ -154,17 +154,20 @@ def main(): pst.strip_dirs() pst.sort_stats("name") # sort by function name pst.print_stats( - "profile_benchmark_blue_bench|profiler_instantiate_benchmark_recipe|profiler_generate_benchmark_dataset|profiler_instantiate_model|profiler_infer_predictions|profiler_evaluate_predictions|load_data|load_iterables" + "profile_benchmark_blue_bench|profiler_instantiate_benchmark_recipe|profiler_generate_benchmark_dataset|profiler_instantiate_model|profiler_infer_predictions|profiler_evaluate_predictions|load_data|load_iterables|split_generator" ) s = f.getvalue() assert s.split("\n")[7].split()[3] == "cumtime" overall_tot_time = find_cummtime_of( "profile_benchmark_blue_bench", "bluebench_profiler.py", s ) - load_time = 
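
The profiler's find_cummtime_of works by parsing the text that pstats prints for a name-filtered set of functions. A self-contained sketch of that capture-and-filter approach (the work function is a made-up example):

import cProfile
import io
import pstats


def work():
    return sum(i * i for i in range(100_000))


prof = cProfile.Profile()
prof.enable()
work()
prof.disable()

buf = io.StringIO()
stats = pstats.Stats(prof, stream=buf)
stats.strip_dirs()
stats.sort_stats("name")   # sort rows by function name, as the profiler does
stats.print_stats("work")  # keep only rows whose function name matches the pattern
print(buf.getvalue())      # the cumtime column can now be parsed from this text
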
find_cummtime_of("load_data", "loaders.py", s) - just_load_no_initial_ms_time = find_cummtime_of( + # load_time = find_cummtime_of("load_data", "loaders.py", s) + load_time = find_cummtime_of( "load_iterables", "loaders.py", s ) + load_time += find_cummtime_of( + "split_generator", "loaders.py", s + ) instantiate_benchmark_time = find_cummtime_of( "profiler_instantiate_benchmark_recipe", "bluebench_profiler.py", s ) @@ -186,7 +189,6 @@ def main(): "dataset_query": dataset_query, "total_time": overall_tot_time, "load_time": load_time, - "load_time_no_initial_ms": just_load_no_initial_ms_time, "instantiate_benchmark_time": instantiate_benchmark_time, "generate_benchmark_dataset_time": generate_benchmark_dataset_time, "instantiate_model_time": instantiate_model_time, diff --git a/performance/compare_benchmark_performance_results.py b/performance/compare_benchmark_performance_results.py index 5765727d4e..4890c0872e 100644 --- a/performance/compare_benchmark_performance_results.py +++ b/performance/compare_benchmark_performance_results.py @@ -26,14 +26,14 @@ print(f"use Mocked inference = {os.environ['UNITXT_MOCK_INFERENCE_MODE']}") ratio1 = ( - (pr_perf["generate_benchmark_dataset_time"] - pr_perf["load_time_no_initial_ms"]) + (pr_perf["generate_benchmark_dataset_time"] - pr_perf["load_time"]) / ( main_perf["generate_benchmark_dataset_time"] - - main_perf["load_time_no_initial_ms"] + - main_perf["load_time"] ) if ( main_perf["generate_benchmark_dataset_time"] - - main_perf["load_time_no_initial_ms"] + - main_perf["load_time"] ) > 0 else 1 @@ -49,13 +49,13 @@ line2 = "--------------------|-------------|-------------|---------------\n" line3 = f" Total time | {main_perf['total_time']:>11} | {pr_perf['total_time']:>11} | {pr_perf['total_time'] / main_perf['total_time']:.2f}\n" ratio_line4 = ( - pr_perf["load_time_no_initial_ms"] / main_perf["load_time_no_initial_ms"] - if main_perf["load_time_no_initial_ms"] > 0 + pr_perf["load_time"] / main_perf["load_time"] + if main_perf["load_time"] > 0 else 1 ) -line4 = f" Load time | {main_perf['load_time_no_initial_ms']:>11} | {pr_perf['load_time_no_initial_ms']:>11} | {ratio_line4:.2f}\n" +line4 = f" Load time | {main_perf['load_time']:>11} | {pr_perf['load_time']:>11} | {ratio_line4:.2f}\n" line5 = f" DS Gen. inc. Load | {main_perf['generate_benchmark_dataset_time']:>11} | {pr_perf['generate_benchmark_dataset_time']:>11} | {pr_perf['generate_benchmark_dataset_time'] / main_perf['generate_benchmark_dataset_time']:.2f}\n" -line6 = f" DS Gen. exc. Load | {round(main_perf['generate_benchmark_dataset_time'] - main_perf['load_time_no_initial_ms'], 3):>11} | {round(pr_perf['generate_benchmark_dataset_time'] - pr_perf['load_time_no_initial_ms'], 3):>11} | {ratio1:.2f}\n" +line6 = f" DS Gen. exc. Load | {round(main_perf['generate_benchmark_dataset_time'] - main_perf['load_time'], 3):>11} | {round(pr_perf['generate_benchmark_dataset_time'] - pr_perf['load_time'], 3):>11} | {ratio1:.2f}\n" line7 = f" Inference time | {main_perf['inference_time']:>11} | {pr_perf['inference_time']:>11} | {pr_perf['inference_time'] / main_perf['inference_time']:.2f}\n" line8 = f" Evaluate time | {main_perf['evaluation_time']:>11} | {pr_perf['evaluation_time']:>11} | {ratio2:.2f}\n" line9 = f" Benchmark Instant. 
| {main_perf['instantiate_benchmark_time']:>11} | {pr_perf['instantiate_benchmark_time']:>11} | {pr_perf['instantiate_benchmark_time'] / main_perf['instantiate_benchmark_time']:.2f}\n" From 309c6dc7771fa1696c5d6307a5301d16a29217c5 Mon Sep 17 00:00:00 2001 From: dafnapension Date: Mon, 10 Feb 2025 12:26:13 +0200 Subject: [PATCH 5/5] bluebench_profiler to also profile cards and recipes Signed-off-by: dafnapension --- performance/bluebench_profiler.py | 31 +++++++++++++++++-- .../compare_benchmark_performance_results.py | 8 +++-- src/unitxt/loaders.py | 2 +- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/performance/bluebench_profiler.py b/performance/bluebench_profiler.py index d1a4e72080..0563ed0163 100644 --- a/performance/bluebench_profiler.py +++ b/performance/bluebench_profiler.py @@ -8,7 +8,9 @@ from typing import Any, Dict, List, Union from unitxt.api import _source_to_dataset, evaluate, load_recipe +from unitxt.artifact import fetch_artifact from unitxt.benchmark import Benchmark +from unitxt.card import TaskCard from unitxt.inference import ( CrossProviderInferenceEngine, InferenceEngine, @@ -16,6 +18,8 @@ ) from unitxt.logging_utils import get_logger from unitxt.settings_utils import get_settings +from unitxt.standard import DatasetRecipe +from unitxt.templates import TemplatesDict, TemplatesList logger = get_logger() settings = get_settings() @@ -61,8 +65,29 @@ class BlueBenchProfiler: def profiler_instantiate_benchmark_recipe( self, dataset_query: str, **kwargs - ) -> Benchmark: - return load_recipe(dataset_query, **kwargs) + ) -> Union[Benchmark, DatasetRecipe]: + benchmark_or_card , _ = fetch_artifact(dataset_query) + if isinstance(benchmark_or_card, (Benchmark, DatasetRecipe)): + return load_recipe(dataset_query, **kwargs) + assert isinstance(benchmark_or_card, TaskCard) + # benchmark_or_card is a taskcard. 
determine a template for it: + if isinstance(benchmark_or_card.templates, list): + template = benchmark_or_card.templates[0] + elif isinstance(benchmark_or_card.templates, TemplatesList): + template = benchmark_or_card.templates.items[0] + elif isinstance(benchmark_or_card.templates, dict): + for templ in benchmark_or_card.templates.values(): + template = templ + break + elif isinstance(benchmark_or_card.templates, TemplatesDict): + for templ in benchmark_or_card.templates.items.values(): + template = templ + break + else: + raise ValueError( + f"Unidentified type of templates {benchmark_or_card.templates} in card {dataset_query}" + ) + return DatasetRecipe(card=benchmark_or_card, template=template) def profiler_generate_benchmark_dataset( self, benchmark_recipe: Benchmark, split: str, **kwargs @@ -104,6 +129,8 @@ def profiler_do_the_profiling(self, dataset_query: str, split: str, **kwargs): dataset_query = "benchmarks.bluebench[loader_limit=30,max_samples_per_subset=30]" +# dataset_query = "cards.cola" +# dataset_query = "recipes.bluebench.knowledge.mmlu_pro_math" def profile_benchmark_blue_bench(): diff --git a/performance/compare_benchmark_performance_results.py b/performance/compare_benchmark_performance_results.py index 4890c0872e..729df6adfc 100644 --- a/performance/compare_benchmark_performance_results.py +++ b/performance/compare_benchmark_performance_results.py @@ -25,6 +25,8 @@ print(f"used_eager_mode in PR = {pr_perf['used_eager_mode']}") print(f"use Mocked inference = {os.environ['UNITXT_MOCK_INFERENCE_MODE']}") +ratio0 = pr_perf["total_time"] / main_perf["total_time"] + ratio1 = ( (pr_perf["generate_benchmark_dataset_time"] - pr_perf["load_time"]) / ( @@ -47,7 +49,7 @@ line1 = " What is Measured | Main Branch | PR Branch | PR/Main ratio \n" line2 = "--------------------|-------------|-------------|---------------\n" -line3 = f" Total time | {main_perf['total_time']:>11} | {pr_perf['total_time']:>11} | {pr_perf['total_time'] / main_perf['total_time']:.2f}\n" +line3 = f" Total time | {main_perf['total_time']:>11} | {pr_perf['total_time']:>11} | {ratio0:.2f}\n" ratio_line4 = ( pr_perf["load_time"] / main_perf["load_time"] if main_perf["load_time"] > 0 @@ -58,14 +60,14 @@ line6 = f" DS Gen. exc. Load | {round(main_perf['generate_benchmark_dataset_time'] - main_perf['load_time'], 3):>11} | {round(pr_perf['generate_benchmark_dataset_time'] - pr_perf['load_time'], 3):>11} | {ratio1:.2f}\n" line7 = f" Inference time | {main_perf['inference_time']:>11} | {pr_perf['inference_time']:>11} | {pr_perf['inference_time'] / main_perf['inference_time']:.2f}\n" line8 = f" Evaluate time | {main_perf['evaluation_time']:>11} | {pr_perf['evaluation_time']:>11} | {ratio2:.2f}\n" -line9 = f" Benchmark Instant. | {main_perf['instantiate_benchmark_time']:>11} | {pr_perf['instantiate_benchmark_time']:>11} | {pr_perf['instantiate_benchmark_time'] / main_perf['instantiate_benchmark_time']:.2f}\n" +line9 = f" BM/Recipe Instant. 
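
The four branches above exist because a card's templates may arrive as a plain list or dict, or wrapped in TemplatesList/TemplatesDict, which keep their content under an .items attribute. Stripped of the wrapper handling, the selection logic reduces to "take any one template", as in this sketch over plain containers (the template names are invented):

def first_template(templates):
    if isinstance(templates, list):
        return templates[0]
    if isinstance(templates, dict):
        return next(iter(templates.values()))  # any one value will do
    raise ValueError(f"Unidentified type of templates: {templates!r}")


assert first_template(["templates.classification.default"]) == "templates.classification.default"
assert first_template({"default": "t1", "alt": "t2"}) == "t1"
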
| {main_perf['instantiate_benchmark_time']:>11} | {pr_perf['instantiate_benchmark_time']:>11} | {pr_perf['instantiate_benchmark_time'] / main_perf['instantiate_benchmark_time']:.2f}\n" line10 = f" Model Instantiation| {main_perf['instantiate_model_time']:>11} | {pr_perf['instantiate_model_time']:>11} | {pr_perf['instantiate_model_time'] / main_perf['instantiate_model_time']:.2f}\n" print("### Performance Comparison Results, time expressed in seconds:\n") print(line1 + line2 + line3 + line4 + line5 + line6 + line7 + line8 + line9 + line10) print("\n\n") # Performance degradation check (5% threshold) -if ratio1 > 1.05 or ratio2 > 1.05: +if (ratio0 > 1.05) and (ratio1 > 1.05 or ratio2 > 1.05): print( "\n**Warning**: Performance degradation in Dataset Generation and/or Evaluation exceeds 5%!" ) diff --git a/src/unitxt/loaders.py b/src/unitxt/loaders.py index 2eaee130cd..47a449dcb4 100644 --- a/src/unitxt/loaders.py +++ b/src/unitxt/loaders.py @@ -282,7 +282,7 @@ def load_dataset( streaming = self.is_streaming() # try to optimize when not too dangerous - if self.get_limit() <= 100: + if self.get_limit() is not None and self.get_limit() <= 100: streaming = True with tempfile.TemporaryDirectory() as dir_to_be_deleted:
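
The final loaders.py hunk is a genuine bug fix: get_limit() may return None, and in Python 3 comparing None with an int raises instead of coercing. A minimal demonstration of why the is-not-None guard must come first:

limit = None
try:
    if limit <= 100:
        pass
except TypeError as e:
    print(f"unguarded comparison fails: {e}")

if limit is not None and limit <= 100:  # the patched, short-circuiting form
    print("small limit: prefer streaming")
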