diff --git a/c/Cargo.toml b/c/Cargo.toml index 70f0f0c11..825ca3c2d 100644 --- a/c/Cargo.toml +++ b/c/Cargo.toml @@ -22,6 +22,7 @@ tokio = { version = "1.19.2", features = ["rt-multi-thread"] } once_cell = "1.12.0" parking_lot = "0.12.1" time = "0.3.9" +tracing = "0.1.34" [build-dependencies] cbindgen = "0.24.3" diff --git a/c/csrc/include/longport.h b/c/csrc/include/longport.h index e5733612c..0a300d64a 100644 --- a/c/csrc/include/longport.h +++ b/c/csrc/include/longport.h @@ -3703,6 +3703,7 @@ extern "C" { * `realtime`) * - `LONGPORT_PRINT_QUOTE_PACKAGES` - Print quote packages when connected, * `true` or `false` (Default: `true`) + * - `LONGPORT_LOG_PATH` - Set the path of the log files (Default: `no logs`) */ struct lb_config_t *lb_config_from_env(struct lb_error_t **error); @@ -3715,7 +3716,8 @@ struct lb_config_t *lb_config_new(const char *app_key, const enum lb_language_t *language, bool enable_overight, const enum lb_push_candlestick_mode_t *push_candlestick_mode, - bool enable_print_quote_packages); + bool enable_print_quote_packages, + const char *log_path); /** * Free the config object diff --git a/c/src/config.rs b/c/src/config.rs index 286a932a7..63f31fb97 100644 --- a/c/src/config.rs +++ b/c/src/config.rs @@ -37,6 +37,7 @@ pub struct CConfig(pub(crate) Arc); /// `realtime`) /// - `LONGPORT_PRINT_QUOTE_PACKAGES` - Print quote packages when connected, /// `true` or `false` (Default: `true`) +/// - `LONGPORT_LOG_PATH` - Set the path of the log files (Default: `no logs`) #[no_mangle] pub unsafe extern "C" fn lb_config_from_env(error: *mut *mut CError) -> *mut CConfig { match Config::from_env() { @@ -63,6 +64,7 @@ pub unsafe extern "C" fn lb_config_new( enable_overight: bool, push_candlestick_mode: *const CPushCandlestickMode, enable_print_quote_packages: bool, + log_path: *const c_char, ) -> *mut CConfig { let app_key = CStr::from_ptr(app_key).to_str().expect("invalid app key"); let app_secret = CStr::from_ptr(app_secret) @@ -109,6 +111,10 @@ pub unsafe extern "C" fn lb_config_new( config = config.dont_print_quote_packages(); } + if !log_path.is_null() { + config = config.log_path(CStr::from_ptr(log_path).to_str().expect("invalid log path")); + } + Box::into_raw(Box::new(CConfig(Arc::new(config)))) } diff --git a/c/src/quote_context/context.rs b/c/src/quote_context/context.rs index 2c34cd191..f494099bd 100644 --- a/c/src/quote_context/context.rs +++ b/c/src/quote_context/context.rs @@ -2,6 +2,7 @@ use std::{ ffi::{c_void, CString}, os::raw::c_char, sync::Arc, + time::Instant, }; use longport::{ @@ -127,12 +128,24 @@ pub unsafe extern "C" fn lb_quote_context_new( .. } => { if let Some(callback) = &state.callbacks.quote { + let log_subscriber = ctx.ctx.log_subscriber(); + let _guard = + tracing::dispatcher::set_default(&log_subscriber.into()); + + let s = Instant::now(); + tracing::info!("begin call on_quote callback"); + let quote_owned: CPushQuoteOwned = (symbol, quote).into(); (callback.f)( Arc::as_ptr(&ctx), "e_owned.to_ffi_type(), callback.userdata, ); + + tracing::info!( + duration = ?s.elapsed(), + "after call on_quote callback" + ); } } PushEvent { @@ -141,12 +154,24 @@ pub unsafe extern "C" fn lb_quote_context_new( .. } => { if let Some(callback) = &state.callbacks.depth { + let log_subscriber = ctx.ctx.log_subscriber(); + let _guard = + tracing::dispatcher::set_default(&log_subscriber.into()); + + let s = Instant::now(); + tracing::info!("begin call on_depth callback"); + let depth_owned: CPushDepthOwned = (symbol, depth).into(); (callback.f)( Arc::as_ptr(&ctx), &depth_owned.to_ffi_type(), callback.userdata, ); + + tracing::info!( + duration = ?s.elapsed(), + "after call on_depth callback" + ); } } PushEvent { @@ -155,12 +180,24 @@ pub unsafe extern "C" fn lb_quote_context_new( .. } => { if let Some(callback) = &state.callbacks.brokers { + let log_subscriber = ctx.ctx.log_subscriber(); + let _guard = + tracing::dispatcher::set_default(&log_subscriber.into()); + + let s = Instant::now(); + tracing::info!("begin call on_brokers callback"); + let brokers_owned: CPushBrokersOwned = (symbol, brokers).into(); (callback.f)( Arc::as_ptr(&ctx), &brokers_owned.to_ffi_type(), callback.userdata, ); + + tracing::info!( + duration = ?s.elapsed(), + "after call on_brokers callback" + ); } } PushEvent { @@ -169,12 +206,24 @@ pub unsafe extern "C" fn lb_quote_context_new( .. } => { if let Some(callback) = &state.callbacks.trades { + let log_subscriber = ctx.ctx.log_subscriber(); + let _guard = + tracing::dispatcher::set_default(&log_subscriber.into()); + + let s = Instant::now(); + tracing::info!("begin call on_trades callback"); + let trades_owned: CPushTradesOwned = (symbol, trades).into(); (callback.f)( Arc::as_ptr(&ctx), &trades_owned.to_ffi_type(), callback.userdata, ); + + tracing::info!( + duration = ?s.elapsed(), + "after call on_trades callback" + ); } } PushEvent { @@ -183,6 +232,13 @@ pub unsafe extern "C" fn lb_quote_context_new( .. } => { if let Some(callback) = &state.callbacks.candlestick { + let log_subscriber = ctx.ctx.log_subscriber(); + let _guard = + tracing::dispatcher::set_default(&log_subscriber.into()); + + let s = Instant::now(); + tracing::info!("begin call on_candlestick callback"); + let candlestick_owned: CPushCandlestickOwned = (symbol, candlestick).into(); (callback.f)( @@ -190,6 +246,11 @@ pub unsafe extern "C" fn lb_quote_context_new( &candlestick_owned.to_ffi_type(), callback.userdata, ); + + tracing::info!( + duration = ?s.elapsed(), + "after call on_candlestick callback" + ); } } } diff --git a/c/src/trade_context/context.rs b/c/src/trade_context/context.rs index 7b28340f6..3e44427fc 100644 --- a/c/src/trade_context/context.rs +++ b/c/src/trade_context/context.rs @@ -1,4 +1,4 @@ -use std::{ffi::c_void, os::raw::c_char, sync::Arc}; +use std::{ffi::c_void, os::raw::c_char, sync::Arc, time::Instant}; use longport::{ trade::{ @@ -98,6 +98,13 @@ pub unsafe extern "C" fn lb_trade_context_new( match event { PushEvent::OrderChanged(order_changed) => { if let Some(callback) = &state.callbacks.order_changed { + let log_subscriber = ctx.ctx.log_subscriber(); + let _guard = + tracing::dispatcher::set_default(&log_subscriber.into()); + + let s = Instant::now(); + tracing::info!("begin call on_order_changed callback"); + let order_changed_owned: CPushOrderChangedOwned = order_changed.into(); (callback.f)( @@ -105,6 +112,11 @@ pub unsafe extern "C" fn lb_trade_context_new( &order_changed_owned.to_ffi_type(), callback.userdata, ); + + tracing::info!( + duration = ?s.elapsed(), + "after call on_order_changed callback" + ); } } } diff --git a/cpp/include/config.hpp b/cpp/include/config.hpp index a574e786f..bf7bd92e0 100644 --- a/cpp/include/config.hpp +++ b/cpp/include/config.hpp @@ -37,6 +37,8 @@ class Config * @param language Language identifer (Default: Language::EN) * @param push_candlestick_mode Push candlestick mode (Default: * PushCandlestickMode::Realtime) + * @param enable_print_quote_packages Print quote packages when connected + * (Default: true) */ Config(const std::string& app_key, const std::string& app_secret, @@ -48,7 +50,8 @@ class Config bool enable_overnight = false, const std::optional& push_candlestick_mode = std::nullopt, - bool enable_print_quote_packages = true); + bool enable_print_quote_packages = true, + const std::optional& log_path = std::nullopt); ~Config(); @@ -76,6 +79,7 @@ class Config /// `realtime`) /// - `LONGPORT_PRINT_QUOTE_PACKAGES` - Print quote packages when connected, /// `true` or `false` (Default: `true`) + /// - `LONGPORT_LOG_PATH` - Set the path of the log files (Default: `no logs`) static Status from_env(Config& config); /// Gets a new `access_token` diff --git a/cpp/src/config.cpp b/cpp/src/config.cpp index e520ad4a4..a510f8b92 100644 --- a/cpp/src/config.cpp +++ b/cpp/src/config.cpp @@ -30,7 +30,8 @@ Config::Config(const std::string& app_key, const std::optional& language, bool enable_overnight, const std::optional& push_candlestick_mode, - bool enable_print_quote_packages) + bool enable_print_quote_packages, + const std::optional& log_path) { lb_language_t c_language; if (language) { @@ -52,7 +53,8 @@ Config::Config(const std::string& app_key, language ? &c_language : nullptr, enable_overnight, push_candlestick_mode ? &c_push_candlestick_mode : nullptr, - enable_print_quote_packages); + enable_print_quote_packages, + log_path ? log_path->c_str() : nullptr); } Config::~Config() diff --git a/java/javasrc/src/main/java/com/longport/Config.java b/java/javasrc/src/main/java/com/longport/Config.java index c4d93f410..b25dc14f6 100644 --- a/java/javasrc/src/main/java/com/longport/Config.java +++ b/java/javasrc/src/main/java/com/longport/Config.java @@ -39,6 +39,7 @@ public class Config implements AutoCloseable { * `realtime`) * - `LONGPORT_PRINT_QUOTE_PACKAGES` - Print quote packages when connected, * `true` or `false` (Default: `true`) + * - `LONGPORT_LOG_PATH` - Set the path of the log files (Default: `no logs`) * * @return Config object * @throws OpenApiException If an error occurs diff --git a/java/javasrc/src/main/java/com/longport/ConfigBuilder.java b/java/javasrc/src/main/java/com/longport/ConfigBuilder.java index 4186971e2..587e0f625 100644 --- a/java/javasrc/src/main/java/com/longport/ConfigBuilder.java +++ b/java/javasrc/src/main/java/com/longport/ConfigBuilder.java @@ -14,6 +14,7 @@ public class ConfigBuilder { private boolean enableOvernight; private PushCandlestickMode pushCandlestickMode; private boolean enablePrintQuotePackages; + private String logPath; /** * Create a `Config` object builder @@ -113,6 +114,17 @@ public ConfigBuilder dontPrintQuotePackages() { return this; } + /** + * Set the path of the log files. + * + * @param path The path of the log files (Default: `no logs`) + * @return this object + */ + public ConfigBuilder logPath(String path) { + this.logPath = path; + return this; + } + /** * Build a Config object * @@ -122,6 +134,6 @@ public ConfigBuilder dontPrintQuotePackages() { public Config build() throws OpenApiException { return new Config( SdkNative.newConfig(appKey, appSecret, accessToken, httpUrl, quoteWsUrl, tradeWsUrl, language, - enableOvernight, pushCandlestickMode, enablePrintQuotePackages)); + enableOvernight, pushCandlestickMode, enablePrintQuotePackages, logPath)); } } diff --git a/java/javasrc/src/main/java/com/longport/SdkNative.java b/java/javasrc/src/main/java/com/longport/SdkNative.java index a93280357..0aa35a0e1 100644 --- a/java/javasrc/src/main/java/com/longport/SdkNative.java +++ b/java/javasrc/src/main/java/com/longport/SdkNative.java @@ -26,7 +26,7 @@ public class SdkNative { public static native long newConfig(String appKey, String appSecret, String accessToken, String httpUrl, String quoteWsUrl, String tradeWsUrl, Language language, boolean enableOvernight, - PushCandlestickMode mode, boolean enablePrintQuotePackages); + PushCandlestickMode mode, boolean enablePrintQuotePackages, String logPath); public static native long newConfigFromEnv(); diff --git a/java/src/config.rs b/java/src/config.rs index 34746b99a..c2bbc5912 100644 --- a/java/src/config.rs +++ b/java/src/config.rs @@ -22,6 +22,7 @@ pub extern "system" fn Java_com_longport_SdkNative_newConfig( enable_overnight: jboolean, push_candlestick_mode: JObject, enable_print_quote_packages: jboolean, + log_path: JString, ) -> jlong { jni_result(&mut env, 0, |env| { let app_key = String::from_jvalue(env, app_key.into())?; @@ -33,6 +34,7 @@ pub extern "system" fn Java_com_longport_SdkNative_newConfig( let language = >::from_jvalue(env, language.into())?; let push_candlestick_mode = >::from_jvalue(env, push_candlestick_mode.into())?; + let log_path = >::from_jvalue(env, log_path.into())?; let mut config = Config::new(app_key, app_secret, access_token); @@ -57,6 +59,9 @@ pub extern "system" fn Java_com_longport_SdkNative_newConfig( if enable_print_quote_packages == 0 { config = config.dont_print_quote_packages(); } + if let Some(log_path) = log_path { + config = config.log_path(log_path); + } Ok(Box::into_raw(Box::new(config)) as jlong) }) diff --git a/nodejs/index.d.ts b/nodejs/index.d.ts index e818f22fe..17c5b9168 100644 --- a/nodejs/index.d.ts +++ b/nodejs/index.d.ts @@ -34,6 +34,8 @@ export interface ConfigParams { * (default: true) */ enablePrintQuotePackages: boolean + /** Set the path of the log files (Default: `no logs`) */ + logPath?: string } /** An request to create a watchlist group */ export interface CreateWatchlistGroup { @@ -2048,6 +2050,10 @@ export class PushQuote { get tradeStatus(): TradeStatus /** Trade session */ get tradeSession(): TradeSession + /** Increase volume between pushes */ + get currentVolume(): number + /** Increase turnover between pushes */ + get currentTurnover(): Decimal } /** Push real-time depth */ export class PushDepth { diff --git a/nodejs/src/config.rs b/nodejs/src/config.rs index 1ad50dc31..80ecffb8a 100644 --- a/nodejs/src/config.rs +++ b/nodejs/src/config.rs @@ -33,6 +33,8 @@ pub struct ConfigParams { /// Enable printing the opened quote packages when connected to the server /// (default: true) pub enable_print_quote_packages: bool, + /// Set the path of the log files (Default: `no logs`) + pub log_path: Option, } /// Configuration for LongPort sdk @@ -75,6 +77,10 @@ impl Config { config = config.dont_print_quote_packages(); } + if let Some(log_path) = params.log_path { + config = config.log_path(log_path); + } + Self(config) } diff --git a/python/Makefile.toml b/python/Makefile.toml index 8b9a060a1..ad7658db1 100644 --- a/python/Makefile.toml +++ b/python/Makefile.toml @@ -13,7 +13,7 @@ args = ["install", "maturin==1.7.4"] command = "pip" args = [ "install", - "target/wheels/longport-2.1.6-cp311-cp311-win_amd64.whl", + "target/wheels/longport-2.1.7-cp311-cp311-win_amd64.whl", "-I", ] dependencies = ["python"] diff --git a/python/pysrc/longport/openapi.pyi b/python/pysrc/longport/openapi.pyi index aa1b244ba..f8ad31573 100644 --- a/python/pysrc/longport/openapi.pyi +++ b/python/pysrc/longport/openapi.pyi @@ -104,6 +104,7 @@ class Config: enable_overnight: Enable overnight quote push_candlestick_mode: Push candlestick mode enable_print_quote_packages: Enable printing the opened quote packages when connected to the server + log_path: Set the path of the log files """ def __init__( @@ -118,6 +119,7 @@ class Config: enable_overnight: bool = False, push_candlestick_mode: Type[PushCandlestickMode] = PushCandlestickMode.Realtime, enable_print_quote_packages: bool = True, + log_path: Optional[str] = None, ) -> None: ... @classmethod @@ -136,6 +138,9 @@ class Config: - `LONGPORT_QUOTE_WS_URL` - Quote websocket endpoint url - `LONGPORT_TRADE_WS_URL` - Trade websocket endpoint url - `LONGPORT_ENABLE_OVERNIGHT` - Enable overnight quote, `true` or `false` (Default: `false`) + - `LONGPORT_PUSH_CANDLESTICK_MODE` - `realtime` or `confirmed` (Default: `realtime`) + - `LONGPORT_PRINT_QUOTE_PACKAGES` - Print quote packages when connected, `true` or `false` (Default: `true`) + - `LONGPORT_LOG_PATH` - Set the path of the log files (Default: `no logs`) """ def refresh_access_token(self, expired_at: Optional[datetime] = None) -> str: diff --git a/python/src/config.rs b/python/src/config.rs index 6c970a319..cda7c7b20 100644 --- a/python/src/config.rs +++ b/python/src/config.rs @@ -23,6 +23,7 @@ impl Config { enable_overnight = false, push_candlestick_mode = PushCandlestickMode::Realtime, enable_print_quote_packages = true, + log_path = None, ))] #[allow(clippy::too_many_arguments)] fn py_new( @@ -36,6 +37,7 @@ impl Config { enable_overnight: bool, push_candlestick_mode: PushCandlestickMode, enable_print_quote_packages: bool, + log_path: Option, ) -> Self { let mut config = longport::Config::new(app_key, app_secret, access_token); @@ -59,6 +61,11 @@ impl Config { } config = config.push_candlestick_mode(push_candlestick_mode.into()); + + if let Some(log_path) = log_path { + config = config.log_path(log_path); + } + Self(config) } diff --git a/rust/Cargo.toml b/rust/Cargo.toml index c2c649601..73ab6ce85 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -26,7 +26,7 @@ tokio = { version = "1.18.2", features = [ "sync", "net", ] } -tokio-tungstenite = "0.21.0" +tokio-tungstenite = "0.26.1" rust_decimal = { version = "1.23.1", features = ["serde-with-str", "maths"] } num_enum = "0.7.0" prost = "0.13.3" @@ -49,6 +49,8 @@ dotenv = "0.15.0" http = "1.1.0" comfy-table = "7.1.1" itertools = "0.13.0" +tracing-subscriber = "0.3.19" +tracing-appender = "0.2.3" [dev-dependencies] tokio = { version = "1.18.2", features = ["rt-multi-thread"] } diff --git a/rust/crates/httpclient/src/config.rs b/rust/crates/httpclient/src/config.rs index 28fee6594..1245d1832 100644 --- a/rust/crates/httpclient/src/config.rs +++ b/rust/crates/httpclient/src/config.rs @@ -1,13 +1,10 @@ -use crate::{is_cn, HttpClientError}; - -const HTTP_URL: &str = "https://openapi.longportapp.com"; -const CN_HTTP_URL: &str = "https://openapi.longportapp.cn"; +use crate::HttpClientError; /// Configuration options for Http client #[derive(Debug, Clone)] pub struct HttpClientConfig { /// HTTP API url - pub(crate) http_url: String, + pub(crate) http_url: Option, /// App key pub(crate) app_key: String, /// App secret @@ -23,9 +20,8 @@ impl HttpClientConfig { app_secret: impl Into, access_token: impl Into, ) -> Self { - let http_url = if is_cn() { CN_HTTP_URL } else { HTTP_URL }; Self { - http_url: http_url.to_string(), + http_url: None, app_key: app_key.into(), app_secret: app_secret.into(), access_token: access_token.into(), @@ -43,28 +39,21 @@ impl HttpClientConfig { pub fn from_env() -> Result { let _ = dotenv::dotenv(); - let app_key = std::env::var("LONGBRIDGE_APP_KEY") - .or_else(|_| std::env::var("LONGPORT_APP_KEY")) - .map_err(|_| HttpClientError::MissingEnvVar { + let app_key = + std::env::var("LONGPORT_APP_KEY").map_err(|_| HttpClientError::MissingEnvVar { name: "LONGPORT_APP_KEY", })?; - let app_secret = std::env::var("LONGBRIDGE_APP_SECRET") - .or_else(|_| std::env::var("LONGPORT_APP_SECRET")) - .map_err(|_| HttpClientError::MissingEnvVar { + let app_secret = + std::env::var("LONGPORT_APP_SECRET").map_err(|_| HttpClientError::MissingEnvVar { name: "LONGPORT_APP_SECRET", })?; - let access_token = std::env::var("LONGBRIDGE_ACCESS_TOKEN") - .or_else(|_| std::env::var("LONGPORT_ACCESS_TOKEN")) - .map_err(|_| HttpClientError::MissingEnvVar { + let access_token = + std::env::var("LONGPORT_ACCESS_TOKEN").map_err(|_| HttpClientError::MissingEnvVar { name: "LONGPORT_ACCESS_TOKEN", })?; let mut config = Self::new(app_key, app_secret, access_token); - if let Ok(http_url) = - std::env::var("LONGBRIDGE_HTTP_URL").or_else(|_| std::env::var("LONGPORT_HTTP_URL")) - { - config.http_url = http_url; - } + config.http_url = std::env::var("LONGPORT_HTTP_URL").ok(); Ok(config) } @@ -75,7 +64,7 @@ impl HttpClientConfig { #[must_use] pub fn http_url(self, url: impl Into) -> Self { Self { - http_url: url.into(), + http_url: Some(url.into()), ..self } } diff --git a/rust/crates/httpclient/src/geo.rs b/rust/crates/httpclient/src/geo.rs index 0d5b4749d..f6ea23670 100644 --- a/rust/crates/httpclient/src/geo.rs +++ b/rust/crates/httpclient/src/geo.rs @@ -1,15 +1,11 @@ -use std::{ - cell::{Cell, RefCell}, - time::{Duration, Instant}, -}; +use std::{cell::RefCell, time::Duration}; // because we may call `is_cn` multi times in a short time, we cache the result thread_local! { - static LAST_PING: Cell> = const { Cell::new(None) }; - static LAST_PING_REGION: RefCell = const { RefCell::new(String::new()) }; + static REGION: RefCell> = const { RefCell::new(None) }; } -fn region() -> Option { +async fn region() -> Option { // check user defined REGION if let Ok(region) = std::env::var("LONGPORT_REGION") { return Some(region); @@ -17,22 +13,14 @@ fn region() -> Option { // check network connectivity // make sure block_on doesn't block the outer tokio runtime - let handler = std::thread::spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(ping()) - }); - handler.join().unwrap() + ping().await } async fn ping() -> Option { - if let Some(last_ping) = LAST_PING.get() { - if last_ping.elapsed() < Duration::from_secs(60) { - return Some(LAST_PING_REGION.with_borrow(Clone::clone)); - } + if let Some(region) = REGION.with_borrow(Clone::clone) { + return Some(region.clone()); } + let Ok(resp) = reqwest::Client::new() .get("https://api.lbkrs.com/_ping") .timeout(Duration::from_secs(1)) @@ -45,26 +33,29 @@ async fn ping() -> Option { .headers() .get("X-Ip-Region") .and_then(|v| v.to_str().ok())?; - LAST_PING.set(Some(Instant::now())); - LAST_PING_REGION.replace(region.to_string()); + REGION.set(Some(region.to_string())); Some(region.to_string()) } /// do the best to guess whether the access point is in China Mainland or not -pub fn is_cn() -> bool { - region().is_some_and(|region| region.eq_ignore_ascii_case("CN")) +pub async fn is_cn() -> bool { + region() + .await + .is_some_and(|region| region.eq_ignore_ascii_case("CN")) } #[cfg(test)] mod tests { use super::*; - #[test] - fn test_var() { + #[tokio::test] + async fn test_var() { + is_cn().await; + std::env::set_var("LONGPORT_REGION", "CN"); - assert!(is_cn()); + assert!(is_cn().await); std::env::set_var("LONGPORT_REGION", "SG"); - assert!(!is_cn()); + assert!(!is_cn().await); } } diff --git a/rust/crates/httpclient/src/request.rs b/rust/crates/httpclient/src/request.rs index a5df40cc9..8477b6c38 100644 --- a/rust/crates/httpclient/src/request.rs +++ b/rust/crates/httpclient/src/request.rs @@ -1,4 +1,10 @@ -use std::{convert::Infallible, error::Error, marker::PhantomData, time::Duration}; +use std::{ + convert::Infallible, + error::Error, + fmt::Debug, + marker::PhantomData, + time::{Duration, Instant}, +}; use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, @@ -7,11 +13,15 @@ use reqwest::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::{ + is_cn, signature::{signature, SignatureParams}, timestamp::Timestamp, HttpClient, HttpClientError, HttpClientResult, }; +const HTTP_URL: &str = "https://openapi.longportapp.com"; +const HTTP_URL_CN: &str = "https://openapi.longportapp.cn"; + const USER_AGENT: &str = "openapi-sdk"; const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); const RETRY_COUNT: usize = 5; @@ -19,6 +29,7 @@ const RETRY_INITIAL_DELAY: Duration = Duration::from_millis(100); const RETRY_FACTOR: f32 = 2.0; /// A JSON payload +#[derive(Debug)] pub struct Json(pub T); /// Represents a type that can parse from payload @@ -31,7 +42,7 @@ pub trait FromPayload: Sized + Send + Sync + 'static { } /// Represents a type that can convert to payload -pub trait ToPayload: Sized + Send + Sync + 'static { +pub trait ToPayload: Debug + Sized + Send + Sync + 'static { /// A error type type Err: Error; @@ -53,7 +64,7 @@ where impl ToPayload for Json where - T: Serialize + Send + Sync + 'static, + T: Debug + Serialize + Send + Sync + 'static, { type Err = serde_json::Error; @@ -205,6 +216,18 @@ where Q: Serialize + Send, R: FromPayload, { + async fn http_url(&self) -> &str { + if let Some(url) = self.client.config.http_url.as_deref() { + return url; + } + + if is_cn().await { + HTTP_URL_CN + } else { + HTTP_URL + } + } + async fn do_send(&self) -> HttpClientResult { let HttpClient { http_cli, @@ -222,11 +245,9 @@ where let access_token_value = HeaderValue::from_str(&config.access_token) .map_err(|_| HttpClientError::InvalidAccessToken)?; + let url = self.http_url().await; let mut request_builder = http_cli - .request( - self.method.clone(), - format!("{}{}", config.http_url, self.path), - ) + .request(self.method.clone(), format!("{}{}", url, self.path)) .headers(default_headers.clone()) .headers(self.headers.clone()) .header("User-Agent", USER_AGENT) @@ -264,7 +285,13 @@ where HeaderValue::from_maybe_shared(sign).expect("valid signature"), ); - tracing::debug!(method = %request.method(), url = %request.url(), "http request"); + if let Some(body) = &self.body { + tracing::info!(method = %request.method(), url = %request.url(), body = ?body, "http request"); + } else { + tracing::info!(method = %request.method(), url = %request.url(), "http request"); + } + + let s = Instant::now(); // send request let (status, trace_id, text) = tokio::time::timeout(REQUEST_TIMEOUT, async move { @@ -282,7 +309,7 @@ where .await .map_err(|_| HttpClientError::RequestTimeout)??; - tracing::debug!(body = text.as_str(), "http response"); + tracing::info!(duration = ?s.elapsed(), body = %text.as_str(), "http response"); let resp = match serde_json::from_str::(&text) { Ok(resp) if resp.code == 0 => resp.data.ok_or(HttpClientError::UnexpectedResponse), @@ -302,7 +329,6 @@ where } /// Send request and get the response - #[tracing::instrument(level = "debug", skip(self))] pub async fn send(self) -> HttpClientResult { match self.do_send().await { Ok(resp) => Ok(resp), diff --git a/rust/crates/wsclient/Cargo.toml b/rust/crates/wsclient/Cargo.toml index 1af528683..f270f8e2f 100644 --- a/rust/crates/wsclient/Cargo.toml +++ b/rust/crates/wsclient/Cargo.toml @@ -15,7 +15,7 @@ tokio = { version = "1.18.2", features = [ "sync", "net", ] } -tokio-tungstenite = { version = "0.21.0", features = [ +tokio-tungstenite = { version = "0.26.1", features = [ "rustls-tls-webpki-roots", ] } thiserror = "1.0.31" diff --git a/rust/crates/wsclient/src/client.rs b/rust/crates/wsclient/src/client.rs index f359c1e47..9037617c8 100644 --- a/rust/crates/wsclient/src/client.rs +++ b/rust/crates/wsclient/src/client.rs @@ -155,7 +155,7 @@ impl<'a> Context<'a> { } _ = checkout_timeout.tick() => { if (Instant::now() - ping_time) > HEARTBEAT_TIMEOUT { - tracing::error!("heartbeat timeout"); + tracing::info!("heartbeat timeout"); return Err(WsClientError::ConnectionClosed { reason: None }); } } @@ -180,7 +180,8 @@ impl<'a> Context<'a> { body, signature: None, } - .encode(), + .encode() + .into(), ); self.inflight_requests.insert(request_id, reply_tx); self.sink.send(msg).await?; @@ -227,7 +228,7 @@ impl<'a> Context<'a> { return Err(WsClientError::ConnectionClosed { reason: Some(WsCloseReason { code: close_frame.code, - message: close_frame.reason.into_owned(), + message: close_frame.reason.to_string(), }), }); } diff --git a/rust/src/config.rs b/rust/src/config.rs index 2f196e6fd..c7cef7636 100644 --- a/rust/src/config.rs +++ b/rust/src/config.rs @@ -1,6 +1,8 @@ use std::{ collections::HashMap, fmt::{self, Display}, + path::{Path, PathBuf}, + sync::Arc, }; use http::Method; @@ -10,16 +12,19 @@ use num_enum::IntoPrimitive; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tracing::{subscriber::NoSubscriber, Level, Subscriber}; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::{filter::Targets, layer::SubscriberExt}; use crate::error::Result; -const QUOTE_WS_URL: &str = "wss://openapi-quote.longportapp.com/v2"; -const TRADE_WS_URL: &str = "wss://openapi-trade.longportapp.com/v2"; -const CN_QUOTE_WS_URL: &str = "wss://openapi-quote.longportapp.cn/v2"; -const CN_TRADE_WS_URL: &str = "wss://openapi-trade.longportapp.cn/v2"; +const DEFAULT_QUOTE_WS_URL: &str = "wss://openapi-quote.longportapp.com/v2"; +const DEFAULT_TRADE_WS_URL: &str = "wss://openapi-trade.longportapp.com/v2"; +const DEFAULT_QUOTE_WS_URL_CN: &str = "wss://openapi-quote.longportapp.cn/v2"; +const DEFAULT_TRADE_WS_URL_CN: &str = "wss://openapi-trade.longportapp.cn/v2"; /// Language identifier -#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoPrimitive)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, IntoPrimitive)] #[allow(non_camel_case_types)] #[repr(i32)] pub enum Language { @@ -28,6 +33,7 @@ pub enum Language { /// zh-HK ZH_HK = 2, /// en + #[default] EN = 1, } @@ -48,9 +54,10 @@ impl Display for Language { } /// Push mode for candlestick -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] pub enum PushCandlestickMode { /// Realtime mode + #[default] Realtime, /// Confirmed mode Confirmed, @@ -60,12 +67,13 @@ pub enum PushCandlestickMode { #[derive(Debug, Clone)] pub struct Config { pub(crate) http_cli_config: HttpClientConfig, - pub(crate) quote_ws_url: String, - pub(crate) trade_ws_url: String, - pub(crate) language: Language, - pub(crate) enable_overnight: bool, - pub(crate) push_candlestick_mode: PushCandlestickMode, + pub(crate) quote_ws_url: Option, + pub(crate) trade_ws_url: Option, + pub(crate) enable_overnight: Option, + pub(crate) push_candlestick_mode: Option, pub(crate) enable_print_quote_packages: bool, + pub(crate) language: Option, + pub(crate) log_path: Option, } impl Config { @@ -77,22 +85,13 @@ impl Config { ) -> Self { Self { http_cli_config: HttpClientConfig::new(app_key, app_secret, access_token), - quote_ws_url: if is_cn() { - CN_QUOTE_WS_URL - } else { - QUOTE_WS_URL - } - .to_string(), - trade_ws_url: if is_cn() { - CN_TRADE_WS_URL - } else { - TRADE_WS_URL - } - .to_string(), - language: Language::EN, - enable_overnight: false, - push_candlestick_mode: PushCandlestickMode::Realtime, + quote_ws_url: None, + trade_ws_url: None, + language: None, + enable_overnight: None, + push_candlestick_mode: None, enable_print_quote_packages: true, + log_path: None, } } @@ -117,53 +116,38 @@ impl Config { /// `realtime`) /// - `LONGPORT_PRINT_QUOTE_PACKAGES` - Print quote packages when connected, /// `true` or `false` (Default: `true`) + /// - `LONGPORT_LOG_PATH` - Set the path of the log files (Default: `no + /// logs`) pub fn from_env() -> Result { let _ = dotenv::dotenv(); let http_cli_config = HttpClientConfig::from_env()?; - let quote_ws_url = std::env::var("LONGBRIDGE_QUOTE_WS_URL") - .or_else(|_| std::env::var("LONGPORT_QUOTE_WS_URL")) - .unwrap_or_else(|_| { - if is_cn() { - CN_QUOTE_WS_URL - } else { - QUOTE_WS_URL - } - .to_string() - }); - let trade_ws_url = std::env::var("LONGBRIDGE_TRADE_WS_URL") - .or_else(|_| std::env::var("LONGPORT_TRADE_WS_URL")) - .unwrap_or_else(|_| { - if is_cn() { - CN_TRADE_WS_URL - } else { - TRADE_WS_URL - } - .to_string() - }); + let quote_ws_url = std::env::var("LONGPORT_QUOTE_WS_URL").ok(); + let trade_ws_url = std::env::var("LONGPORT_TRADE_WS_URL").ok(); let enable_overnight = std::env::var("LONGPORT_ENABLE_OVERNIGHT") - .ok() - .map(|var| var == "true") - .unwrap_or_default(); - let push_candlestick_mode = - if std::env::var("LONGPORT_PUSH_CANDLESTICK_MODE").as_deref() == Ok("confirmed") { - PushCandlestickMode::Confirmed - } else { - PushCandlestickMode::Realtime - }; + .map(|value| value == "true") + .ok(); + let push_candlestick_mode = std::env::var("LONGPORT_PUSH_CANDLESTICK_MODE") + .map(|value| match value.as_str() { + "confirmed" => PushCandlestickMode::Confirmed, + _ => PushCandlestickMode::Realtime, + }) + .ok(); let enable_print_quote_packages = std::env::var("LONGPORT_PRINT_QUOTE_PACKAGES") .as_deref() .unwrap_or("true") == "true"; + let log_path = std::env::var("LONGPORT_LOG_PATH").ok().map(PathBuf::from); Ok(Config { http_cli_config, quote_ws_url, trade_ws_url, - language: Language::EN, + language: None, enable_overnight, push_candlestick_mode, enable_print_quote_packages, + log_path, }) } @@ -186,7 +170,7 @@ impl Config { #[must_use] pub fn quote_ws_url(self, url: impl Into) -> Self { Self { - quote_ws_url: url.into(), + quote_ws_url: Some(url.into()), ..self } } @@ -199,7 +183,7 @@ impl Config { #[must_use] pub fn trade_ws_url(self, url: impl Into) -> Self { Self { - trade_ws_url: url.into(), + trade_ws_url: Some(url.into()), ..self } } @@ -208,7 +192,10 @@ impl Config { /// /// Default: `Language::EN` pub fn language(self, language: Language) -> Self { - Self { language, ..self } + Self { + language: Some(language), + ..self + } } /// Enable overnight quote @@ -216,7 +203,7 @@ impl Config { /// Default: `false` pub fn enable_overnight(self) -> Self { Self { - enable_overnight: true, + enable_overnight: Some(true), ..self } } @@ -226,7 +213,7 @@ impl Config { /// Default: `PushCandlestickMode::Realtime` pub fn push_candlestick_mode(self, mode: PushCandlestickMode) -> Self { Self { - push_candlestick_mode: mode, + push_candlestick_mode: Some(mode), ..self } } @@ -242,8 +229,11 @@ impl Config { /// Create metadata for auth/reconnect request pub fn create_metadata(&self) -> HashMap { let mut metadata = HashMap::new(); - metadata.insert("accept-language".to_string(), self.language.to_string()); - if self.enable_overnight { + metadata.insert( + "accept-language".to_string(), + self.language.unwrap_or_default().to_string(), + ); + if self.enable_overnight.unwrap_or_default() { metadata.insert("need_over_night_quote".to_string(), "true".to_string()); } metadata @@ -251,31 +241,51 @@ impl Config { #[inline] pub(crate) fn create_http_client(&self) -> HttpClient { - HttpClient::new(self.http_cli_config.clone()) - .header(header::ACCEPT_LANGUAGE, self.language.as_str()) + HttpClient::new(self.http_cli_config.clone()).header( + header::ACCEPT_LANGUAGE, + self.language.unwrap_or_default().as_str(), + ) } fn create_ws_request(&self, url: &str) -> tokio_tungstenite::tungstenite::Result> { let mut request = url.into_client_request()?; request.headers_mut().append( header::ACCEPT_LANGUAGE, - HeaderValue::from_str(self.language.as_str()).unwrap(), + HeaderValue::from_str(self.language.unwrap_or_default().as_str()).unwrap(), ); Ok(request) } - #[inline] - pub(crate) fn create_quote_ws_request( + pub(crate) async fn create_quote_ws_request( &self, - ) -> tokio_tungstenite::tungstenite::Result> { - self.create_ws_request(&self.quote_ws_url) + ) -> (&str, tokio_tungstenite::tungstenite::Result>) { + match self.quote_ws_url.as_deref() { + Some(url) => (url, self.create_ws_request(url)), + None => { + let url = if is_cn().await { + DEFAULT_QUOTE_WS_URL_CN + } else { + DEFAULT_QUOTE_WS_URL + }; + (url, self.create_ws_request(url)) + } + } } - #[inline] - pub(crate) fn create_trade_ws_request( + pub(crate) async fn create_trade_ws_request( &self, - ) -> tokio_tungstenite::tungstenite::Result> { - self.create_ws_request(&self.trade_ws_url) + ) -> (&str, tokio_tungstenite::tungstenite::Result>) { + match self.trade_ws_url.as_deref() { + Some(url) => (url, self.create_ws_request(url)), + None => { + let url = if is_cn().await { + DEFAULT_TRADE_WS_URL_CN + } else { + DEFAULT_TRADE_WS_URL + }; + (url, self.create_ws_request(url)) + } + } } /// Gets a new `access_token` @@ -333,4 +343,38 @@ impl Config { .expect("create tokio runtime") .block_on(self.refresh_access_token(expired_at)) } + + /// Specifies the path of the log file + /// + /// Default: `None` + pub fn log_path(mut self, path: impl Into) -> Self { + self.log_path = Some(path.into()); + self + } + + pub(crate) fn create_log_subscriber( + &self, + path: impl AsRef, + ) -> Arc { + fn internal_create_log_subscriber( + config: &Config, + path: impl AsRef, + ) -> Option> { + let log_path = config.log_path.as_ref()?; + let appender = RollingFileAppender::builder() + .rotation(Rotation::DAILY) + .filename_suffix("log") + .build(log_path.join(path)) + .ok()?; + Some(Arc::new( + tracing_subscriber::fmt() + .with_writer(appender) + .with_ansi(false) + .finish() + .with(Targets::new().with_targets([("longport", Level::INFO)])), + )) + } + + internal_create_log_subscriber(self, path).unwrap_or_else(|| Arc::new(NoSubscriber::new())) + } } diff --git a/rust/src/quote/context.rs b/rust/src/quote/context.rs index 7ecc0f0b1..adc193a0b 100644 --- a/rust/src/quote/context.rs +++ b/rust/src/quote/context.rs @@ -6,6 +6,7 @@ use longport_wscli::WsClientError; use serde::{Deserialize, Serialize}; use time::{Date, PrimitiveDateTime}; use tokio::sync::{mpsc, oneshot}; +use tracing::{instrument::WithSubscriber, Subscriber}; use crate::{ quote::{ @@ -50,6 +51,7 @@ pub struct QuoteContext { member_id: i64, quote_level: String, quote_package_details: Vec, + log_subscriber: Arc, } impl QuoteContext { @@ -57,15 +59,19 @@ impl QuoteContext { pub async fn try_new( config: Arc, ) -> Result<(Self, mpsc::UnboundedReceiver)> { - let language = config.language; + let log_subscriber = config.create_log_subscriber("quote"); + let language = config.language.unwrap_or_default(); let http_cli = config.create_http_client(); let (command_tx, command_rx) = mpsc::unbounded_channel(); let (push_tx, push_rx) = mpsc::unbounded_channel(); - let core = Core::try_new(config, command_rx, push_tx).await?; + let core = Core::try_new(config, command_rx, push_tx) + .with_subscriber(log_subscriber.clone()) + .await?; let member_id = core.member_id(); let quote_level = core.quote_level().to_string(); let quote_package_details = core.quote_package_details().to_vec(); - tokio::spawn(core.run()); + tokio::spawn(core.run().with_subscriber(log_subscriber.clone())); + Ok(( QuoteContext { language, @@ -83,11 +89,18 @@ impl QuoteContext { member_id, quote_level, quote_package_details, + log_subscriber, }, push_rx, )) } + /// Returns the log subscriber + #[inline] + pub fn log_subscriber(&self) -> Arc { + self.log_subscriber.clone() + } + /// Returns the member ID #[inline] pub fn member_id(&self) -> i64 { @@ -1229,6 +1242,7 @@ impl QuoteContext { .request(Method::GET, "/v1/watchlist/groups") .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await?; Ok(resp.0.groups) } @@ -1280,6 +1294,7 @@ impl QuoteContext { })) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await?; Ok(id) @@ -1316,6 +1331,7 @@ impl QuoteContext { .request(Method::DELETE, "/v1/watchlist/groups") .query_params(Request { id, purge }) .send() + .with_subscriber(self.log_subscriber.clone()) .await?) } @@ -1365,6 +1381,7 @@ impl QuoteContext { securities: req.securities, })) .send() + .with_subscriber(self.log_subscriber.clone()) .await?; Ok(()) diff --git a/rust/src/quote/core.rs b/rust/src/quote/core.rs index 48fcad11c..f96fc2dcb 100644 --- a/rust/src/quote/core.rs +++ b/rust/src/quote/core.rs @@ -162,14 +162,12 @@ impl Core { let (event_tx, event_rx) = mpsc::unbounded_channel(); - tracing::debug!( - url = config.quote_ws_url.as_str(), - "connecting to quote server", - ); + tracing::info!("connecting to quote server"); + let (url, res) = config.create_quote_ws_request().await; + let request = res.map_err(WsClientError::from)?; + let mut ws_cli = WsClient::open( - config - .create_quote_ws_request() - .map_err(WsClientError::from)?, + request, ProtocolVersion::Version1, CodecType::Protobuf, Platform::OpenAPI, @@ -178,7 +176,7 @@ impl Core { ) .await?; - tracing::debug!(url = config.quote_ws_url.as_str(), "quote server connected"); + tracing::info!(url = url, "quote server connected"); let session = ws_cli.request_auth(otp, config.create_metadata()).await?; @@ -188,7 +186,7 @@ impl Core { cmd_code::QUERY_USER_QUOTE_PROFILE, None, quote::UserQuoteProfileRequest { - language: config.language.to_string(), + language: config.language.unwrap_or_default().to_string(), }, ) .await?; @@ -240,7 +238,7 @@ impl Core { ws_cli.set_rate_limit(rate_limit.clone()); let current_trade_days = fetch_trading_days(&ws_cli).await?; - let push_candlestick_mode = config.push_candlestick_mode; + let push_candlestick_mode = config.push_candlestick_mode.unwrap_or_default(); let mut table = Table::new(); for market_packages in quote_package_details_by_market { @@ -309,13 +307,12 @@ impl Core { // reconnect tokio::time::sleep(RECONNECT_DELAY).await; - tracing::debug!( - url = self.config.quote_ws_url.as_str(), - "connecting to quote server", - ); + tracing::info!("connecting to quote server"); + let (url, res) = self.config.create_quote_ws_request().await; + let request = res.expect("BUG: failed to create quote ws request"); match WsClient::open( - self.config.create_quote_ws_request().unwrap(), + request, ProtocolVersion::Version1, CodecType::Protobuf, Platform::OpenAPI, @@ -331,10 +328,7 @@ impl Core { } } - tracing::debug!( - url = self.config.quote_ws_url.as_str(), - "quote server connected" - ); + tracing::info!(url = url, "quote server connected"); // request new session match &self.session { @@ -387,7 +381,6 @@ impl Core { } } - #[tracing::instrument(level = "debug", skip(self))] async fn main_loop(&mut self) -> Result<()> { let mut update_trading_days_interval = tokio::time::interval_at( Instant::now() + Duration::from_secs(60 * 60 * 24), @@ -520,7 +513,7 @@ impl Core { sub_types: SubFlags, is_first_push: bool, ) -> Result<()> { - tracing::debug!(symbols = ?symbols, sub_types = ?sub_types, "subscribe"); + tracing::info!(symbols = ?symbols, sub_types = ?sub_types, "subscribe"); // send request let req = SubscribeRequest { @@ -548,7 +541,7 @@ impl Core { symbols: Vec, sub_types: SubFlags, ) -> Result<()> { - tracing::debug!(symbols = ?symbols, sub_types = ?sub_types, "unsubscribe"); + tracing::info!(symbols = ?symbols, sub_types = ?sub_types, "unsubscribe"); // send requests let mut st_group: HashMap> = HashMap::new(); @@ -796,7 +789,7 @@ impl Core { } } - tracing::debug!(subscriptions = ?subscriptions, "resubscribe"); + tracing::info!(subscriptions = ?subscriptions, "resubscribe"); for (flags, symbols) in subscriptions { self.ws_cli @@ -919,7 +912,7 @@ impl Core { fn handle_push(&mut self, command_code: u8, body: Vec) -> Result<()> { match PushEvent::parse(command_code, &body) { Ok((mut event, tag)) => { - tracing::debug!(event = ?event, tag = ?tag, "push event"); + tracing::info!(event = ?event, tag = ?tag, "push event"); if tag != Some(PushQuoteTag::Eod) { self.store.handle_push(&mut event); diff --git a/rust/src/trade/context.rs b/rust/src/trade/context.rs index a62585f1e..b69aa6200 100644 --- a/rust/src/trade/context.rs +++ b/rust/src/trade/context.rs @@ -5,6 +5,7 @@ use longport_wscli::WsClientError; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; +use tracing::{instrument::WithSubscriber, Subscriber}; use crate::{ trade::{ @@ -42,6 +43,7 @@ pub struct EstimateMaxPurchaseQuantityResponse { pub struct TradeContext { command_tx: mpsc::UnboundedSender, http_cli: HttpClient, + log_subscriber: Arc, } impl TradeContext { @@ -49,19 +51,31 @@ impl TradeContext { pub async fn try_new( config: Arc, ) -> Result<(Self, mpsc::UnboundedReceiver)> { + let log_subscriber = config.create_log_subscriber("trade"); let http_cli = config.create_http_client(); let (command_tx, command_rx) = mpsc::unbounded_channel(); let (push_tx, push_rx) = mpsc::unbounded_channel(); - tokio::spawn(Core::try_new(config, command_rx, push_tx).await?.run()); + let core = Core::try_new(config, command_rx, push_tx) + .with_subscriber(log_subscriber.clone()) + .await?; + tokio::spawn(core.run().with_subscriber(log_subscriber.clone())); + Ok(( TradeContext { http_cli, command_tx, + log_subscriber, }, push_rx, )) } + /// Returns the log subscriber + #[inline] + pub fn log_subscriber(&self) -> Arc { + self.log_subscriber.clone() + } + /// Subscribe /// /// Reference: @@ -173,6 +187,7 @@ impl TradeContext { .query_params(options.into().unwrap_or_default()) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0 .trades) @@ -217,6 +232,7 @@ impl TradeContext { .query_params(options.into().unwrap_or_default()) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0 .trades) @@ -268,6 +284,7 @@ impl TradeContext { .query_params(options.into().unwrap_or_default()) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0 .orders) @@ -316,6 +333,7 @@ impl TradeContext { .query_params(options.into().unwrap_or_default()) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0 .orders) @@ -354,6 +372,7 @@ impl TradeContext { .body(Json(options)) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await .map(|_| ())?) } @@ -397,6 +416,7 @@ impl TradeContext { .body(Json(options)) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0; _ = self.command_tx.send(Command::SubmittedOrder { @@ -438,6 +458,7 @@ impl TradeContext { order_id: order_id.into(), }) .send() + .with_subscriber(self.log_subscriber.clone()) .await .map(|_| ())?) } @@ -479,6 +500,7 @@ impl TradeContext { .query_params(Request { currency }) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0 .list) @@ -521,6 +543,7 @@ impl TradeContext { .query_params(options) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0 .list) @@ -556,6 +579,7 @@ impl TradeContext { .query_params(opts.into().unwrap_or_default()) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0) } @@ -590,6 +614,7 @@ impl TradeContext { .query_params(opts.into().unwrap_or_default()) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0) } @@ -628,6 +653,7 @@ impl TradeContext { }) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0) } @@ -670,6 +696,7 @@ impl TradeContext { order_id: order_id.into(), }) .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0) } @@ -716,6 +743,7 @@ impl TradeContext { .query_params(opts) .response::>() .send() + .with_subscriber(self.log_subscriber.clone()) .await? .0) } diff --git a/rust/src/trade/core.rs b/rust/src/trade/core.rs index 9d9847c79..03ae244d1 100644 --- a/rust/src/trade/core.rs +++ b/rust/src/trade/core.rs @@ -60,14 +60,11 @@ impl Core { let (event_tx, event_rx) = mpsc::unbounded_channel(); - tracing::debug!( - url = config.trade_ws_url.as_str(), - "connecting to trade server", - ); + tracing::info!("connecting to trade server"); + let (url, res) = config.create_trade_ws_request().await; + let request = res.map_err(WsClientError::from)?; let ws_cli = WsClient::open( - config - .create_trade_ws_request() - .map_err(WsClientError::from)?, + request, ProtocolVersion::Version1, CodecType::Protobuf, Platform::OpenAPI, @@ -76,7 +73,7 @@ impl Core { ) .await?; - tracing::debug!(url = config.trade_ws_url.as_str(), "trade server connected"); + tracing::info!(url = url, "trade server connected"); let session = ws_cli.request_auth(otp, Default::default()).await?; @@ -106,13 +103,12 @@ impl Core { // reconnect tokio::time::sleep(RECONNECT_DELAY).await; - tracing::debug!( - url = self.config.trade_ws_url.as_str(), - "connecting to trade server", - ); + tracing::info!("connecting to trade server"); + let (url, res) = self.config.create_trade_ws_request().await; + let request = res.expect("BUG: failed to create trade ws request"); match WsClient::open( - self.config.create_trade_ws_request().unwrap(), + request, ProtocolVersion::Version1, CodecType::Protobuf, Platform::OpenAPI, @@ -128,10 +124,7 @@ impl Core { } } - tracing::debug!( - url = self.config.trade_ws_url.as_str(), - "trade server connected" - ); + tracing::info!(url = url, "trade server connected"); // request new session match &self.session { @@ -180,7 +173,6 @@ impl Core { } } - #[tracing::instrument(level = "debug", skip(self))] async fn main_loop(&mut self) -> Result<()> { let mut tick = tokio::time::interval(Duration::from_millis(500));