From 13c1b8071859a8eb7cdbd395d5983c1cb0ab7040 Mon Sep 17 00:00:00 2001 From: Sunli Date: Thu, 7 Mar 2024 17:28:19 +0800 Subject: [PATCH] fix resubscribe daily candlesticks --- examples/rust/account_asset/Cargo.toml | 1 + examples/rust/account_asset/src/main.rs | 7 +- examples/rust/http_client/Cargo.toml | 1 + examples/rust/http_client/src/main.rs | 5 ++ examples/rust/submit_order/Cargo.toml | 1 + examples/rust/submit_order/src/main.rs | 5 ++ .../rust/subscribe_candlesticks/Cargo.toml | 1 + .../rust/subscribe_candlesticks/src/main.rs | 9 +- examples/rust/subscribe_quote/Cargo.toml | 1 + examples/rust/subscribe_quote/src/main.rs | 6 +- examples/rust/today_orders/Cargo.toml | 1 + examples/rust/today_orders/src/main.rs | 5 ++ rust/crates/wsclient/src/client.rs | 2 +- rust/src/quote/core.rs | 88 +++++++++++-------- 14 files changed, 92 insertions(+), 41 deletions(-) diff --git a/examples/rust/account_asset/Cargo.toml b/examples/rust/account_asset/Cargo.toml index 18a070f93..3537d9bd1 100644 --- a/examples/rust/account_asset/Cargo.toml +++ b/examples/rust/account_asset/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] longport = { path = "../../../rust" } tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/examples/rust/account_asset/src/main.rs b/examples/rust/account_asset/src/main.rs index 223f91e34..384833563 100644 --- a/examples/rust/account_asset/src/main.rs +++ b/examples/rust/account_asset/src/main.rs @@ -1,13 +1,18 @@ use std::sync::Arc; use longport::{trade::TradeContext, Config}; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let config = Arc::new(Config::from_env()?); let (ctx, _) = TradeContext::try_new(config).await?; - let resp = ctx.account_balance().await?; + let resp = ctx.account_balance(None).await?; println!("{:?}", resp); Ok(()) } diff --git a/examples/rust/http_client/Cargo.toml b/examples/rust/http_client/Cargo.toml index f4161b8c4..998f47945 100644 --- a/examples/rust/http_client/Cargo.toml +++ b/examples/rust/http_client/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] longport = { path = "../../../rust" } tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/examples/rust/http_client/src/main.rs b/examples/rust/http_client/src/main.rs index 0fb01bbd7..162c75da2 100644 --- a/examples/rust/http_client/src/main.rs +++ b/examples/rust/http_client/src/main.rs @@ -1,7 +1,12 @@ use longport::httpclient::HttpClient; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let http_cli = HttpClient::from_env()?; let resp = http_cli .request("GET".parse()?, "/v1/trade/execution/today") diff --git a/examples/rust/submit_order/Cargo.toml b/examples/rust/submit_order/Cargo.toml index 31af8e78f..47eee09f7 100644 --- a/examples/rust/submit_order/Cargo.toml +++ b/examples/rust/submit_order/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] longport = { path = "../../../rust" } tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/examples/rust/submit_order/src/main.rs b/examples/rust/submit_order/src/main.rs index e8f81087c..05a3fd65f 100644 --- a/examples/rust/submit_order/src/main.rs +++ b/examples/rust/submit_order/src/main.rs @@ -5,9 +5,14 @@ use longport::{ trade::{OrderSide, OrderType, SubmitOrderOptions, TimeInForceType, TradeContext}, Config, }; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let config = Arc::new(Config::from_env()?); let (ctx, _) = TradeContext::try_new(config).await?; diff --git a/examples/rust/subscribe_candlesticks/Cargo.toml b/examples/rust/subscribe_candlesticks/Cargo.toml index 616df502b..6b8e87cf6 100644 --- a/examples/rust/subscribe_candlesticks/Cargo.toml +++ b/examples/rust/subscribe_candlesticks/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] longport = { path = "../../../rust" } tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/examples/rust/subscribe_candlesticks/src/main.rs b/examples/rust/subscribe_candlesticks/src/main.rs index 0275b93b1..34f8b0b3e 100644 --- a/examples/rust/subscribe_candlesticks/src/main.rs +++ b/examples/rust/subscribe_candlesticks/src/main.rs @@ -4,13 +4,18 @@ use longport::{ quote::{Period, QuoteContext}, Config, }; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let config = Arc::new(Config::from_env()?); let (ctx, mut receiver) = QuoteContext::try_new(config).await?; - ctx.subscribe_candlesticks("AAPL.US", Period::OneMinute) - .await?; + ctx.subscribe_candlesticks("AAPL.US", Period::Day).await?; + while let Some(event) = receiver.recv().await { println!("{:?}", event); } diff --git a/examples/rust/subscribe_quote/Cargo.toml b/examples/rust/subscribe_quote/Cargo.toml index 4499ae9d5..2a51bbcfd 100644 --- a/examples/rust/subscribe_quote/Cargo.toml +++ b/examples/rust/subscribe_quote/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] longport = { path = "../../../rust" } tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/examples/rust/subscribe_quote/src/main.rs b/examples/rust/subscribe_quote/src/main.rs index 252bbfaee..02bf912cb 100644 --- a/examples/rust/subscribe_quote/src/main.rs +++ b/examples/rust/subscribe_quote/src/main.rs @@ -4,12 +4,16 @@ use longport::{ quote::{QuoteContext, SubFlags}, Config, }; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let config = Arc::new(Config::from_env()?); let (ctx, mut receiver) = QuoteContext::try_new(config).await?; - println!("{}", ctx.member_id()); ctx.subscribe( ["700.HK", "AAPL.US", "TSLA.US", "NFLX.US"], SubFlags::QUOTE, diff --git a/examples/rust/today_orders/Cargo.toml b/examples/rust/today_orders/Cargo.toml index 3680ddde3..5fb9865dd 100644 --- a/examples/rust/today_orders/Cargo.toml +++ b/examples/rust/today_orders/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] longport = { path = "../../../rust" } tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/examples/rust/today_orders/src/main.rs b/examples/rust/today_orders/src/main.rs index bea6d6661..0dff8be40 100644 --- a/examples/rust/today_orders/src/main.rs +++ b/examples/rust/today_orders/src/main.rs @@ -1,9 +1,14 @@ use std::sync::Arc; use longport::{trade::TradeContext, Config}; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let config = Arc::new(Config::from_env()?); let (ctx, _) = TradeContext::try_new(config).await?; diff --git a/rust/crates/wsclient/src/client.rs b/rust/crates/wsclient/src/client.rs index a202c282d..1e269fe10 100644 --- a/rust/crates/wsclient/src/client.rs +++ b/rust/crates/wsclient/src/client.rs @@ -27,7 +27,7 @@ use crate::{ codec::Packet, WsClientError, WsClientResult, WsCloseReason, WsEvent, WsResponseErrorDetail, }; -const CONNECT_TIMEOUT: Duration = Duration::from_secs(30); +const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(120); const AUTH_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/rust/src/quote/core.rs b/rust/src/quote/core.rs index d6f31f46d..8498225e5 100644 --- a/rust/src/quote/core.rs +++ b/rust/src/quote/core.rs @@ -471,18 +471,22 @@ impl Core { for symbol in &symbols { let mut st = sub_types; + if let Some(candlesticks) = self .store .securities .get(symbol) .map(|data| &data.candlesticks) { - if candlesticks.contains_key(&Period::Day) { - st.remove(SubFlags::QUOTE); - } else if !candlesticks.is_empty() { - st.remove(SubFlags::TRADE); + for period in candlesticks.keys() { + if period == &Period::Day { + st.remove(SubFlags::QUOTE); + } else { + st.remove(SubFlags::TRADE); + } } } + if !st.is_empty() { st_group.entry(st).or_default().push(symbol.as_ref()); } @@ -622,33 +626,41 @@ impl Core { .get_mut(&symbol) .map(|data| &mut data.candlesticks) { - if periods.remove(&period).is_some() - && periods.is_empty() - && !self + if periods.remove(&period).is_some() { + if period == Period::Day { + unsubscribe_quote = true; + } else { + if periods.is_empty() + || (periods.len() == 1 && periods.contains_key(&Period::Day)) + { + unsubscribe_quote = true; + } + } + } + + if unsubscribe_quote { + if !self .subscriptions .get(&symbol) .copied() .unwrap_or_else(SubFlags::empty) .contains(unsubscribe_sub_flags) - { - unsubscribe_quote = true; + { + self.ws_cli + .request( + cmd_code::UNSUBSCRIBE, + None, + UnsubscribeRequest { + symbol: vec![symbol], + sub_type: unsubscribe_sub_flags.into(), + unsub_all: false, + }, + ) + .await?; + } } } - if unsubscribe_quote { - self.ws_cli - .request( - cmd_code::UNSUBSCRIBE, - None, - UnsubscribeRequest { - symbol: vec![symbol], - sub_type: unsubscribe_sub_flags.into(), - unsub_all: false, - }, - ) - .await?; - } - Ok(()) } @@ -677,22 +689,26 @@ impl Core { } async fn resubscribe(&mut self) -> Result<()> { - let mut subscriptions: HashMap> = HashMap::new(); + let mut subscriptions: HashMap> = HashMap::new(); for (symbol, flags) in &self.subscriptions { - let mut flags = *flags; + subscriptions + .entry(*flags) + .or_default() + .insert(symbol.clone()); + } - if self - .store - .securities - .get(symbol) - .map(|data| !data.candlesticks.is_empty()) - .unwrap_or_default() - { - flags |= SubFlags::TRADE; + for (symbol, data) in &self.store.securities { + for period in data.candlesticks.keys() { + subscriptions + .entry(if *period == Period::Day { + SubFlags::QUOTE + } else { + SubFlags::TRADE + }) + .or_default() + .insert(symbol.clone()); } - - subscriptions.entry(flags).or_default().push(symbol.clone()); } for (flags, symbols) in subscriptions { @@ -701,7 +717,7 @@ impl Core { cmd_code::SUBSCRIBE, None, SubscribeRequest { - symbol: symbols, + symbol: symbols.into_iter().collect(), sub_type: flags.into(), is_first_push: false, },