Skip to content

Commit

Permalink
fix resubscribe daily candlesticks
Browse files Browse the repository at this point in the history
  • Loading branch information
sunli829 committed Mar 7, 2024
1 parent 2bd7382 commit 13c1b80
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 41 deletions.
1 change: 1 addition & 0 deletions examples/rust/account_asset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
[dependencies]
longport = { path = "../../../rust" }
tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }
7 changes: 6 additions & 1 deletion examples/rust/account_asset/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::sync::Arc;

use longport::{trade::TradeContext, Config};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let config = Arc::new(Config::from_env()?);
let (ctx, _) = TradeContext::try_new(config).await?;

let resp = ctx.account_balance().await?;
let resp = ctx.account_balance(None).await?;
println!("{:?}", resp);
Ok(())
}
1 change: 1 addition & 0 deletions examples/rust/http_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
[dependencies]
longport = { path = "../../../rust" }
tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }
5 changes: 5 additions & 0 deletions examples/rust/http_client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use longport::httpclient::HttpClient;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let http_cli = HttpClient::from_env()?;
let resp = http_cli
.request("GET".parse()?, "/v1/trade/execution/today")
Expand Down
1 change: 1 addition & 0 deletions examples/rust/submit_order/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
[dependencies]
longport = { path = "../../../rust" }
tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }
5 changes: 5 additions & 0 deletions examples/rust/submit_order/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ use longport::{
trade::{OrderSide, OrderType, SubmitOrderOptions, TimeInForceType, TradeContext},
Config,
};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let config = Arc::new(Config::from_env()?);
let (ctx, _) = TradeContext::try_new(config).await?;

Expand Down
1 change: 1 addition & 0 deletions examples/rust/subscribe_candlesticks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
[dependencies]
longport = { path = "../../../rust" }
tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }
9 changes: 7 additions & 2 deletions examples/rust/subscribe_candlesticks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ use longport::{
quote::{Period, QuoteContext},
Config,
};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let config = Arc::new(Config::from_env()?);
let (ctx, mut receiver) = QuoteContext::try_new(config).await?;
ctx.subscribe_candlesticks("AAPL.US", Period::OneMinute)
.await?;
ctx.subscribe_candlesticks("AAPL.US", Period::Day).await?;

while let Some(event) = receiver.recv().await {
println!("{:?}", event);
}
Expand Down
1 change: 1 addition & 0 deletions examples/rust/subscribe_quote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
[dependencies]
longport = { path = "../../../rust" }
tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }
6 changes: 5 additions & 1 deletion examples/rust/subscribe_quote/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ use longport::{
quote::{QuoteContext, SubFlags},
Config,
};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let config = Arc::new(Config::from_env()?);
let (ctx, mut receiver) = QuoteContext::try_new(config).await?;
println!("{}", ctx.member_id());
ctx.subscribe(
["700.HK", "AAPL.US", "TSLA.US", "NFLX.US"],
SubFlags::QUOTE,
Expand Down
1 change: 1 addition & 0 deletions examples/rust/today_orders/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
[dependencies]
longport = { path = "../../../rust" }
tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }
5 changes: 5 additions & 0 deletions examples/rust/today_orders/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use std::sync::Arc;

use longport::{trade::TradeContext, Config};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let config = Arc::new(Config::from_env()?);
let (ctx, _) = TradeContext::try_new(config).await?;

Expand Down
2 changes: 1 addition & 1 deletion rust/crates/wsclient/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
codec::Packet, WsClientError, WsClientResult, WsCloseReason, WsEvent, WsResponseErrorDetail,
};

const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(120);
const AUTH_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down
88 changes: 52 additions & 36 deletions rust/src/quote/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,18 +471,22 @@ impl Core {

for symbol in &symbols {
let mut st = sub_types;

if let Some(candlesticks) = self
.store
.securities
.get(symbol)
.map(|data| &data.candlesticks)
{
if candlesticks.contains_key(&Period::Day) {
st.remove(SubFlags::QUOTE);
} else if !candlesticks.is_empty() {
st.remove(SubFlags::TRADE);
for period in candlesticks.keys() {
if period == &Period::Day {
st.remove(SubFlags::QUOTE);
} else {
st.remove(SubFlags::TRADE);
}
}
}

if !st.is_empty() {
st_group.entry(st).or_default().push(symbol.as_ref());
}
Expand Down Expand Up @@ -622,33 +626,41 @@ impl Core {
.get_mut(&symbol)
.map(|data| &mut data.candlesticks)
{
if periods.remove(&period).is_some()
&& periods.is_empty()
&& !self
if periods.remove(&period).is_some() {
if period == Period::Day {
unsubscribe_quote = true;
} else {
if periods.is_empty()
|| (periods.len() == 1 && periods.contains_key(&Period::Day))
{
unsubscribe_quote = true;
}
}
}

if unsubscribe_quote {
if !self
.subscriptions
.get(&symbol)
.copied()
.unwrap_or_else(SubFlags::empty)
.contains(unsubscribe_sub_flags)
{
unsubscribe_quote = true;
{
self.ws_cli
.request(
cmd_code::UNSUBSCRIBE,
None,
UnsubscribeRequest {
symbol: vec![symbol],
sub_type: unsubscribe_sub_flags.into(),
unsub_all: false,
},
)
.await?;
}
}
}

if unsubscribe_quote {
self.ws_cli
.request(
cmd_code::UNSUBSCRIBE,
None,
UnsubscribeRequest {
symbol: vec![symbol],
sub_type: unsubscribe_sub_flags.into(),
unsub_all: false,
},
)
.await?;
}

Ok(())
}

Expand Down Expand Up @@ -677,22 +689,26 @@ impl Core {
}

async fn resubscribe(&mut self) -> Result<()> {
let mut subscriptions: HashMap<SubFlags, Vec<String>> = HashMap::new();
let mut subscriptions: HashMap<SubFlags, HashSet<String>> = HashMap::new();

for (symbol, flags) in &self.subscriptions {
let mut flags = *flags;
subscriptions
.entry(*flags)
.or_default()
.insert(symbol.clone());
}

if self
.store
.securities
.get(symbol)
.map(|data| !data.candlesticks.is_empty())
.unwrap_or_default()
{
flags |= SubFlags::TRADE;
for (symbol, data) in &self.store.securities {
for period in data.candlesticks.keys() {
subscriptions
.entry(if *period == Period::Day {
SubFlags::QUOTE
} else {
SubFlags::TRADE
})
.or_default()
.insert(symbol.clone());
}

subscriptions.entry(flags).or_default().push(symbol.clone());
}

for (flags, symbols) in subscriptions {
Expand All @@ -701,7 +717,7 @@ impl Core {
cmd_code::SUBSCRIBE,
None,
SubscribeRequest {
symbol: symbols,
symbol: symbols.into_iter().collect(),
sub_type: flags.into(),
is_first_push: false,
},
Expand Down

0 comments on commit 13c1b80

Please sign in to comment.