diff --git a/libs/text-splitters/langchain_text_splitters/html.py b/libs/text-splitters/langchain_text_splitters/html.py
index 6a235e085bdea..08d3358db3c76 100644
--- a/libs/text-splitters/langchain_text_splitters/html.py
+++ b/libs/text-splitters/langchain_text_splitters/html.py
@@ -124,8 +124,9 @@ def __init__(
             return_each_element: Whether to return each HTML element as a separate
                 Document. Defaults to False.
         """
+        # Sort headers by their numeric level so that h1 < h2 < h3...
         self.headers_to_split_on = sorted(
-            headers_to_split_on, key=lambda x: int(x[0][1])
+            headers_to_split_on, key=lambda x: int(x[0][1:])
        )
         self.header_mapping = dict(self.headers_to_split_on)
         self.header_tags = [tag for tag, _ in self.headers_to_split_on]
@@ -163,49 +164,6 @@ def split_text_from_url(
         response.raise_for_status()
         return self.split_text(response.text)
 
-    def _header_level(self, tag_name: str) -> int:
-        """Determine the heading level of a tag."""
-        if tag_name.lower() in ["h1", "h2", "h3", "h4", "h5", "h6"]:
-            return int(tag_name[1])
-        # Returns high level if it isn't a header
-        return 9999
-
-    def _dom_depth(self, element: Any) -> int:
-        """Determine the DOM depth of an element by counting its parents."""
-        depth = 0
-        for _ in element.parents:
-            depth += 1
-        return depth
-
-    def _get_elements(self, html_content: str) -> List[Any]:
-        """Parse HTML content and return a list of BeautifulSoup elements.
-
-        This helper function takes HTML content as input,
-        parses it using BeautifulSoup4, and returns all HTML elements
-        found in the document body. If no body tag exists,
-        it returns all elements in the full document.
-
-        Args:
-            html_content: Raw HTML content to be parsed.
-
-        Returns:
-            List[Any]: A list of BeautifulSoup elements found in the HTML document.
-
-        Raises:
-            ImportError: If the BeautifulSoup4 package is not installed.
-        """
-        try:
-            from bs4 import BeautifulSoup  # type: ignore[import-untyped]
-        except ImportError as e:
-            raise ImportError(
-                "Unable to import BeautifulSoup/PageElement, \
-                    please install with `pip install \
-                    bs4`."
-            ) from e
-        soup = BeautifulSoup(html_content, "html.parser")
-        body = soup.body if soup.body else soup
-        return body.find_all()
-
     def split_text_from_file(self, file: Any) -> List[Document]:
         """Split HTML content from a file into a list of Document objects.
@@ -220,105 +178,120 @@ def split_text_from_file(self, file: Any) -> List[Document]:
                 html_content = f.read()
         else:
             html_content = file.read()
-        elements = self._get_elements(html_content)
-        documents: List[Document] = []
+        return list(self._generate_documents(html_content))
+
+    def _generate_documents(self, html_content: str) -> Any:
+        """Walk the DOM depth-first and yield Document objects on-the-fly.
+
+        This generator maintains the same splitting logic (headers vs.
+        non-headers, chunking, etc.) while walking the DOM explicitly
+        in code.
+
+        Args:
+            html_content: The raw HTML content.
+
+        Yields:
+            Document objects as they are created.
+        """
+        try:
+            from bs4 import BeautifulSoup
+        except ImportError as e:
+            raise ImportError(
+                "Unable to import BeautifulSoup. Please install via `pip install bs4`."
+            ) from e
+
+        soup = BeautifulSoup(html_content, "html.parser")
+        body = soup.body if soup.body else soup
+
+        # Dictionary of active headers:
+        #   key = user-defined header name (e.g. "Header 1")
+        #   value = (header_text, level, dom_depth)
         active_headers: Dict[str, Tuple[str, int, int]] = {}
         current_chunk: List[str] = []
-        chunk_dom_depth = 0
-
-        def finalize_chunk() -> None:
-            if current_chunk:
-                final_meta = {
-                    key: content
-                    for key, (content, level, dom_depth) in active_headers.items()
-                    if chunk_dom_depth >= dom_depth
-                }
-                combined_text = " \n".join(
-                    line for line in current_chunk if line.strip()
-                )
-                if combined_text.strip():
-                    documents.append(
-                        Document(page_content=combined_text, metadata=final_meta)
-                    )
-                current_chunk.clear()
 
-        for element in elements:
-            tag = element.name
+        def finalize_chunk() -> Optional[Document]:
+            """Finalize the accumulated chunk into a single Document."""
+            if not current_chunk:
+                return None
+
+            final_text = " \n".join(line for line in current_chunk if line.strip())
+            current_chunk.clear()
+            if not final_text.strip():
+                return None
+
+            final_meta = {k: v[0] for k, v in active_headers.items()}
+            return Document(page_content=final_text, metadata=final_meta)
+
+        # We'll use a stack for DFS traversal
+        stack = [body]
+        while stack:
+            node = stack.pop()
+            children = list(node.children)
+            for child in reversed(children):
+                if getattr(child, "name", None):
+                    stack.append(child)
+
+            tag = getattr(node, "name", None)
             if not tag:
                 continue
-            text = " ".join(
-                t
-                for t in element.find_all(string=True, recursive=False)
-                if isinstance(t, str)
+
+            node_text = " ".join(
+                t for t in node.find_all(string=True, recursive=False) if t.strip()
             ).strip()
-            if not text:
+            if not node_text:
                 continue
 
-            level = self._header_level(tag)
-            dom_depth = self._dom_depth(element)
+            dom_depth = len(list(node.parents))
+
+            # If this node is one of our headers
             if tag in self.header_tags:
+                # If we're aggregating, finalize whatever chunk we had
                 if not self.return_each_element:
-                    finalize_chunk()
+                    doc = finalize_chunk()
+                    if doc:
+                        yield doc
+
+                # Determine numeric level (h1->1, h2->2, etc.)
+                try:
+                    level = int(tag[1:])
+                except ValueError:
+                    level = 9999
 
-                # Remove headers at same or deeper level
+                # Remove any active headers that are at or deeper than this new level
                 headers_to_remove = [
-                    key for key, (_, lvl, _) in active_headers.items() if lvl >= level
+                    k for k, (_, lvl, d) in active_headers.items() if lvl >= level
                 ]
                 for key in headers_to_remove:
                     del active_headers[key]
 
-                header_key = self.header_mapping[tag]
-                active_headers[header_key] = (text, level, dom_depth)
-
-                # Produce a document for the header itself
-                header_meta = {
-                    key: content
-                    for key, (content, lvl, dd) in active_headers.items()
-                    if dom_depth >= dd
-                }
-                documents.append(Document(page_content=text, metadata=header_meta))
-                # After encountering a header,
-                # no immediate content goes to current_chunk
-                # (if return_each_element is False, we wait for next content)
-                # (if return_each_element is True, we create docs per element anyway)
+                # Add/Update the active header
+                header_name = self.header_mapping[tag]
+                active_headers[header_name] = (node_text, level, dom_depth)
+
+                # Always yield a Document for the header
+                header_meta = {k: v[0] for k, v in active_headers.items()}
+                yield Document(page_content=node_text, metadata=header_meta)
+
             else:
-                # Non-header element logic
-                # Remove headers that don't apply if dom_depth < their dom_depth
-                headers_to_remove = [
-                    key for key, (_, _, dd) in active_headers.items() if dom_depth < dd
+                headers_out_of_scope = [
+                    k for k, (_, _, d) in active_headers.items() if dom_depth < d
                 ]
-                for key in headers_to_remove:
+                for key in headers_out_of_scope:
                     del active_headers[key]
 
                 if self.return_each_element:
-                    # Produce a doc for this element immediately
-                    element_meta = {
-                        key: content
-                        for key, (content, lvl, dd) in active_headers.items()
-                        if dom_depth >= dd
-                    }
-                    if text.strip():
-                        documents.append(
-                            Document(page_content=text, metadata=element_meta)
-                        )
+                    # Yield each element's text as its own Document
+                    meta = {k: v[0] for k, v in active_headers.items()}
+                    yield Document(page_content=node_text, metadata=meta)
                 else:
-                    # Accumulate content in current_chunk
-                    if text.strip():
-                        current_chunk.append(text)
-                        chunk_dom_depth = max(chunk_dom_depth, dom_depth)
+                    # Accumulate text in our chunk
+                    current_chunk.append(node_text)
 
+        # If we're aggregating and have leftover chunk, yield it
         if not self.return_each_element:
-            # finalize any remaining chunk
-            finalize_chunk()
-
-        # If no headers were found at all and return_each_element=False, behavior is:
-        # The entire content should be in one document.
-        # The logic above naturally handles it:
-        # If no recognized headers, we never split; we ended up just accumulating text
-        # in current_chunk and finalizing once at the end.
-
-        return documents
+            doc = finalize_chunk()
+            if doc:
+                yield doc
 
 
 class HTMLSectionSplitter:
@@ -916,7 +889,9 @@ def _process_element(
         if current_content:
             documents.extend(
                 self._create_documents(
-                    current_headers, " ".join(current_content), preserved_elements
+                    current_headers,
+                    " ".join(current_content),
+                    preserved_elements,
                 )
             )
 
@@ -972,7 +947,8 @@ def _further_split_chunk(
         if split_with_preserved.strip():
             result.append(
                 Document(
-                    page_content=split_with_preserved.strip(), metadata=metadata
+                    page_content=split_with_preserved.strip(),
+                    metadata=metadata,
                 )
             )
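
A note on the `__init__` hunk: the new sort key `int(x[0][1:])` parses the full numeric suffix of the tag name, where the old `int(x[0][1])` read only a single character. For the standard `h1`-`h6` tags the two are equivalent; the slice form is simply robust to multi-digit levels. A quick illustration (the `h10` tag is hypothetical, used only to show the difference):

```python
headers = [("h10", "Header 10"), ("h2", "Header 2")]

# Old key reads one character: int("1") == 1, so "h10" sorts before "h2".
sorted(headers, key=lambda x: int(x[0][1]))   # [("h10", ...), ("h2", ...)]

# New key reads the whole suffix: int("10") == 10, giving the intended order.
sorted(headers, key=lambda x: int(x[0][1:]))  # [("h2", ...), ("h10", ...)]
```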
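For reviewers, a minimal end-to-end sketch of the refactored splitter (the sample HTML and header names are illustrative, not part of the patch):

```python
from langchain_text_splitters import HTMLHeaderTextSplitter

html = (
    "<html><body>"
    "<h1>Intro</h1><p>Opening text.</p>"
    "<h2>Details</h2><p>More text.</p>"
    "</body></html>"
)

splitter = HTMLHeaderTextSplitter(
    headers_to_split_on=[("h1", "Header 1"), ("h2", "Header 2")]
)

# split_text_from_file drains the _generate_documents generator via list(),
# so the public return type is unchanged. Headers are emitted as their own
# Documents; the text chunks that follow inherit the active header metadata,
# e.g. the "More text." chunk carries
# {"Header 1": "Intro", "Header 2": "Details"}.
for doc in splitter.split_text(html):
    print(doc.page_content, doc.metadata)
```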