Skip to content

Commit

Permalink
fix indexing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
santteegt committed Sep 17, 2024
1 parent f6dc127 commit c263a1e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
2 changes: 1 addition & 1 deletion gaianet_rag_api_pipeline/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def create_endpoint_stream(
# NOTICE: using lowercase field name as airbyte returns fields like that
schema_columns[f"{field.lower()}"] = pw.column_definition(
dtype=dtype | None,
primary_key=(field == "id"),
# primary_key=(field == "id"), # NOTICE: disabled to use pathway own indexer
default_value=dtype()
)
logger.debug(f"schema columns for {stream_id} stream - {schema_columns}")
Expand Down
7 changes: 6 additions & 1 deletion gaianet_rag_api_pipeline/preprocessing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from gaianet_rag_api_pipeline.config import logger
import pandas as pd
import pathway as pw
import time


def preprocessing(
Expand Down Expand Up @@ -35,7 +37,10 @@ def preprocess_table(table: pd.DataFrame) -> pd.DataFrame:

# get preprocessed table to be forwarded to the streams normalization stage
preprocessed_table = table[["preprocessed_text", "preprocessed_metadata"]]
preprocessed_table.columns = ["content", "metadata"]
preprocessed_table.rename(
columns={"preprocessed_text": "content", "preprocessed_metadata": "metadata"},
inplace=True
)
return preprocessed_table

output_table = preprocess_table(input_stream)
Expand Down

0 comments on commit c263a1e

Please sign in to comment.