
feat: Add Dagster assets to compute exchange rates #29323

Open · wants to merge 1 commit into base: master
Conversation

@rafaeelaudibert (Member) commented on Feb 27, 2025:

You can understand what's happening here much better if you read the README.md, but here's an AI-generated TL;DR:

The workflow consists of two main components: a fetcher that retrieves the data from the API and a storage component that persists it to ClickHouse. It's implemented using Dagster's asset-based approach with daily partitions starting from 2025-01-01. In the future we might want to introduce a separate API job to guarantee we'll have accurate data even if one of the APIs fails.

We're adding this to provide reliable currency conversion capabilities throughout the application. The system automatically reloads a ClickHouse dictionary after inserting new data, ensuring fast lookups for exchange rates. The workflow includes both daily jobs (for historical rates) and hourly jobs (for current day rates) to maintain up-to-date data.

The implementation includes comprehensive error handling for API failures and database operations and returns rich metadata for monitoring in the Dagster UI.
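As a rough illustration of the storage half described above, the (date, currency, rate) rows for one daily partition can be derived from the API's `rates` payload like this (the function name and row shape are assumptions for illustration, not the PR's actual code):

```python
from datetime import date

def build_rows(partition_date: date, rates: dict) -> list:
    """Turn an Open Exchange Rates `rates` mapping into (date, currency, rate) rows."""
    return [(partition_date, currency, rate) for currency, rate in sorted(rates.items())]

# One daily partition's payload becomes the rows inserted for that day.
rows = build_rows(date(2025, 1, 1), {"EUR": 0.96, "BRL": 6.18})
```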


I can see it properly in the UI, but it doesn't run for some reason. I think it's related to my WSL+Windows setup; I'll try on macOS soon.
[screenshot]

If I run it through the CLI, however, it finishes properly! 🎉
[screenshot]

Comment on lines +66 to +67
cluster.map_hosts_by_role(insert, NodeRole.DATA).result()
cluster.map_hosts_by_role(reload_dict, NodeRole.DATA).result()
Member Author:
@PostHog/clickhouse is this reasonable? Or should I be running in different places? I need to insert data and then once all data has been inserted I need to reload the dictionaries

class ExchangeRateConfig(dagster.Config):
    """Configuration for the exchange rate API."""

    app_id: str = os.environ.get("OPEN_EXCHANGE_RATES_APP_ID", "")
Member Author:
@PostHog/team-infra Who can help me set this up on our Dagster deployment in all our environments?

Contributor:
You can add the environment variables in the values files; here is the place for US prod:

https://github.com/PostHog/charts/blob/main/config/dagster/prod-us.yaml#L54

But you will have to add them in all the envs here:

https://github.com/PostHog/charts/tree/main/config/dagster

Then, once the PR is merged, they should be applied.
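For illustration, an env-var entry in one of those values files might look roughly like the fragment below. The key layout and secret name are assumptions; the chart's actual schema in the linked files is authoritative.

```yaml
# Hypothetical shape of the entry in config/dagster/<env>.yaml
env:
  - name: OPEN_EXCHANGE_RATES_APP_ID
    valueFrom:
      secretKeyRef:
        name: dagster-secrets            # assumed secret name
        key: open-exchange-rates-app-id  # assumed key
```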

@rafaeelaudibert force-pushed the add-dag-to-compute-historical-currency-rate branch from 0f1c291 to 5f28558 on February 27, 2025 20:54
Member Author:
Just organized this slightly better: no more stray schedule definitions and sensors in the main definitions.py file.

@rafaeelaudibert marked this pull request as draft on February 27, 2025 21:07
@rafaeelaudibert force-pushed the add-dag-to-compute-historical-currency-rate branch from 5f28558 to ebade53 on February 27, 2025 22:05
@rafaeelaudibert marked this pull request as ready for review on February 27, 2025 22:08
@rafaeelaudibert force-pushed the add-dag-to-compute-historical-currency-rate branch from ebade53 to 84cf829 on February 27, 2025 22:12
@greptile-apps bot (Contributor) left a comment:
PR Summary

Added a Dagster workflow for fetching and storing exchange rates from the Open Exchange Rates API to ClickHouse, providing currency conversion capabilities throughout PostHog.

  • Fixed bug in exchange_rate.py where daily_exchange_rates_schedule incorrectly uses datetime.timedelta instead of just timedelta
  • Discrepancy between README and implementation: README mentions fetch_exchange_rates_core op which doesn't exist in the code
  • Mismatch between asset names in documentation (fetch_exchange_rates) vs implementation (exchange_rates)
  • Partitioning start date of 2025-01-01 seems unusual for historical exchange rate data
  • Schedules defined in exchange_rate.py are properly registered in definitions.py but may not be running as expected

9 file(s) reviewed, 14 comment(s)

ExchangeRateConfig,
)

from .clickhouse import store_exchange_rates_in_clickhouse
Contributor:
style: Missing trailing comma at the end of the import statement. While not required by Python syntax, adding a trailing comma would maintain consistency with the style used in the previous import statement.

Comment on lines +36 to +37
exchange_rate.daily_exchange_rates_job,
exchange_rate.hourly_exchange_rates_job,
Contributor:
logic: In exchange_rate.py, there's a datetime import issue in the daily_exchange_rates_schedule function: it references datetime.timedelta, but with the imports as written that attribute lookup fails, causing a runtime AttributeError.
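A minimal reproduction of the failure mode being described, assuming the file imports the `datetime` class rather than the `datetime` module:

```python
from datetime import datetime

# With the class (not the module) bound to the name `datetime`,
# `datetime.timedelta` is an attribute lookup on the class and fails.
try:
    datetime.timedelta(days=1)
except AttributeError:
    pass  # this is the runtime error the review comment refers to

# The fix the PR summary suggests: import timedelta directly and use it bare.
from datetime import timedelta
one_day = timedelta(days=1)
```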


# Prepare values for batch insert
# Use toDate() to cast the string date to a ClickHouse Date type
values = [f"(toDate('{row['date']}'), '{row['currency']}', {row['rate']})" for row in rows]
Contributor:
logic: SQL injection risk: currency values from external API are directly inserted into SQL without proper escaping. Consider using parameterized queries or proper escaping for currency values.

Suggested change:
- values = [f"(toDate('{row['date']}'), '{row['currency']}', {row['rate']})" for row in rows]
+ values = [(row['date'], row['currency'], row['rate']) for row in rows]
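Sketch of that parameterized alternative: passing the rows as a list of tuples and letting the driver serialize the values means nothing in `currency` can escape into the SQL string. The table and column names below are assumptions; the `client.execute` call is shown as a comment since it needs a live ClickHouse connection.

```python
from datetime import date

rows = [
    {"date": "2025-01-01", "currency": "EUR", "rate": 0.96},
    {"date": "2025-01-01", "currency": "X'); DROP", "rate": 1.0},  # hostile value stays inert
]

# Tuples instead of an interpolated SQL string: the driver handles quoting.
params = [(date.fromisoformat(r["date"]), r["currency"], r["rate"]) for r in rows]

# client.execute(
#     "INSERT INTO exchange_rate (date, currency, rate) VALUES",  # assumed table/columns
#     params,
# )
```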

Comment on lines +66 to +67
cluster.map_hosts_by_role(insert, NodeRole.DATA).result()
cluster.map_hosts_by_role(reload_dict, NodeRole.DATA).result()
Contributor:
style: Potential issue: if insert fails on some hosts but succeeds on others, you'll still attempt to reload the dictionary on all hosts. Consider checking the insert result before proceeding with dictionary reload.
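One way to express that gating with plain `concurrent.futures` (an illustration only, not the real `ClickhouseCluster` API): surface any per-host insert failure before any reload starts.

```python
from concurrent.futures import ThreadPoolExecutor

def store_then_reload(hosts, insert, reload_dict):
    with ThreadPoolExecutor() as pool:
        # .result() re-raises the first insert failure, so a partial
        # insert aborts the run before any dictionary reload happens.
        for future in [pool.submit(insert, h) for h in hosts]:
            future.result()
        for future in [pool.submit(reload_dict, h) for h in hosts]:
            future.result()
```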

Comment on lines +56 to +57
    except Exception as e:
        context.log.warning(f"Failed to insert exchange rates: {e}")
Contributor:
style: Only logging a warning on insert failure means the asset will appear to succeed even if data wasn't stored. Consider raising the exception to properly indicate failure.
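The log-then-re-raise pattern this comment suggests, sketched with stand-in objects (the real `context` comes from Dagster and the real execute call goes through the cluster helper):

```python
class FakeLog:
    def __init__(self):
        self.warnings = []

    def warning(self, msg):
        self.warnings.append(msg)

class FakeContext:
    def __init__(self):
        self.log = FakeLog()

def insert_exchange_rates(context, execute):
    try:
        execute()
    except Exception as e:
        # Log for visibility, then re-raise so Dagster marks the asset run as
        # failed instead of reporting success with no data stored.
        context.log.warning(f"Failed to insert exchange rates: {e}")
        raise
```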

Comment on lines +135 to +172
@patch("clickhouse_driver.Client")
def test_store_exchange_rates_in_clickhouse_reload_dict(
    mock_client_class, mock_exchange_rates, mock_clickhouse_cluster
):
    """Test the dictionary reload logic after storing exchange rates."""
    # Create a context with a partition key (date)
    date_str = "2025-01-01"
    context = build_asset_context(partition_key=date_str)

    # Setup to capture the function passed to map_hosts_by_role
    call_count = 0

    def capture_fn(fn, role, *args, **kwargs):
        nonlocal call_count
        call_count += 1
        # The second call should be the dictionary reload
        if call_count == 2:
            fn(mock_client_class)
        return MagicMock()

    mock_clickhouse_cluster.map_hosts_by_role.side_effect = capture_fn

    # Call the asset function
    store_exchange_rates_in_clickhouse(
        context=context,
        exchange_rates=mock_exchange_rates,
        cluster=mock_clickhouse_cluster,
    )

    # Verify the dictionary reload was called
    assert mock_client_class.sync_execute.call_count >= 1

    # Check that the reload dictionary command was executed
    reload_call = mock_client_class.sync_execute.call_args_list[-1]
    reload_sql = reload_call[0][0]

    assert "SYSTEM RELOAD DICTIONARY exchange_rate_dict" in reload_sql

Contributor:
style: The test only verifies the reload dictionary command was executed but doesn't test the first call (insert). Consider adding a similar check for the first call to ensure both operations are properly tested.

Comment on lines +174 to +204
@patch("clickhouse_driver.Client")
def test_store_exchange_rates_in_clickhouse_exception_handling(
    mock_client_class, mock_exchange_rates, mock_clickhouse_cluster
):
    """Test exception handling during ClickHouse operations."""
    # Create a context with a partition key (date)
    date_str = "2025-01-01"
    context = build_asset_context(partition_key=date_str)

    # Setup the client to raise an exception
    mock_client_class.sync_execute.side_effect = Exception("Test exception")

    # Setup to capture the function passed to map_hosts_by_role
    def capture_fn(fn, role, *args, **kwargs):
        # Call the function with our mock client to test exception handling
        fn(mock_client_class)
        return MagicMock()

    mock_clickhouse_cluster.map_hosts_by_role.side_effect = capture_fn

    # Call the asset function - it should not raise an exception
    result = store_exchange_rates_in_clickhouse(
        context=context,
        exchange_rates=mock_exchange_rates,
        cluster=mock_clickhouse_cluster,
    )

    # Verify the function handled the exception and returned a result
    assert result is not None
    assert result.metadata["date"].value == date_str

Contributor:
style: The exception handling test only verifies that the function doesn't crash, but doesn't check if the warning was properly logged. Consider adding a check for context.log.warning being called.

Comment on lines +77 to +78
with pytest.raises(Exception) as exchange_err:
    exchange_rates(mock_context, mock_config)
Contributor:
style: Consider using a more specific exception type instead of the generic Exception. This would make the error handling more precise and the tests more robust.

Comment on lines +109 to +110
with pytest.raises(Exception) as exchange_info:
    exchange_rates(mock_context, mock_config)
Contributor:
style: Similar to the API error test, consider using a more specific exception type here for better error handling.

result = exchange_rates(context, mock_config)

# Verify the result
assert result == mock_api_response["rates"]
Contributor:
style: This test could be more thorough by also checking specific rate values like in the first test (lines 58-59).
