feat: Add Dagster assets to compute exchange rates #29323
base: master
Conversation
cluster.map_hosts_by_role(insert, NodeRole.DATA).result()
cluster.map_hosts_by_role(reload_dict, NodeRole.DATA).result()
@PostHog/clickhouse is this reasonable? Or should I be running this somewhere else? I need to insert data, and once all data has been inserted I need to reload the dictionaries.
class ExchangeRateConfig(dagster.Config):
    """Configuration for the exchange rate API."""

    app_id: str = os.environ.get("OPEN_EXCHANGE_RATES_APP_ID", "")
@PostHog/team-infra Who can help me set this up on our Dagster deployment in all our environments?
You can add the environment variables in the values files, here is the place for US prod:
https://github.com/PostHog/charts/blob/main/config/dagster/prod-us.yaml#L54
But you will have to add it to all envs here:
https://github.com/PostHog/charts/tree/main/config/dagster
Then, once that PR is merged, the variables should be picked up.
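For reference, a hedged sketch of what that values-file entry might look like, assuming the chart follows the upstream Dagster Helm convention of a per-deployment `env` section (the exact key layout in PostHog's charts may differ, and the secret itself should not be committed in plain text):

```yaml
# Hypothetical fragment for config/dagster/<env>.yaml — key names assume
# the upstream Dagster Helm chart layout and may differ in this repo.
env:
  OPEN_EXCHANGE_RATES_APP_ID: "<injected from a secret, not committed>"
```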
Force-pushed from 0f1c291 to 5f28558
dags/person_overrides.py (outdated)
Just organized this slightly better: no random schedule definitions and sensors in the main definitions.py file.
Force-pushed from 5f28558 to ebade53
Force-pushed from ebade53 to 84cf829
PR Summary
Added a Dagster workflow for fetching and storing exchange rates from the Open Exchange Rates API to ClickHouse, providing currency conversion capabilities throughout PostHog.
- Fixed bug in exchange_rate.py where daily_exchange_rates_schedule incorrectly uses datetime.timedelta instead of just timedelta
- Discrepancy between README and implementation: README mentions a fetch_exchange_rates_core op which doesn't exist in the code
- Mismatch between asset names in documentation (fetch_exchange_rates) vs implementation (exchange_rates)
- Partitioning start date of 2025-01-01 seems unusual for historical exchange rate data
- Schedules defined in exchange_rate.py are properly registered in definitions.py but may not be running as expected
9 file(s) reviewed, 14 comment(s)
    ExchangeRateConfig,
)

from .clickhouse import store_exchange_rates_in_clickhouse
style: Missing trailing comma at the end of the import statement. While not required by Python syntax, adding a trailing comma would maintain consistency with the style used in the previous import statement.
exchange_rate.daily_exchange_rates_job,
exchange_rate.hourly_exchange_rates_job,
logic: In exchange_rate.py, there's a datetime import issue in the daily_exchange_rates_schedule function. It uses datetime.timedelta but imports datetime as a module, which will cause a runtime error.
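The failure mode described can be reproduced in isolation (assuming exchange_rate.py uses `from datetime import datetime, timedelta`, as the bot's summary suggests):

```python
from datetime import datetime, timedelta

# With this import style, the name `datetime` is bound to the class, not
# the module, so attribute access like `datetime.timedelta` fails at runtime.
try:
    datetime.timedelta(days=1)
except AttributeError:
    print("AttributeError: the datetime class has no attribute 'timedelta'")

# The fix flagged in the review: use the directly imported name.
offset = timedelta(days=1)
print(offset.days)  # → 1
```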
# Prepare values for batch insert
# Use toDate() to cast the string date to a ClickHouse Date type
values = [f"(toDate('{row['date']}'), '{row['currency']}', {row['rate']})" for row in rows]
logic: SQL injection risk: currency values from external API are directly inserted into SQL without proper escaping. Consider using parameterized queries or proper escaping for currency values.
Suggested change:
- values = [f"(toDate('{row['date']}'), '{row['currency']}', {row['rate']})" for row in rows]
+ values = [(row['date'], row['currency'], row['rate']) for row in rows]
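A sketch of the suggested fix: keep the values as plain Python tuples and let the driver serialize them, instead of interpolating strings into SQL. The `rows` below are illustrative sample data; the commented-out `client.execute` line shows how clickhouse-driver accepts a parameter list after an `INSERT ... VALUES` statement:

```python
from datetime import date

# Illustrative rows in the shape the fetcher is described as producing.
rows = [
    {"date": "2025-01-01", "currency": "USD", "rate": 1.0},
    {"date": "2025-01-01", "currency": "EUR", "rate": 0.9618},
]

# Plain tuples — no string interpolation, so no injection surface.
# Converting the date string to a datetime.date also removes the need
# for the toDate() cast in the SQL text.
values = [
    (date.fromisoformat(row["date"]), row["currency"], row["rate"])
    for row in rows
]

# With clickhouse-driver, the parameters are passed separately:
# client.execute("INSERT INTO exchange_rate (date, currency, rate) VALUES", values)
print(values[0])
```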
cluster.map_hosts_by_role(insert, NodeRole.DATA).result()
cluster.map_hosts_by_role(reload_dict, NodeRole.DATA).result()
style: Potential issue: if insert fails on some hosts but succeeds on others, you'll still attempt to reload the dictionary on all hosts. Consider checking the insert result before proceeding with dictionary reload.
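The ordering concern can be modeled with stdlib futures. This is only a sketch of the intent — the PR's ClickhouseCluster helper may surface per-host failures differently, which is worth verifying — but the point is to resolve (and check) every insert future before any reload is scheduled:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for the per-host callables in the PR.
def insert(host):
    return f"inserted on {host}"

def reload_dict(host):
    return f"reloaded on {host}"

hosts = ["ch-1", "ch-2"]
with ThreadPoolExecutor(max_workers=2) as pool:
    # Phase 1: .result() re-raises any per-host failure, so an exception
    # here stops execution before the reload phase is ever scheduled.
    inserts = [f.result() for f in [pool.submit(insert, h) for h in hosts]]
    # Phase 2: only reached when every insert succeeded.
    reloads = [f.result() for f in [pool.submit(reload_dict, h) for h in hosts]]

print(inserts)
print(reloads)
```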
except Exception as e:
    context.log.warning(f"Failed to insert exchange rates: {e}")
style: Only logging a warning on insert failure means the asset will appear to succeed even if data wasn't stored. Consider raising the exception to properly indicate failure.
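A minimal sketch of the suggested change, with a stdlib logger standing in for `context.log`: log the failure, then re-raise so the orchestrator marks the run as failed instead of silently successful.

```python
import logging

logger = logging.getLogger("exchange_rates_demo")

def store_rows(rows):
    try:
        # Stand-in for the ClickHouse insert that can fail.
        raise RuntimeError("simulated ClickHouse insert failure")
    except Exception as e:
        logger.warning("Failed to insert exchange rates: %s", e)
        raise  # propagate instead of swallowing, so the asset run fails

caught = None
try:
    store_rows([])
except RuntimeError as e:
    caught = e
print(f"run fails visibly: {caught}")
```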
@patch("clickhouse_driver.Client")
def test_store_exchange_rates_in_clickhouse_reload_dict(
    mock_client_class, mock_exchange_rates, mock_clickhouse_cluster
):
    """Test the dictionary reload logic after storing exchange rates."""
    # Create a context with a partition key (date)
    date_str = "2025-01-01"
    context = build_asset_context(partition_key=date_str)

    # Setup to capture the function passed to map_hosts_by_role
    call_count = 0

    def capture_fn(fn, role, *args, **kwargs):
        nonlocal call_count
        call_count += 1
        # The second call should be the dictionary reload
        if call_count == 2:
            fn(mock_client_class)
        return MagicMock()

    mock_clickhouse_cluster.map_hosts_by_role.side_effect = capture_fn

    # Call the asset function
    store_exchange_rates_in_clickhouse(
        context=context,
        exchange_rates=mock_exchange_rates,
        cluster=mock_clickhouse_cluster,
    )

    # Verify the dictionary reload was called
    assert mock_client_class.sync_execute.call_count >= 1

    # Check that the reload dictionary command was executed
    reload_call = mock_client_class.sync_execute.call_args_list[-1]
    reload_sql = reload_call[0][0]

    assert "SYSTEM RELOAD DICTIONARY exchange_rate_dict" in reload_sql
style: The test only verifies the reload dictionary command was executed but doesn't test the first call (insert). Consider adding a similar check for the first call to ensure both operations are properly tested.
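A standalone sketch of the extra assertion suggested here, using plain mocks: collect every SQL string passed to `sync_execute` and check the insert path as well as the reload. The two calls below stand in for what the asset would issue; the SQL strings are illustrative.

```python
from unittest.mock import MagicMock

mock_client = MagicMock()
# Stand-ins for the two statements the asset is described as running.
mock_client.sync_execute("INSERT INTO exchange_rate (date, currency, rate) VALUES ...")
mock_client.sync_execute("SYSTEM RELOAD DICTIONARY exchange_rate_dict")

# Assert on both phases, not just the last call.
executed = [call.args[0] for call in mock_client.sync_execute.call_args_list]
assert executed[0].startswith("INSERT INTO exchange_rate")
assert "SYSTEM RELOAD DICTIONARY exchange_rate_dict" in executed[-1]
print("insert and reload both asserted")
```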
@patch("clickhouse_driver.Client")
def test_store_exchange_rates_in_clickhouse_exception_handling(
    mock_client_class, mock_exchange_rates, mock_clickhouse_cluster
):
    """Test exception handling during ClickHouse operations."""
    # Create a context with a partition key (date)
    date_str = "2025-01-01"
    context = build_asset_context(partition_key=date_str)

    # Setup the client to raise an exception
    mock_client_class.sync_execute.side_effect = Exception("Test exception")

    # Setup to capture the function passed to map_hosts_by_role
    def capture_fn(fn, role, *args, **kwargs):
        # Call the function with our mock client to test exception handling
        fn(mock_client_class)
        return MagicMock()

    mock_clickhouse_cluster.map_hosts_by_role.side_effect = capture_fn

    # Call the asset function - it should not raise an exception
    result = store_exchange_rates_in_clickhouse(
        context=context,
        exchange_rates=mock_exchange_rates,
        cluster=mock_clickhouse_cluster,
    )

    # Verify the function handled the exception and returned a result
    assert result is not None
    assert result.metadata["date"].value == date_str
style: The exception handling test only verifies that the function doesn't crash, but doesn't check if the warning was properly logged. Consider adding a check for context.log.warning being called.
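A sketch of the suggested check. Note that `build_asset_context()` provides a real logger, so asserting on it would require swapping in a mock context (or pytest's caplog); here the context is a plain MagicMock purely for illustration:

```python
from unittest.mock import MagicMock

context = MagicMock()

def store_with_failure(context):
    # Stand-in for the asset body: the insert fails and is logged.
    try:
        raise Exception("Test exception")
    except Exception as e:
        context.log.warning(f"Failed to insert exchange rates: {e}")

store_with_failure(context)

# The assertion the review asks for: the warning was actually logged.
context.log.warning.assert_called_once()
assert "Failed to insert exchange rates" in context.log.warning.call_args.args[0]
```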
with pytest.raises(Exception) as exchange_err:
    exchange_rates(mock_context, mock_config)
style: Consider using a more specific exception type instead of the generic Exception. This would make the error handling more precise and the tests more robust.
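One way to do that is to raise a purpose-built exception type and pin it in the test. The class name below is hypothetical — the real code would define its own — but it shows the pattern; in the test this becomes `with pytest.raises(ExchangeRateAPIError): ...`.

```python
class ExchangeRateAPIError(Exception):
    """Hypothetical specific error for Open Exchange Rates API failures."""

def fetch_rates_failing():
    # Stand-in for the fetcher hitting an API error.
    raise ExchangeRateAPIError("API returned 401 Unauthorized")

message = None
try:
    fetch_rates_failing()
except ExchangeRateAPIError as e:  # narrow catch: unrelated bugs still surface
    message = str(e)
print(f"caught specific error: {message}")
```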
with pytest.raises(Exception) as exchange_info:
    exchange_rates(mock_context, mock_config)
style: Similar to the API error test, consider using a more specific exception type here for better error handling.
result = exchange_rates(context, mock_config)

# Verify the result
assert result == mock_api_response["rates"]
style: This test could be more thorough by also checking specific rate values like in the first test (lines 58-59).
You can understand what's happening here much better if you read the README.md, but here's an AI-generated TL;DR:
The workflow consists of two main components: a fetcher that retrieves the data from the API and a storage component that persists it to ClickHouse. It's implemented using Dagster's asset-based approach with daily partitions starting from 2025-01-01. In the future we might want to introduce a separate API job to guarantee we'll have accurate data even in case one of the APIs fails.
We're adding this to provide reliable currency conversion capabilities throughout the application. The system automatically reloads a ClickHouse dictionary after inserting new data, ensuring fast lookups for exchange rates. The workflow includes both daily jobs (for historical rates) and hourly jobs (for current day rates) to maintain up-to-date data.
The implementation includes comprehensive error handling for API failures and database operations and returns rich metadata for monitoring in the Dagster UI.
I can see it properly in the UI, but it doesn't run for some reason. I think it's related to my WSL+Windows setup; will try on macOS soon.

If I run through the CLI, however, it finishes properly! 🎉
