Skip to content

Commit

Permalink
add some logs
Browse files Browse the repository at this point in the history
  • Loading branch information
sunli829 committed Feb 3, 2025
1 parent 22896d9 commit 466a91f
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 7 deletions.
2 changes: 1 addition & 1 deletion rust/crates/httpclient/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0"
futures-util = "0.3.21"
hmac = "0.12.1"
parking_lot = "0.12.0"
reqwest = { version = "0.12.3", default-features = false, features = [
reqwest = { version = "0.12.12", default-features = false, features = [
"rustls-tls",
"json",
] }
Expand Down
5 changes: 4 additions & 1 deletion rust/crates/wsclient/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,13 @@ impl WsClient {
T: prost::Message,
R: prost::Message + Default,
{
tracing::info!(message = ?req, "ws request");
let resp = self
.request_raw(command_code, timeout, req.encode_to_vec())
.await?;
Ok(R::decode(&*resp)?)
let resp = R::decode(&*resp)?;
tracing::info!(message = ?resp, "ws response");
Ok(resp)
}
}

Expand Down
25 changes: 24 additions & 1 deletion rust/src/quote/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use longport_wscli::WsClientError;
use serde::{Deserialize, Serialize};
use time::{Date, PrimitiveDateTime};
use tokio::sync::{mpsc, oneshot};
use tracing::{instrument::WithSubscriber, Subscriber};
use tracing::{dispatcher, instrument::WithSubscriber, Subscriber};

use crate::{
quote::{
Expand Down Expand Up @@ -54,12 +54,31 @@ pub struct QuoteContext {
log_subscriber: Arc<dyn Subscriber + Send + Sync>,
}

impl Drop for QuoteContext {
fn drop(&mut self) {
dispatcher::with_default(&self.log_subscriber.clone().into(), || {
tracing::info!("quote context dropped");
});
}
}

impl QuoteContext {
/// Create a `QuoteContext`
pub async fn try_new(
config: Arc<Config>,
) -> Result<(Self, mpsc::UnboundedReceiver<PushEvent>)> {
let log_subscriber = config.create_log_subscriber("quote");

dispatcher::with_default(&log_subscriber.clone().into(), || {
tracing::info!(
language = ?config.language,
enable_overnight = ?config.enable_overnight,
push_candlestick_mode = ?config.push_candlestick_mode,
enable_print_quote_packages = ?config.enable_print_quote_packages,
"creating quote context"
);
});

let language = config.language.unwrap_or_default();
let http_cli = config.create_http_client();
let (command_tx, command_rx) = mpsc::unbounded_channel();
Expand All @@ -72,6 +91,10 @@ impl QuoteContext {
let quote_package_details = core.quote_package_details().to_vec();
tokio::spawn(core.run().with_subscriber(log_subscriber.clone()));

dispatcher::with_default(&log_subscriber.clone().into(), || {
tracing::info!("quote context created");
});

Ok((
QuoteContext {
language,
Expand Down
23 changes: 20 additions & 3 deletions rust/src/quote/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,6 @@ impl Core {
sub_types: SubFlags,
is_first_push: bool,
) -> Result<()> {
tracing::info!(symbols = ?symbols, sub_types = ?sub_types, "subscribe");

// send request
let req = SubscribeRequest {
symbol: symbols.clone(),
Expand Down Expand Up @@ -613,17 +611,22 @@ impl Core {
symbol: String,
period: Period,
) -> Result<Vec<Candlestick>> {
tracing::info!(symbol = symbol, period = ?period, "subscribe candlesticks");

if let Some(candlesticks) = self
.store
.securities
.get(&symbol)
.map(|data| data.candlesticks.get(&period))
{
tracing::info!(symbol = symbol, period = ?period, "subscribed, returns candlesticks in memory");
return Ok(candlesticks
.map(|candlesticks| candlesticks.candlesticks.clone())
.unwrap_or_default());
}

tracing::info!(symbol = symbol, "fetch symbol board");

let security_data = self.store.securities.entry(symbol.clone()).or_default();
if security_data.board != SecurityBoard::Unknown {
// update board
Expand All @@ -645,7 +648,10 @@ impl Core {
security_data.board = resp.secu_static_info[0].board.parse().unwrap_or_default();
}

tracing::info!(symbol = symbol, board = ?security_data.board, "got the symbol board");

// pull candlesticks
tracing::info!(symbol = symbol, period = ?period, "pull history candlesticks");
let resp: SecurityCandlestickResponse = self
.ws_cli
.request(
Expand All @@ -659,6 +665,7 @@ impl Core {
},
)
.await?;
tracing::info!(symbol = symbol, period = ?period, len = resp.candlesticks.len(), "got history candlesticks");

let candlesticks = resp
.candlesticks
Expand Down Expand Up @@ -690,15 +697,18 @@ impl Core {
return Ok(candlesticks);
}

tracing::info!(symbol = symbol, period = ?period, sub_flags = ?sub_flags, "subscribe for candlesticks");

let req = SubscribeRequest {
symbol: vec![symbol],
symbol: vec![symbol.clone()],
sub_type: sub_flags.into(),
is_first_push: true,
};
self.ws_cli
.request::<_, ()>(cmd_code::SUBSCRIBE, None, req)
.await?;

tracing::info!(symbol = symbol, period = ?period, sub_flags = ?sub_flags, "subscribed for candlesticks");
Ok(candlesticks)
}

Expand Down Expand Up @@ -1130,6 +1140,13 @@ fn update_and_push_candlestick(
};

for (candlestick, is_confirmed) in push_candlesticks {
tracing::info!(
symbol = symbol,
period = ?period,
is_confirmed = is_confirmed,
candlestick = ?candlestick,
"push candlestick"
);
let _ = tx.send(PushEvent {
sequence: 0,
symbol: symbol.to_string(),
Expand Down
19 changes: 18 additions & 1 deletion rust/src/trade/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use longport_wscli::WsClientError;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use tracing::{instrument::WithSubscriber, Subscriber};
use tracing::{dispatcher, instrument::WithSubscriber, Subscriber};

use crate::{
trade::{
Expand Down Expand Up @@ -46,12 +46,25 @@ pub struct TradeContext {
log_subscriber: Arc<dyn Subscriber + Send + Sync>,
}

impl Drop for TradeContext {
fn drop(&mut self) {
dispatcher::with_default(&self.log_subscriber.clone().into(), || {
tracing::info!("trade context dropped");
});
}
}

impl TradeContext {
/// Create a `TradeContext`
pub async fn try_new(
config: Arc<Config>,
) -> Result<(Self, mpsc::UnboundedReceiver<PushEvent>)> {
let log_subscriber = config.create_log_subscriber("trade");

dispatcher::with_default(&log_subscriber.clone().into(), || {
tracing::info!(language = ?config.language, "creating trade context");
});

let http_cli = config.create_http_client();
let (command_tx, command_rx) = mpsc::unbounded_channel();
let (push_tx, push_rx) = mpsc::unbounded_channel();
Expand All @@ -60,6 +73,10 @@ impl TradeContext {
.await?;
tokio::spawn(core.run().with_subscriber(log_subscriber.clone()));

dispatcher::with_default(&log_subscriber.clone().into(), || {
tracing::info!("trade context created");
});

Ok((
TradeContext {
http_cli,
Expand Down
3 changes: 3 additions & 0 deletions rust/src/trade/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl Core {
async fn handle_push(&mut self, command_code: u8, body: Vec<u8>) -> Result<()> {
match PushEvent::parse(command_code, &body) {
Ok(Some(event)) => {
tracing::info!(event = ?event, "push event");
let _ = self.push_tx.send(event);
}
Ok(None) => {}
Expand Down Expand Up @@ -262,6 +263,7 @@ impl Core {
let req = Sub {
topics: topics.iter().map(ToString::to_string).collect(),
};
tracing::info!(topics = ?req.topics, "subscribing topics");
let resp: SubResponse = self.ws_cli.request(cmd_code::SUBSCRIBE, None, req).await?;
self.subscriptions = resp.current.into_iter().collect();
Ok(())
Expand All @@ -271,6 +273,7 @@ impl Core {
let req = Unsub {
topics: topics.iter().map(ToString::to_string).collect(),
};
tracing::info!(topics = ?req.topics, "unsubscribing topics");
let resp: UnsubResponse = self
.ws_cli
.request(cmd_code::UNSUBSCRIBE, None, req)
Expand Down

0 comments on commit 466a91f

Please sign in to comment.