Skip to content

Commit

Permalink
Add Multi Recv API for release/2.4 (#4431)
Browse files Browse the repository at this point in the history
  • Loading branch information
ami-GS authored Aug 5, 2024
1 parent 1cb9a4c commit 408fc9a
Show file tree
Hide file tree
Showing 32 changed files with 544 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .azure/OneBranch.Package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ extends:
ob_createvpack_owneralias: quicdev
ob_createvpack_description: msquic.$(Build.SourceBranchName)
ob_createvpack_versionAs: string
ob_createvpack_version: 2.4.1-$(Build.BuildId)
ob_createvpack_version: 2.4.2-$(Build.BuildId)
steps:
- task: DownloadPipelineArtifact@2
inputs:
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ message(STATUS "Platform version: ${CMAKE_VS_WINDOWS_TARGET_PLATFORM_VERSION}")
message(STATUS "Build type: ${CMAKE_BUILD_TYPE}")

set(QUIC_MAJOR_VERSION 2)
set(QUIC_FULL_VERSION 2.4.1)
set(QUIC_FULL_VERSION 2.4.2)

if (WIN32)
set(CX_PLATFORM "windows")
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "msquic"
version = "2.4.1-beta"
version = "2.4.2-beta"
edition = "2018"
authors = ["Microsoft"]
description = "Microsoft implementation of the IETF QUIC protocol"
Expand Down
1 change: 1 addition & 0 deletions docs/Settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ The following settings are available via registry as well as via [QUIC_SETTINGS]
| Stateless Operation Expiration | uint16_t | StatelessOperationExpirationMs | 100 | The time limit between operations for the same endpoint, in milliseconds. |
| Congestion Control Algorithm | uint16_t | CongestionControlAlgorithm | 0 (Cubic) | The congestion control algorithm used for the connection. |
| ECN | uint8_t | EcnEnabled | 0 (FALSE) | Enable sender-side ECN support. |
| Stream Multi Receive | uint8_t | StreamMultiReceiveEnabled | 0 (FALSE) | Enable multi receive support |

The types map to registry types as follows:
- `uint64_t` is a `REG_QWORD`.
Expand Down
12 changes: 10 additions & 2 deletions docs/Streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Typically, the buffer count is one, which means that most events will include a

When the buffer count is 0, it signifies the reception of a QUIC frame with empty data, which also indicates the end of stream data.

Currently, the maximum buffer count is 2 in the case of partial receive, where only a portion of the buffer data is consumed (as explained below). However, it is strongly advised not to assume in application code that the upper limit is always 2. This caution is important because future releases may incorporate multiple circular buffers to enhance performance, leading to potential changes in the buffer count limit.
Currently, the maximum buffer count is 3 in the case of partial receive, where only a portion of the buffer data is consumed (as explained below). However, it is strongly advised not to assume in application code that the upper limit is always 3. This caution is important because future releases may change internal algorithm, leading to potential changes in the buffer count limit.

The app then may respond to the event in a number of ways:

Expand All @@ -100,4 +100,12 @@ Any value less than or equal to the initial **TotalBufferLength** value is allow

Whenever a receive isn't fully accepted by the app, additional receive events are immediately disabled. The app is assumed to be at capacity and not able to consume more until further indication. To re-enable receive callbacks, the app must call [StreamReceiveSetEnabled](api/StreamReceiveSetEnabled.md).

There are cases where an app may want to partially accept the current data, but still immediately get a callback with the rest of the data. To do this (only works in the synchronous flow) the app must return `QUIC_STATUS_CONTINUE`.
There are cases where an app may want to partially accept the current data, but still immediately get a callback with the rest of the data. To do this (only works in the synchronous flow) the app must return `QUIC_STATUS_CONTINUE`.

## Multi Receive mode

Setting [`StreamMultiReceiveEnabled`](./Settings.md) an app can continue getting indicated by `QUIC_STREAM_EVENT_RECEIVE` without returning `QUIC_STATUS_SUCCESS` nor calling [StreamReceiveComplete](api/StreamReceiveComplete.md).

This changes internal receive buffer more efficient for continuous receiving.

The app need to keep track of total `TotalBufferLength` to later call [StreamReceiveComplete](api/StreamReceiveComplete.md) appropriately.
12 changes: 10 additions & 2 deletions docs/api/QUIC_SETTINGS.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t RESERVED : 22;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t RESERVED : 21;
#else
uint64_t RESERVED : 26;
#endif
Expand Down Expand Up @@ -104,7 +105,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t ReservedFlags : 59;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t ReservedFlags : 58;
#else
uint64_t ReservedFlags : 63;
#endif
Expand Down Expand Up @@ -351,6 +353,12 @@ Initial stream receive flow control window size for remotely initiated unidirect

**Default value:** 0 (no overwrite)

`StreamMultiReceiveEnabled`

Enable multi receive mode. An app can continue receiving stream data without calling `StreamReceiveComplete` for each `QUIC_STREAM_EVENT_RECEIVE` indication.

**Default value:** 0 (`FALSE`)

# Remarks

When setting new values for the settings, the app must set the corresponding `.IsSet.*` parameter for each actual parameter that is being set or updated. For example:
Expand Down
3 changes: 2 additions & 1 deletion docs/api/StreamReceiveComplete.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ void
# Remarks
This is an asynchronous API but can run inline if called in a callback.
The application must ensure that one `StreamReceiveComplete` call corresponds to one `QUIC_STREAM_EVENT_RECEIVE` event.
The application, without setting `StreamMultiReceiveEnabled`, must ensure that one `StreamReceiveComplete` call corresponds to one `QUIC_STREAM_EVENT_RECEIVE` event.
Duplicate `StreamReceiveComplete` calls after one `QUIC_STREAM_EVENT_RECEIVE` event are ignored silently even with different `BufferLength`.
The `StreamMultiReceiveEnabled` mode doesn't follow this rule. Multiple `QUIC_STREAM_EVENT_RECEIVE` events can be indicated at once by `StreamReceiveComplete`. The application needs to keep track of accumulated `TotalBufferLength` with this mode.
# See Also
Expand Down
2 changes: 1 addition & 1 deletion scripts/package-distribution.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ $ArtifactsBinDir = Join-Path $BaseArtifactsDir "bin"
# All direct subfolders are OS's
$Platforms = Get-ChildItem -Path $ArtifactsBinDir

$Version = "2.4.1"
$Version = "2.4.2"

$WindowsBuilds = @()
$AllBuilds = @()
Expand Down
2 changes: 1 addition & 1 deletion scripts/package-nuget.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ $DistDir = Join-Path $BaseArtifactsDir "dist"
$CurrentCommitHash = Get-GitHash -RepoDir $RootDir
$RepoRemote = Get-GitRemote -RepoDir $RootDir

$Version = "2.4.1"
$Version = "2.4.2"

$BuildId = $env:BUILD_BUILDID
if ($null -ne $BuildId) {
Expand Down
2 changes: 1 addition & 1 deletion scripts/write-versions.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ $ArtifactsDir = $BuildConfig.ArtifactsDir
$SourceVersion = $env:BUILD_SOURCEVERSION;
$SourceBranch = $env:BUILD_SOURCEBRANCH;
$BuildId = $env:BUILD_BUILDID;
$VersionNumber = "2.4.1";
$VersionNumber = "2.4.2";

class BuildData {
[string]$SourceVersion;
Expand Down
6 changes: 6 additions & 0 deletions src/core/quicdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ CXPLAT_STATIC_ASSERT(
//
#define QUIC_DEFAULT_NET_STATS_EVENT_ENABLED FALSE

//
// The default settings for using multiple parallel receives for streams.
//
#define QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED FALSE

//
// The number of rounds in Cubic Slow Start to sample RTT.
//
Expand Down Expand Up @@ -633,6 +638,7 @@ CXPLAT_STATIC_ASSERT(
#define QUIC_SETTING_RELIABLE_RESET_ENABLED "ReliableResetEnabled"
#define QUIC_SETTING_ONE_WAY_DELAY_ENABLED "OneWayDelayEnabled"
#define QUIC_SETTING_NET_STATS_EVENT_ENABLED "NetStatsEventEnabled"
#define QUIC_SETTING_STREAM_MULTI_RECEIVE_ENABLED "StreamMultiReceiveEnabled"

#define QUIC_SETTING_INITIAL_WINDOW_PACKETS "InitialWindowPackets"
#define QUIC_SETTING_SEND_IDLE_TIMEOUT_MS "SendIdleTimeoutMs"
Expand Down
41 changes: 41 additions & 0 deletions src/core/settings.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ QuicSettingsSetDefault(
if (!Settings->IsSet.NetStatsEventEnabled) {
Settings->NetStatsEventEnabled = QUIC_DEFAULT_NET_STATS_EVENT_ENABLED;
}
if (!Settings->IsSet.StreamMultiReceiveEnabled) {
Settings->StreamMultiReceiveEnabled = QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED;
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -330,6 +333,9 @@ QuicSettingsCopy(
if (!Destination->IsSet.NetStatsEventEnabled) {
Destination->NetStatsEventEnabled = Source->NetStatsEventEnabled;
}
if (!Destination->IsSet.StreamMultiReceiveEnabled) {
Destination->StreamMultiReceiveEnabled = Source->StreamMultiReceiveEnabled;
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -700,6 +706,11 @@ QuicSettingApply(
Destination->NetStatsEventEnabled = Source->NetStatsEventEnabled;
Destination->IsSet.NetStatsEventEnabled = TRUE;
}

if (Source->IsSet.StreamMultiReceiveEnabled && (!Destination->IsSet.StreamMultiReceiveEnabled || OverWrite)) {
Destination->StreamMultiReceiveEnabled = Source->StreamMultiReceiveEnabled;
Destination->IsSet.StreamMultiReceiveEnabled = TRUE;
}
return TRUE;
}

Expand Down Expand Up @@ -1358,6 +1369,16 @@ QuicSettingsLoad(
&ValueLen);
Settings->NetStatsEventEnabled = !!Value;
}
if (!Settings->IsSet.StreamMultiReceiveEnabled) {
Value = QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED;
ValueLen = sizeof(Value);
CxPlatStorageReadValue(
Storage,
QUIC_SETTING_STREAM_MULTI_RECEIVE_ENABLED,
(uint8_t*)&Value,
&ValueLen);
Settings->StreamMultiReceiveEnabled = !!Value;
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -1426,6 +1447,7 @@ QuicSettingsDump(
QuicTraceLogVerbose(SettingReliableResetEnabled, "[sett] ReliableResetEnabled = %hhu", Settings->ReliableResetEnabled);
QuicTraceLogVerbose(SettingOneWayDelayEnabled, "[sett] OneWayDelayEnabled = %hhu", Settings->OneWayDelayEnabled);
QuicTraceLogVerbose(SettingNetStatsEventEnabled, "[sett] NetStatsEventEnabled = %hhu", Settings->NetStatsEventEnabled);
QuicTraceLogVerbose(SettingsStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled= %hhu", Settings->StreamMultiReceiveEnabled);
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -1587,6 +1609,9 @@ QuicSettingsDumpNew(
if (Settings->IsSet.NetStatsEventEnabled) {
QuicTraceLogVerbose(SettingNetStatsEventEnabled, "[sett] NetStatsEventEnabled = %hhu", Settings->NetStatsEventEnabled);
}
if (Settings->IsSet.StreamMultiReceiveEnabled) {
QuicTraceLogVerbose(SettingStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled = %hhu", Settings->StreamMultiReceiveEnabled);
}
}

#define SETTINGS_SIZE_THRU_FIELD(SettingsType, Field) \
Expand Down Expand Up @@ -1843,6 +1868,14 @@ QuicSettingsSettingsToInternal(
SettingsSize,
InternalSettings);

SETTING_COPY_FLAG_TO_INTERNAL_SIZED(
Flags,
StreamMultiReceiveEnabled,
QUIC_SETTINGS,
Settings,
SettingsSize,
InternalSettings);

return QUIC_STATUS_SUCCESS;
}

Expand Down Expand Up @@ -2004,6 +2037,14 @@ QuicSettingsGetSettings(
*SettingsLength,
InternalSettings);

SETTING_COPY_FLAG_FROM_INTERNAL_SIZED(
Flags,
StreamMultiReceiveEnabled,
QUIC_SETTINGS,
Settings,
*SettingsLength,
InternalSettings);

*SettingsLength = CXPLAT_MIN(*SettingsLength, sizeof(QUIC_SETTINGS));

return QUIC_STATUS_SUCCESS;
Expand Down
4 changes: 3 additions & 1 deletion src/core/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ typedef struct QUIC_SETTINGS_INTERNAL {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t RESERVED : 17;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t RESERVED : 16;
} IsSet;
};

Expand Down Expand Up @@ -111,6 +112,7 @@ typedef struct QUIC_SETTINGS_INTERNAL {
uint8_t ReliableResetEnabled : 1;
uint8_t OneWayDelayEnabled : 1;
uint8_t NetStatsEventEnabled : 1;
uint8_t StreamMultiReceiveEnabled : 1;
uint8_t MtuDiscoveryMissingProbeCount;

} QUIC_SETTINGS_INTERNAL;
Expand Down
4 changes: 3 additions & 1 deletion src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ QuicStreamInitialize(
Stream->Flags.Allocated = TRUE;
Stream->Flags.SendEnabled = TRUE;
Stream->Flags.ReceiveEnabled = TRUE;
Stream->Flags.ReceiveMultiple = Connection->Settings.StreamMultiReceiveEnabled;
Stream->RecvMaxLength = UINT64_MAX;
Stream->RefCount = 1;
Stream->SendRequestsTail = &Stream->SendRequests;
Expand Down Expand Up @@ -131,7 +132,8 @@ QuicStreamInitialize(
&Stream->RecvBuffer,
InitialRecvBufferLength,
FlowControlWindowSize,
QUIC_RECV_BUF_MODE_CIRCULAR,
Stream->Flags.ReceiveMultiple ?
QUIC_RECV_BUF_MODE_MULTIPLE : QUIC_RECV_BUF_MODE_CIRCULAR,
PreallocatedRecvChunk);
if (QUIC_FAILED(Status)) {
goto Exit;
Expand Down
1 change: 1 addition & 0 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ typedef union QUIC_STREAM_FLAGS {

BOOLEAN SendEnabled : 1; // Application is allowed to send data.
BOOLEAN ReceiveEnabled : 1; // Application is ready for receive callbacks.
BOOLEAN ReceiveMultiple : 1; // The app supports multiple parallel receive indications.
BOOLEAN ReceiveFlushQueued : 1; // The receive flush operation is queued.
BOOLEAN ReceiveDataPending : 1; // Data (or FIN) is queued and ready for delivery.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.
Expand Down
25 changes: 13 additions & 12 deletions src/core/stream_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ QuicStreamRecvQueueFlush(
// The caller has indicated data is ready to be indicated to the
// application. Queue a FLUSH_RECV if one isn't already queued.
//

if (Stream->Flags.ReceiveEnabled &&
Stream->Flags.ReceiveDataPending &&
Stream->RecvPendingLength == 0) {
if (Stream->Flags.ReceiveEnabled && Stream->Flags.ReceiveDataPending) {

if (AllowInlineFlush) {
QuicStreamRecvFlush(Stream);
Expand Down Expand Up @@ -541,7 +538,9 @@ QuicStreamProcessStreamFrame(
}
}

if (ReadyToDeliver) {
if (ReadyToDeliver &&
(Stream->RecvBuffer.RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE ||
Stream->RecvBuffer.ReadPendingLength == 0)) {
Stream->Flags.ReceiveDataPending = TRUE;
QuicStreamRecvQueueFlush(
Stream,
Expand Down Expand Up @@ -870,8 +869,6 @@ QuicStreamRecvFlush(
return;
}

CXPLAT_TEL_ASSERT(!Stream->RecvPendingLength); // N.B. - Will be an invalid assert once we support multiple receives

BOOLEAN FlushRecv = TRUE;
while (FlushRecv) {
CXPLAT_DBG_ASSERT(!Stream->Flags.SentStopSending);
Expand Down Expand Up @@ -924,9 +921,10 @@ QuicStreamRecvFlush(
Event.RECEIVE.Flags |= QUIC_RECEIVE_FLAG_FIN; // TODO - 0-RTT flag?
}

Stream->Flags.ReceiveEnabled = FALSE;
Stream->Flags.ReceiveEnabled = Stream->Flags.ReceiveMultiple;
Stream->Flags.ReceiveCallActive = TRUE;
Stream->RecvPendingLength += Event.RECEIVE.TotalBufferLength;
CXPLAT_DBG_ASSERT(Stream->RecvPendingLength <= Stream->RecvBuffer.ReadPendingLength);

QuicTraceEvent(
StreamAppReceive,
Expand Down Expand Up @@ -1056,7 +1054,7 @@ QuicStreamReceiveComplete(
//
Stream->Flags.ReceiveEnabled = TRUE;

} else {
} else if (!Stream->Flags.ReceiveMultiple) {
//
// The app didn't drain all the data, so we will need to wait for them
// to request a new receive.
Expand All @@ -1080,9 +1078,10 @@ QuicStreamReceiveComplete(
if (Stream->Flags.ReceiveDataPending) {
//
// There is still more data for the app to process and it still has
// receive callbacks enabled, so do another recv flush.
// receive callbacks enabled, so do another recv flush (if not already
// doing multi-receive mode).
//
return TRUE;
return !Stream->Flags.ReceiveMultiple;
}

if (Stream->RecvBuffer.BaseOffset == Stream->RecvMaxLength) {
Expand Down Expand Up @@ -1157,7 +1156,9 @@ QuicStreamRecvSetEnabledState(
CXPLAT_DBG_ASSERT(!Stream->Flags.SentStopSending);
Stream->Flags.ReceiveEnabled = NewRecvEnabled;

if (Stream->Flags.Started && NewRecvEnabled) {
if (Stream->Flags.Started && NewRecvEnabled &&
(Stream->RecvBuffer.RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE ||
Stream->RecvBuffer.ReadPendingLength == 0)) {
//
// The application just resumed receive callbacks. Queue a
// flush receive operation to start draining the receive buffer.
Expand Down
2 changes: 2 additions & 0 deletions src/core/unittest/SettingsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ TEST(SettingsTest, TestAllSettingsFieldsSet)
SETTINGS_FEATURE_SET_TEST(ReliableResetEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(OneWayDelayEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(NetStatsEventEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(StreamMultiReceiveEnabled, QuicSettingsSettingsToInternal);

Settings.IsSetFlags = 0;
Settings.IsSet.RESERVED = ~Settings.IsSet.RESERVED;
Expand Down Expand Up @@ -209,6 +210,7 @@ TEST(SettingsTest, TestAllSettingsFieldsGet)
SETTINGS_FEATURE_GET_TEST(ReliableResetEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(OneWayDelayEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(NetStatsEventEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(StreamMultiReceiveEnabled, QuicSettingsGetSettings);

Settings.IsSetFlags = 0;
Settings.IsSet.RESERVED = ~Settings.IsSet.RESERVED;
Expand Down
Loading

0 comments on commit 408fc9a

Please sign in to comment.