-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstock_data_run.py
79 lines (60 loc) · 2.04 KB
/
stock_data_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import numpy as np
import pandas as pd
import stock_data_nodes
from hamilton import driver, base, telemetry
from hamilton.execution import executors
from hamilton.function_modifiers import value, source
import json
# Turn off Hamilton's telemetry before any driver is built in this script.
telemetry.disable_telemetry()
# Ultimate objectives
# 1. easy way to define multi metrics from single source table -- done
# 2. custom naming of final metric names or default names -- done
# 3. Parallelizable -- done
# 4. Pending
# * Rolling stats
# * Weighted stats
# Initial data handed to the driver at execution time (load from actuals or
# wherever). Entries do not all have to be series; scalar inputs work too.
inputs = dict(
    start_date='2023-10-30',  # Monday
    end_date='2023-11-10',
    resample='W',
    # Default column naming alternative:
    #   aggs={'price': ['mean', 'last'], 'volume': ['sum']}
    # Named form used here:
    #   column_name=('source_column', stat func: pandas-accepted string or lambda)
    aggs=dict(
        openp=('price', 'first'),
        highp=('price', 'max'),
        lowp=('price', 'min'),
        closep=('price', 'last'),
        volume=('volume', 'sum'),
        # A callable works as well; ('volume', 'mean') would do the same job.
        adv=('volume', lambda x: np.mean(x)),
        mdv=('volume', 'median'),
    ),
    # To filter for specific tickers, add e.g.: tickers=['B']
)
# No configuration values are supplied for this run.
config = dict()

# Assemble the driver one builder call at a time: register the node module,
# switch on dynamic execution (the API requires the experimental-mode flag),
# attach the config, then build.
_builder = driver.Builder()
_builder = _builder.with_modules(stock_data_nodes)
_builder = _builder.enable_dynamic_execution(allow_experimental_mode=True)
# Optional local executor, currently disabled:
# _builder = _builder.with_local_executor(executors.SynchronousLocalTaskExecutor())
_builder = _builder.with_config(config)
dr = _builder.build()
# Names requested from the driver. Only 'final_stats' is active; other
# candidates previously toggled here: 'stock_data', 'ticker_df',
# 'ticker_list', 'adv_5d'.
output_columns = ['final_stats']
# Run the graph for the requested outputs using the prepared inputs.
out = dr.execute(output_columns, inputs=inputs)

# Banner first, then the result looked up under its requested name.
print('\n**************************\nOutput:')
# print(out)
final_stats = out['final_stats']
print(final_stats)

# NOTE(review): the weekday names of the 'from' column are computed here but
# the result is discarded -- wrap in print() if it is meant to be shown.
final_stats['from'].dt.day_name()
# Optional debugging helpers, switched off by default; set run_it to True to
# use them.
# NOTE(review): the nesting below is reconstructed -- all three statements are
# assumed to belong to the guard. Confirm against the original layout.
run_it = False
if run_it:
    # Call the node function directly on the module imported at the top of
    # this file and keep only ticker 'A'.
    # presumably stock_data() needs no arguments -- verify against the module.
    ticker_df = stock_data_nodes.stock_data().query("ticker=='A'")

    # Render the execution graph for the requested outputs as a PNG.
    cache_file = '/home/user/hamilton_test.dot'
    dr.visualize_execution(output_columns, cache_file, {'format': 'png'})