TextSplitters: Refactor HTMLHeaderTextSplitter for Enhanced Maintainability and Readability #29397

Open · wants to merge 13 commits into base: master
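For context, a minimal usage sketch of the splitter this PR refactors (the header mapping and sample HTML below are illustrative, not taken from the diff):

```python
from langchain_text_splitters import HTMLHeaderTextSplitter

# Illustrative mapping: HTML tag -> metadata key attached to each chunk.
headers_to_split_on = [("h1", "Header 1"), ("h2", "Header 2")]
splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

html = """
<html><body>
<h1>Intro</h1><p>Opening paragraph.</p>
<h2>Details</h2><p>Nested content.</p>
</body></html>
"""

# Each Document carries the headers active at that point in its metadata,
# e.g. {"Header 1": "Intro", "Header 2": "Details"} for the final chunk.
for doc in splitter.split_text(html):
    print(doc.metadata, "->", doc.page_content)
```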
216 changes: 96 additions & 120 deletions libs/text-splitters/langchain_text_splitters/html.py
@@ -124,8 +124,9 @@ def __init__(
return_each_element: Whether to return each HTML
element as a separate Document. Defaults to False.
"""
# Sort headers by their numeric level so that h1 < h2 < h3...
self.headers_to_split_on = sorted(
headers_to_split_on, key=lambda x: int(x[0][1])
headers_to_split_on, key=lambda x: int(x[0][1:])
)
self.header_mapping = dict(self.headers_to_split_on)
self.header_tags = [tag for tag, _ in self.headers_to_split_on]
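The sort-key fix above swaps `int(x[0][1])` for `int(x[0][1:])`, parsing the level from everything after the leading "h" rather than from a single character. A quick sketch of the behavior (the tag list is illustrative):

```python
tags = [("h3", "Header 3"), ("h1", "Header 1"), ("h2", "Header 2")]

# "h2"[1:] -> "2"; for the standard h1-h6 tags both key variants agree,
# but the slice states the intent and would also handle a hypothetical
# multi-digit level such as "h10".
ordered = sorted(tags, key=lambda x: int(x[0][1:]))
print(ordered)  # [('h1', 'Header 1'), ('h2', 'Header 2'), ('h3', 'Header 3')]
```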
@@ -163,49 +164,6 @@ def split_text_from_url(
response.raise_for_status()
return self.split_text(response.text)

def _header_level(self, tag_name: str) -> int:
"""Determine the heading level of a tag."""
if tag_name.lower() in ["h1", "h2", "h3", "h4", "h5", "h6"]:
return int(tag_name[1])
# Returns high level if it isn't a header
return 9999

def _dom_depth(self, element: Any) -> int:
"""Determine the DOM depth of an element by counting its parents."""
depth = 0
for _ in element.parents:
depth += 1
return depth

def _get_elements(self, html_content: str) -> List[Any]:
"""Parse HTML content and return a list of BeautifulSoup elements.

This helper function takes HTML content as input,
parses it using BeautifulSoup4, and returns all HTML elements
found in the document body. If no body tag exists,
it returns all elements in the full document.

Args:
html_content: Raw HTML content to be parsed.

Returns:
List[Any]: A list of BeautifulSoup elements found in the HTML document.

Raises:
ImportError: If the BeautifulSoup4 package is not installed.
"""
try:
from bs4 import BeautifulSoup # type: ignore[import-untyped]
except ImportError as e:
raise ImportError(
"Unable to import BeautifulSoup/PageElement, \
please install with `pip install \
bs4`."
) from e
soup = BeautifulSoup(html_content, "html.parser")
body = soup.body if soup.body else soup
return body.find_all()

def split_text_from_file(self, file: Any) -> List[Document]:
"""Split HTML content from a file into a list of Document objects.

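The hunk below replaces the eager element-list build (`_get_elements` plus a `documents` accumulator) with a private generator that `split_text_from_file` materializes, keeping the public `List[Document]` return type. A minimal sketch of that pattern, with names simplified from the diff:

```python
from typing import Iterator, List


class GeneratorBackedSplitter:
    """Sketch only: the traversal lives in a generator; the public
    method wraps it in list() so the API contract is unchanged."""

    def _generate_chunks(self, text: str) -> Iterator[str]:
        # Yield results one at a time instead of appending to a shared
        # list; traversal state stays in local variables.
        for line in text.splitlines():
            if line.strip():
                yield line.strip()

    def split(self, text: str) -> List[str]:
        # Callers still receive a concrete list.
        return list(self._generate_chunks(text))
```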
@@ -220,105 +178,120 @@ def split_text_from_file(self, file: Any) -> List[Document]:
html_content = f.read()
else:
html_content = file.read()
elements = self._get_elements(html_content)
documents: List[Document] = []
return list(self._generate_documents(html_content))

def _generate_documents(self, html_content: str) -> Any:
"""Private method that performs a DFS traversal over the DOM and yields.

Document objects on-the-fly. This approach maintains the same splitting
logic (headers vs. non-headers, chunking, etc.) while walking the DOM
explicitly in code.

Args:
html_content: The raw HTML content.

Yields:
Document objects as they are created.
"""
try:
from bs4 import BeautifulSoup
except ImportError as e:
raise ImportError(
"Unable to import BeautifulSoup. Please install via `pip install bs4`."
) from e

soup = BeautifulSoup(html_content, "html.parser")
body = soup.body if soup.body else soup

# Dictionary of active headers:
# key = user-defined header name (e.g. "Header 1")
# value = (header_text, level, dom_depth)
active_headers: Dict[str, Tuple[str, int, int]] = {}
current_chunk: List[str] = []
chunk_dom_depth = 0

def finalize_chunk() -> None:
if current_chunk:
final_meta = {
key: content
for key, (content, level, dom_depth) in active_headers.items()
if chunk_dom_depth >= dom_depth
}
combined_text = " \n".join(
line for line in current_chunk if line.strip()
)
if combined_text.strip():
documents.append(
Document(page_content=combined_text, metadata=final_meta)
)
current_chunk.clear()

for element in elements:
tag = element.name
def finalize_chunk() -> Optional[Document]:
"""Finalize the accumulated chunk into a single Document."""
if not current_chunk:
return None

final_text = " \n".join(line for line in current_chunk if line.strip())
current_chunk.clear()
if not final_text.strip():
return None

final_meta = {k: v[0] for k, v in active_headers.items()}
return Document(page_content=final_text, metadata=final_meta)

# We'll use a stack for DFS traversal
stack = [body]
while stack:
node = stack.pop()
children = list(node.children)
for child in reversed(children):
if getattr(child, "name", None):
stack.append(child)

tag = getattr(node, "name", None)
if not tag:
continue
text = " ".join(
t
for t in element.find_all(string=True, recursive=False)
if isinstance(t, str)

node_text = " ".join(
t for t in node.find_all(string=True, recursive=False) if t.strip()
).strip()
if not text:
if not node_text:
continue

level = self._header_level(tag)
dom_depth = self._dom_depth(element)
dom_depth = len(list(node.parents))

# If this node is one of our headers
if tag in self.header_tags:
# If we're aggregating, finalize whatever chunk we had
if not self.return_each_element:
finalize_chunk()
doc = finalize_chunk()
if doc:
yield doc

# Determine numeric level (h1->1, h2->2, etc.)
try:
level = int(tag[1:])
except ValueError:
level = 9999

# Remove headers at same or deeper level
# Remove any active headers that are at or deeper than this new level
headers_to_remove = [
key for key, (_, lvl, _) in active_headers.items() if lvl >= level
k for k, (_, lvl, d) in active_headers.items() if lvl >= level
]
for key in headers_to_remove:
del active_headers[key]

header_key = self.header_mapping[tag]
active_headers[header_key] = (text, level, dom_depth)

# Produce a document for the header itself
header_meta = {
key: content
for key, (content, lvl, dd) in active_headers.items()
if dom_depth >= dd
}
documents.append(Document(page_content=text, metadata=header_meta))
# After encountering a header,
# no immediate content goes to current_chunk
# (if return_each_element is False, we wait for next content)
# (if return_each_element is True, we create docs per element anyway)
# Add/Update the active header
header_name = self.header_mapping[tag]
active_headers[header_name] = (node_text, level, dom_depth)

# Always yield a Document for the header
header_meta = {k: v[0] for k, v in active_headers.items()}
yield Document(page_content=node_text, metadata=header_meta)

else:
# Non-header element logic
# Remove headers that don't apply if dom_depth < their dom_depth
headers_to_remove = [
key for key, (_, _, dd) in active_headers.items() if dom_depth < dd
headers_out_of_scope = [
k for k, (_, _, d) in active_headers.items() if dom_depth < d
]
for key in headers_to_remove:
for key in headers_out_of_scope:
del active_headers[key]

if self.return_each_element:
# Produce a doc for this element immediately
element_meta = {
key: content
for key, (content, lvl, dd) in active_headers.items()
if dom_depth >= dd
}
if text.strip():
documents.append(
Document(page_content=text, metadata=element_meta)
)
# Yield each element's text as its own Document
meta = {k: v[0] for k, v in active_headers.items()}
yield Document(page_content=node_text, metadata=meta)
else:
# Accumulate content in current_chunk
if text.strip():
current_chunk.append(text)
chunk_dom_depth = max(chunk_dom_depth, dom_depth)
# Accumulate text in our chunk
current_chunk.append(node_text)

# If we're aggregating and have leftover chunk, yield it
if not self.return_each_element:
# finalize any remaining chunk
finalize_chunk()

# If no recognized headers were found and return_each_element=False,
# the whole input naturally ends up in a single Document: nothing ever
# triggered a split, so all text accumulated in current_chunk and is
# finalized exactly once below.

return documents
doc = finalize_chunk()
if doc:
yield doc


class HTMLSectionSplitter:
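To summarize the new control flow in `_generate_documents`: an explicit stack drives a depth-first walk of the DOM, and a dictionary of active headers is pruned by heading level whenever a new header appears. A self-contained sketch of just that traversal, assuming `bs4` is installed (chunk accumulation and the DOM-depth scope check from the real diff are omitted for brevity):

```python
from typing import Dict, Tuple

from bs4 import BeautifulSoup

HEADER_TAGS = {"h1", "h2", "h3", "h4", "h5", "h6"}


def walk_headers(html: str) -> None:
    soup = BeautifulSoup(html, "html.parser")
    body = soup.body if soup.body else soup
    # metadata key -> (header text, numeric level, DOM depth)
    active: Dict[str, Tuple[str, int, int]] = {}
    stack = [body]
    while stack:
        node = stack.pop()
        # Push children in reverse so the leftmost child is popped first,
        # preserving document order during the DFS.
        for child in reversed(list(node.children)):
            if getattr(child, "name", None):
                stack.append(child)
        text = " ".join(
            t for t in node.find_all(string=True, recursive=False) if t.strip()
        ).strip()
        if not text:
            continue
        if node.name in HEADER_TAGS:
            level = int(node.name[1:])
            # A new header closes every header at the same or deeper level.
            for k in [k for k, (_, lvl, _) in active.items() if lvl >= level]:
                del active[k]
            active[node.name] = (text, level, len(list(node.parents)))
        print({k: v[0] for k, v in active.items()}, "->", text)


walk_headers("<body><h1>A</h1><p>x</p><h2>B</h2><p>y</p><h1>C</h1></body>")
```

Note how `<h1>C</h1>` evicts both `A` and `B` from `active`, which is exactly the `lvl >= level` pruning in the diff.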
@@ -916,7 +889,9 @@ def _process_element(
if current_content:
documents.extend(
self._create_documents(
current_headers, " ".join(current_content), preserved_elements
current_headers,
" ".join(current_content),
preserved_elements,
)
)

@@ -972,7 +947,8 @@ def _further_split_chunk(
if split_with_preserved.strip():
result.append(
Document(
page_content=split_with_preserved.strip(), metadata=metadata
page_content=split_with_preserved.strip(),
metadata=metadata,
)
)
