test: Tests write to own log file #230

Merged · 6 commits · Dec 2, 2024
8 changes: 4 additions & 4 deletions src/goe/offload/offload_transport.py
@@ -1621,7 +1621,7 @@ def get_password_snippet():
password_snippet = get_password_snippet()

# Certain config should not be logged
config_blacklist = [
config_nolog = [
"spark.jdbc.password",
"spark.authenticate.secret",
"spark.hadoop.fs.gs.auth.service.account.private.key",
@@ -1635,13 +1635,13 @@ def get_password_snippet():
]
debug_conf_snippet = dedent(
"""\
config_blacklist = %s
config_nolog = %s
for kv in sorted(spark.sparkContext.getConf().getAll()):
if kv[0] in config_blacklist:
if kv[0] in config_nolog:
print(kv[0], '?')
else:
print(kv)"""
% repr(config_blacklist)
% repr(config_nolog)
)

params = {
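The change above is a straight rename of config_blacklist to config_nolog inside the generated PySpark debug snippet; the masking behaviour itself is unchanged. As a standalone illustration of what that snippet does when it runs on the Spark side (the SparkSession and the shortened key list here are assumptions for the sketch, not part of the PR):

# Minimal sketch of the masking pattern, assuming an active SparkSession `spark`.
config_nolog = ["spark.jdbc.password", "spark.authenticate.secret"]
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if key in config_nolog:
        print(key, "?")      # mask the value of sensitive keys
    else:
        print(key, value)    # print everything else verbatim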
100 changes: 63 additions & 37 deletions tests/integration/scenarios/assertion_functions.py
@@ -55,61 +55,76 @@
from goe.config.orchestration_config import OrchestrationConfig
from testlib.test_framework.backend_testing_api import BackendTestingApiInterface
from testlib.test_framework.frontend_testing_api import FrontendTestingApiInterface
from testlib.test_framework.offload_test_messages import OffloadTestMessages


def hint_text_in_log(
messages: "OffloadMessages",
offload_messages: "OffloadMessages",
config: "OrchestrationConfig",
parallelism: int,
search_from_text,
):
if config.db_type == DBTYPE_ORACLE:
hint = (
"NO_PARALLEL"
if parallelism in (0, 1)
else "PARALLEL({})".format(str(parallelism))
)
return bool(messages.get_line_from_log(hint, search_from_text))
return bool(test_functions.get_line_from_log(offload_messages, hint))
else:
return False


def text_in_log(
messages: "OffloadMessages", search_text: str, search_from_text=""
offload_messages: "OffloadMessages",
search_text: str,
test_messages: "OffloadTestMessages",
search_from_text="",
) -> bool:
# Do not log search_text below because that is a sure fire way to break the check.
messages.log(f'text_in_log(">8 snip 8<", "{search_from_text}")', detail=VERBOSE)
test_messages.log(
f'text_in_log(">8 snip 8<", "{search_from_text}")', detail=VERBOSE
)
return bool(
messages.get_line_from_log(search_text, search_from_text=search_from_text)
test_functions.get_line_from_log(
offload_messages, search_text, search_from_text=search_from_text
)
)


def text_in_events(messages, message_token) -> bool:
messages.log(f"text_in_events({message_token})", detail=VERBOSE)
return test_functions.text_in_events(messages, message_token)
def text_in_events(
offload_messages: "OffloadMessages",
message_token,
test_messages: "OffloadTestMessages",
) -> bool:
test_messages.log(f"text_in_events({message_token})", detail=VERBOSE)
return test_functions.text_in_events(offload_messages, message_token)


def text_in_messages(messages, log_text) -> bool:
messages.log(f"text_in_messages({log_text})", detail=VERBOSE)
return test_functions.text_in_messages(messages, log_text)
def text_in_messages(
offload_messages: "OffloadMessages", log_text, test_messages: "OffloadTestMessages"
) -> bool:
test_messages.log(f"text_in_messages({log_text})", detail=VERBOSE)
return test_functions.text_in_messages(offload_messages, log_text)


def messages_step_executions(messages, step_text) -> int:
def messages_step_executions(
offload_messages: "OffloadMessages", step_text, test_messages: "OffloadTestMessages"
) -> int:
"""Return the number of times step "step_text" was executed."""
assert step_text
messages.log("messages_step_executions: %s" % step_text, detail=VERBOSE)
if messages and step_text in messages.steps:
return messages.steps[step_text]["count"]
test_messages.log("messages_step_executions: %s" % step_text, detail=VERBOSE)
if offload_messages and step_text in offload_messages.steps:
return offload_messages.steps[step_text]["count"]
return 0


def get_offload_row_count_from_log(messages, test_name):
def get_offload_row_count_from_log(offload_messages, test_messages):
"""Search test log forwards from the "test_name" we are processing and find the offload row count"""
messages.log("get_offload_row_count_from_log(%s)" % test_name, detail=VERBOSE)
matched_line = messages.get_line_from_log(
TOTAL_ROWS_OFFLOADED_LOG_TEXT, search_from_text=test_name
test_messages.log("get_offload_row_count_from_log()", detail=VERBOSE)
matched_line = test_functions.get_line_from_log(
offload_messages, TOTAL_ROWS_OFFLOADED_LOG_TEXT
)
messages.log("matched_line: %s" % matched_line)
test_messages.log("matched_line: %s" % matched_line)
rows = int(matched_line.split()[-1]) if matched_line else None
return rows

@@ -425,11 +440,13 @@ def load_table_is_compressed(db_name, table_name, config, dfs_client, messages):


def offload_rowsource_split_type_assertion(
messages: "OffloadMessages", story_id: str, split_type: str
offload_messages: "OffloadMessages",
split_type: str,
test_messages: "OffloadTestMessages",
):
search = TRANSPORT_ROW_SOURCE_QUERY_SPLIT_TYPE_TEXT + split_type
if not text_in_log(messages, search, "(%s)" % (story_id)):
messages.log(
if not text_in_log(offload_messages, search, test_messages):
test_messages.log(
f"offload_rowsource_split_type_assertion failed, did not find: {search}"
)
return False
@@ -474,17 +491,17 @@ def date_goe_part_column_name(backend_api, source_col_name, granularity_override
def standard_dimension_assertion(
config: "OrchestrationConfig",
backend_api: "BackendTestingApiInterface",
messages: "OffloadMessages",
messages: "OffloadTestMessages",
repo_client: "OrchestrationRepoClientInterface",
schema: str,
data_db: str,
table_name: str,
backend_db=None,
backend_table=None,
story_id="",
split_type=None,
partition_functions=None,
bucket_column=None,
offload_messages: "OffloadMessages" = None,
) -> bool:
data_db = backend_db or data_db
backend_table = backend_table or table_name
@@ -509,11 +526,15 @@ def standard_dimension_assertion(
messages.log("backend_table_exists() == False")
return False

if text_in_messages(messages, MISSING_ROWS_IMPORTED_WARNING):
if offload_messages and text_in_messages(
offload_messages, MISSING_ROWS_IMPORTED_WARNING, messages
):
return False

if split_type:
if not offload_rowsource_split_type_assertion(messages, story_id, split_type):
if split_type and offload_messages:
if not offload_rowsource_split_type_assertion(
offload_messages, split_type, messages
):
return False

return True
@@ -523,7 +544,7 @@ def sales_based_fact_assertion(
config: "OrchestrationConfig",
backend_api: "BackendTestingApiInterface",
frontend_api: "FrontendTestingApiInterface",
messages: "OffloadMessages",
messages: "OffloadTestMessages",
repo_client: "OrchestrationRepoClientInterface",
schema: str,
data_db: str,
@@ -534,7 +555,6 @@
offload_pattern=OFFLOAD_PATTERN_90_10,
incremental_key="TIME_ID",
incremental_range=None,
story_id="",
split_type=None,
check_hwm_in_metadata=True,
ipa_predicate_type="RANGE",
@@ -543,6 +563,7 @@
partition_functions=None,
synthetic_partition_column_name=None,
check_backend_rowcount=False,
offload_messages: "OffloadMessages" = None,
) -> bool:
data_db = backend_db or data_db
backend_table = backend_table or table_name
@@ -572,9 +593,10 @@ def check_fn(mt):
elif offload_pattern == OFFLOAD_PATTERN_100_0:
offload_type = OFFLOAD_TYPE_FULL
incremental_key = None
check_fn = lambda mt: bool(
not mt.incremental_key and not mt.incremental_high_value
)

def check_fn(mt):
return bool(not mt.incremental_key and not mt.incremental_high_value)

elif offload_pattern == OFFLOAD_PATTERN_100_10:
offload_type = OFFLOAD_TYPE_FULL

@@ -625,11 +647,15 @@ def check_fn(mt):
):
return False

if text_in_messages(messages, MISSING_ROWS_IMPORTED_WARNING):
if offload_messages and text_in_messages(
offload_messages, MISSING_ROWS_IMPORTED_WARNING, messages
):
return False

if split_type:
if not offload_rowsource_split_type_assertion(messages, story_id, split_type):
if split_type and offload_messages:
if not offload_rowsource_split_type_assertion(
offload_messages, split_type, messages
):
return False

return True
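The helpers in this module now take two message objects: the OffloadMessages returned by the offload command under test (whose dedicated log is searched via test_functions.get_line_from_log) and the test's own OffloadTestMessages, which is only used to log the assertion activity. A hedged usage sketch from a test's point of view (the search string is illustrative; the variable names mirror the tests changed later in this PR):

# messages is the test's OffloadTestMessages; run_offload() returns the command's OffloadMessages.
offload_messages = run_offload(options, config, messages)
assert text_in_log(offload_messages, "Completed offload", messages)
assert not text_in_messages(offload_messages, MISSING_ROWS_IMPORTED_WARNING, messages)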
9 changes: 8 additions & 1 deletion tests/integration/scenarios/scenario_runner.py
@@ -65,10 +65,11 @@ def run_offload(
execution_id = ExecutionId()
messages = OffloadMessages.from_options(
orchestration_config,
log_fh=parent_messages.get_log_fh(),
# log_fh=parent_messages.get_log_fh(),
execution_id=execution_id,
command_type=orchestration_constants.COMMAND_OFFLOAD,
)
messages.init_log(orchestration_config.log_path, "test_offload")
try:
config_overrides = get_config_overrides(config_overrides, orchestration_config)
messages_override = None if no_messages_override else messages
@@ -103,6 +104,12 @@
else:
parent_messages.log(traceback.format_exc())
raise
finally:
try:
messages.close_log()
except Exception:
pass

return messages


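Taken together, the runner changes give each offload command its own log file instead of reusing the parent test's file handle: init_log(orchestration_config.log_path, "test_offload") opens a dedicated log, close_log() is always attempted in a finally block, and the OffloadMessages object is returned so the calling test can search that log. The body of run_offload() reduces to roughly this (error handling trimmed to the essentials):

messages = OffloadMessages.from_options(
    orchestration_config,
    execution_id=execution_id,
    command_type=orchestration_constants.COMMAND_OFFLOAD,
)
messages.init_log(orchestration_config.log_path, "test_offload")
try:
    pass  # run the offload, passing messages as the override
finally:
    try:
        messages.close_log()
    except Exception:
        pass
return messages  # handed back to the test for log assertions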
2 changes: 1 addition & 1 deletion tests/integration/scenarios/setup_functions.py
@@ -136,7 +136,7 @@ def get_sales_based_fact_partition_list(
"""
if not frontend_api:
return []
if type(hv_string_list) == str:
if isinstance(hv_string_list, str):
hv_string_list = [hv_string_list]

partitions = frontend_api.frontend_table_partition_list(
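isinstance() is the idiomatic type check and, unlike the exact-type comparison, also accepts subclasses of str. A trivial illustration (not part of the PR):

class HvString(str):
    pass

hv = HvString("2012-02-01")
print(type(hv) == str)      # False: exact-type comparison misses the subclass
print(isinstance(hv, str))  # True: matches str and any subclass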
39 changes: 22 additions & 17 deletions tests/integration/scenarios/test_column_controls.py
@@ -41,7 +41,6 @@
data_db_name,
load_db_name,
)
from goe.offload.offload_messages import VVERBOSE
from goe.offload.offload_source_table import (
DATA_SAMPLE_SIZE_AUTO,
DATETIME_STATS_SAMPLING_OPT_ACTION_TEXT,
@@ -60,6 +59,7 @@
backend_column_exists,
frontend_column_exists,
hint_text_in_log,
text_in_messages,
)
from tests.integration.scenarios.scenario_runner import (
ScenarioRunnerException,
@@ -138,10 +138,6 @@ def load_db(schema, config):
return load_db


def log_test_marker(messages, test_id):
messages.log(test_id, detail=VVERBOSE)


def cast_validation_exception_text(backend_api):
"""Get expected exception text when cast validation fails"""
if backend_api:
@@ -501,6 +497,7 @@ def samp_date_assertion(
backend_api,
frontend_api,
messages,
offload_messages,
data_db,
backend_name,
from_stats=True,
@@ -573,7 +570,9 @@
% ("good_ts", expected_good_data_type)
)
if expected_bad_data_type != expected_good_data_type:
text_match = messages.text_in_messages(DATETIME_STATS_SAMPLING_OPT_ACTION_TEXT)
text_match = text_in_messages(
offload_messages, DATETIME_STATS_SAMPLING_OPT_ACTION_TEXT, messages
)
if text_match != from_stats:
raise ScenarioRunnerException(
"text_match != from_stats: %s != %s" % (text_match, from_stats)
@@ -811,7 +810,7 @@ def test_numeric_controls(config, schema, data_db):
"decimal_padding_digits": 2,
"execute": True,
}
run_offload(options, config, messages)
offload_messages = run_offload(options, config, messages)
nums_assertion(
config,
frontend_api,
@@ -834,9 +833,8 @@
"verify_row_count": False,
"execute": True,
}
log_test_marker(messages, f"{id}:samp1")
run_offload(options, config, messages)
assert hint_text_in_log(messages, config, 0, f"{id}:samp1")
offload_messages = run_offload(options, config, messages)
assert hint_text_in_log(offload_messages, config, 0)

# Offload Dimension With Parallel Sampling=3.
# Runs with --no-verify to remove risk of verification having a PARALLEL hint.
@@ -849,9 +847,8 @@
"verify_row_count": False,
"execute": True,
}
log_test_marker(messages, f"{id}:samp2")
run_offload(options, config, messages)
assert hint_text_in_log(messages, config, 3, f"{id}:samp2")
offload_messages = run_offload(options, config, messages)
assert hint_text_in_log(offload_messages, config, 3)

# Offload Dimension with number overflow (expect to fail).
if config.target not in [offload_constants.DBTYPE_BIGQUERY]:
@@ -1016,9 +1013,15 @@ def test_date_sampling(config, schema, data_db):
"create_backend_db": True,
"execute": True,
}
run_offload(options, config, messages)
offload_messages = run_offload(options, config, messages)
samp_date_assertion(
config, backend_api, frontend_api, messages, data_db, DATE_SDIM
config,
backend_api,
frontend_api,
messages,
offload_messages,
data_db,
DATE_SDIM,
)

# Remove stats from DATE_SDIM.
@@ -1042,12 +1045,13 @@
"reset_backend_table": True,
"execute": True,
}
run_offload(options, config, messages)
offload_messages = run_offload(options, config, messages)
samp_date_assertion(
config,
backend_api,
frontend_api,
messages,
offload_messages,
data_db,
DATE_SDIM,
from_stats=False,
Expand Down Expand Up @@ -1076,12 +1080,13 @@ def test_date_sampling(config, schema, data_db):
"reset_backend_table": True,
"execute": True,
}
run_offload(options, config, messages)
offload_messages = run_offload(options, config, messages)
samp_date_assertion(
config,
backend_api,
frontend_api,
messages,
offload_messages,
data_db,
DATE_SDIM,
from_stats=False,
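The pattern repeated throughout this test module: rather than writing a marker line into a shared log (the removed log_test_marker helper) and searching forward from it, each call now captures the OffloadMessages returned by run_offload() and hands it straight to the assertion. Condensed before/after drawn from the hunks above:

# Before: marker text written to a shared log and used as a search anchor.
#   log_test_marker(messages, f"{id}:samp1")
#   run_offload(options, config, messages)
#   assert hint_text_in_log(messages, config, 0, f"{id}:samp1")

# After: each offload writes its own log, returned for direct inspection.
offload_messages = run_offload(options, config, messages)
assert hint_text_in_log(offload_messages, config, 0)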