From 466a91f414ed11dea70acde07d1a633161936651 Mon Sep 17 00:00:00 2001 From: Sunli Date: Fri, 31 Jan 2025 23:08:39 +0800 Subject: [PATCH] add some logs --- rust/crates/httpclient/Cargo.toml | 2 +- rust/crates/wsclient/src/client.rs | 5 ++++- rust/src/quote/context.rs | 25 ++++++++++++++++++++++++- rust/src/quote/core.rs | 23 ++++++++++++++++++++--- rust/src/trade/context.rs | 19 ++++++++++++++++++- rust/src/trade/core.rs | 3 +++ 6 files changed, 70 insertions(+), 7 deletions(-) diff --git a/rust/crates/httpclient/Cargo.toml b/rust/crates/httpclient/Cargo.toml index 78bb5e5ef..14ff938f4 100644 --- a/rust/crates/httpclient/Cargo.toml +++ b/rust/crates/httpclient/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" futures-util = "0.3.21" hmac = "0.12.1" parking_lot = "0.12.0" -reqwest = { version = "0.12.3", default-features = false, features = [ +reqwest = { version = "0.12.12", default-features = false, features = [ "rustls-tls", "json", ] } diff --git a/rust/crates/wsclient/src/client.rs b/rust/crates/wsclient/src/client.rs index 9037617c8..74cb81b93 100644 --- a/rust/crates/wsclient/src/client.rs +++ b/rust/crates/wsclient/src/client.rs @@ -393,10 +393,13 @@ impl WsClient { T: prost::Message, R: prost::Message + Default, { + tracing::info!(message = ?req, "ws request"); let resp = self .request_raw(command_code, timeout, req.encode_to_vec()) .await?; - Ok(R::decode(&*resp)?) + let resp = R::decode(&*resp)?; + tracing::info!(message = ?resp, "ws response"); + Ok(resp) } } diff --git a/rust/src/quote/context.rs b/rust/src/quote/context.rs index adc193a0b..b408c409e 100644 --- a/rust/src/quote/context.rs +++ b/rust/src/quote/context.rs @@ -6,7 +6,7 @@ use longport_wscli::WsClientError; use serde::{Deserialize, Serialize}; use time::{Date, PrimitiveDateTime}; use tokio::sync::{mpsc, oneshot}; -use tracing::{instrument::WithSubscriber, Subscriber}; +use tracing::{dispatcher, instrument::WithSubscriber, Subscriber}; use crate::{ quote::{ @@ -54,12 +54,31 @@ pub struct QuoteContext { log_subscriber: Arc, } +impl Drop for QuoteContext { + fn drop(&mut self) { + dispatcher::with_default(&self.log_subscriber.clone().into(), || { + tracing::info!("quote context dropped"); + }); + } +} + impl QuoteContext { /// Create a `QuoteContext` pub async fn try_new( config: Arc, ) -> Result<(Self, mpsc::UnboundedReceiver)> { let log_subscriber = config.create_log_subscriber("quote"); + + dispatcher::with_default(&log_subscriber.clone().into(), || { + tracing::info!( + language = ?config.language, + enable_overnight = ?config.enable_overnight, + push_candlestick_mode = ?config.push_candlestick_mode, + enable_print_quote_packages = ?config.enable_print_quote_packages, + "creating quote context" + ); + }); + let language = config.language.unwrap_or_default(); let http_cli = config.create_http_client(); let (command_tx, command_rx) = mpsc::unbounded_channel(); @@ -72,6 +91,10 @@ impl QuoteContext { let quote_package_details = core.quote_package_details().to_vec(); tokio::spawn(core.run().with_subscriber(log_subscriber.clone())); + dispatcher::with_default(&log_subscriber.clone().into(), || { + tracing::info!("quote context created"); + }); + Ok(( QuoteContext { language, diff --git a/rust/src/quote/core.rs b/rust/src/quote/core.rs index 0a6f1e821..f1e6f813e 100644 --- a/rust/src/quote/core.rs +++ b/rust/src/quote/core.rs @@ -520,8 +520,6 @@ impl Core { sub_types: SubFlags, is_first_push: bool, ) -> Result<()> { - tracing::info!(symbols = ?symbols, sub_types = ?sub_types, "subscribe"); - // send request let req = SubscribeRequest { symbol: symbols.clone(), @@ -613,17 +611,22 @@ impl Core { symbol: String, period: Period, ) -> Result> { + tracing::info!(symbol = symbol, period = ?period, "subscribe candlesticks"); + if let Some(candlesticks) = self .store .securities .get(&symbol) .map(|data| data.candlesticks.get(&period)) { + tracing::info!(symbol = symbol, period = ?period, "subscribed, returns candlesticks in memory"); return Ok(candlesticks .map(|candlesticks| candlesticks.candlesticks.clone()) .unwrap_or_default()); } + tracing::info!(symbol = symbol, "fetch symbol board"); + let security_data = self.store.securities.entry(symbol.clone()).or_default(); if security_data.board != SecurityBoard::Unknown { // update board @@ -645,7 +648,10 @@ impl Core { security_data.board = resp.secu_static_info[0].board.parse().unwrap_or_default(); } + tracing::info!(symbol = symbol, board = ?security_data.board, "got the symbol board"); + // pull candlesticks + tracing::info!(symbol = symbol, period = ?period, "pull history candlesticks"); let resp: SecurityCandlestickResponse = self .ws_cli .request( @@ -659,6 +665,7 @@ impl Core { }, ) .await?; + tracing::info!(symbol = symbol, period = ?period, len = resp.candlesticks.len(), "got history candlesticks"); let candlesticks = resp .candlesticks @@ -690,8 +697,10 @@ impl Core { return Ok(candlesticks); } + tracing::info!(symbol = symbol, period = ?period, sub_flags = ?sub_flags, "subscribe for candlesticks"); + let req = SubscribeRequest { - symbol: vec![symbol], + symbol: vec![symbol.clone()], sub_type: sub_flags.into(), is_first_push: true, }; @@ -699,6 +708,7 @@ impl Core { .request::<_, ()>(cmd_code::SUBSCRIBE, None, req) .await?; + tracing::info!(symbol = symbol, period = ?period, sub_flags = ?sub_flags, "subscribed for candlesticks"); Ok(candlesticks) } @@ -1130,6 +1140,13 @@ fn update_and_push_candlestick( }; for (candlestick, is_confirmed) in push_candlesticks { + tracing::info!( + symbol = symbol, + period = ?period, + is_confirmed = is_confirmed, + candlestick = ?candlestick, + "push candlestick" + ); let _ = tx.send(PushEvent { sequence: 0, symbol: symbol.to_string(), diff --git a/rust/src/trade/context.rs b/rust/src/trade/context.rs index b69aa6200..9305ed7d2 100644 --- a/rust/src/trade/context.rs +++ b/rust/src/trade/context.rs @@ -5,7 +5,7 @@ use longport_wscli::WsClientError; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; -use tracing::{instrument::WithSubscriber, Subscriber}; +use tracing::{dispatcher, instrument::WithSubscriber, Subscriber}; use crate::{ trade::{ @@ -46,12 +46,25 @@ pub struct TradeContext { log_subscriber: Arc, } +impl Drop for TradeContext { + fn drop(&mut self) { + dispatcher::with_default(&self.log_subscriber.clone().into(), || { + tracing::info!("trade context dropped"); + }); + } +} + impl TradeContext { /// Create a `TradeContext` pub async fn try_new( config: Arc, ) -> Result<(Self, mpsc::UnboundedReceiver)> { let log_subscriber = config.create_log_subscriber("trade"); + + dispatcher::with_default(&log_subscriber.clone().into(), || { + tracing::info!(language = ?config.language, "creating trade context"); + }); + let http_cli = config.create_http_client(); let (command_tx, command_rx) = mpsc::unbounded_channel(); let (push_tx, push_rx) = mpsc::unbounded_channel(); @@ -60,6 +73,10 @@ impl TradeContext { .await?; tokio::spawn(core.run().with_subscriber(log_subscriber.clone())); + dispatcher::with_default(&log_subscriber.clone().into(), || { + tracing::info!("trade context created"); + }); + Ok(( TradeContext { http_cli, diff --git a/rust/src/trade/core.rs b/rust/src/trade/core.rs index 03ae244d1..7d5da8ffd 100644 --- a/rust/src/trade/core.rs +++ b/rust/src/trade/core.rs @@ -208,6 +208,7 @@ impl Core { async fn handle_push(&mut self, command_code: u8, body: Vec) -> Result<()> { match PushEvent::parse(command_code, &body) { Ok(Some(event)) => { + tracing::info!(event = ?event, "push event"); let _ = self.push_tx.send(event); } Ok(None) => {} @@ -262,6 +263,7 @@ impl Core { let req = Sub { topics: topics.iter().map(ToString::to_string).collect(), }; + tracing::info!(topics = ?req.topics, "subscribing topics"); let resp: SubResponse = self.ws_cli.request(cmd_code::SUBSCRIBE, None, req).await?; self.subscriptions = resp.current.into_iter().collect(); Ok(()) @@ -271,6 +273,7 @@ impl Core { let req = Unsub { topics: topics.iter().map(ToString::to_string).collect(), }; + tracing::info!(topics = ?req.topics, "unsubscribing topics"); let resp: UnsubResponse = self .ws_cli .request(cmd_code::UNSUBSCRIBE, None, req)