diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs
index 01ca58c06..989650d84 100644
--- a/cs/src/core/Epochs/LightEpoch.cs
+++ b/cs/src/core/Epochs/LightEpoch.cs
@@ -181,7 +181,7 @@ public bool ThisInstanceProtected()
}
///
- /// Enter the thread into the protected code region
+ /// Enter the thread into the protected code region.
///
/// Current epoch
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -218,42 +218,59 @@ public void Resume()
Acquire();
ProtectAndDrain();
}
-
+
///
- /// Increment global current epoch
+ /// Increment current epoch.
///
- ///
- long BumpCurrentEpoch()
+ /// new epoch of the system
+ public long BumpCurrentEpoch()
{
- Debug.Assert(this.ThisInstanceProtected(), "BumpCurrentEpoch must be called on a protected thread");
- long nextEpoch = Interlocked.Increment(ref CurrentEpoch);
-
+ var nextEpoch = Interlocked.Increment(ref CurrentEpoch);
+
if (drainCount > 0)
- Drain(nextEpoch);
-
+ {
+ // track whether we acquired protection when calling from unprotected thread, so we restore the thread to
+ // its pre-call protection status after we are done
+ if (!ThisInstanceProtected())
+ {
+ Resume();
+ Release();
+ }
+ else
+ {
+ ProtectAndDrain();
+ }
+ }
+
return nextEpoch;
}
///
- /// Increment current epoch and associate trigger action
- /// with the prior epoch
+ /// Increment current epoch and associate trigger action with the prior epoch. The trigger action will execute
+ /// on a protected thread only after the prior epoch is safe (i.e., after all active threads have advanced past it)
///
/// Trigger action
- ///
- public void BumpCurrentEpoch(Action onDrain)
+ /// new epoch of the system
+ public long BumpCurrentEpoch(Action onDrain)
{
- long PriorEpoch = BumpCurrentEpoch() - 1;
-
- int i = 0;
- while (true)
+ Debug.Assert(onDrain != null);
+
+ var nextEpoch = Interlocked.Increment(ref CurrentEpoch);
+ var priorEpoch = nextEpoch - 1;
+ // track whether we acquired protection when calling from unprotected thread, so we restore the thread to
+ // its pre-call protection status after we are done
+ var acquiredProtection = false;
+
+ for (int i = 0;;)
{
if (drainList[i].epoch == long.MaxValue)
{
// This was an empty slot. If it still is, assign this action/epoch to the slot.
- if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == long.MaxValue)
+ if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) ==
+ long.MaxValue)
{
drainList[i].action = onDrain;
- drainList[i].epoch = PriorEpoch;
+ drainList[i].epoch = priorEpoch;
Interlocked.Increment(ref drainCount);
break;
}
@@ -264,12 +281,20 @@ public void BumpCurrentEpoch(Action onDrain)
if (triggerEpoch <= SafeToReclaimEpoch)
{
+ // Protection is required whenever we may execute a trigger action
+ if (!acquiredProtection && !ThisInstanceProtected())
+ {
+ acquiredProtection = true;
+ Resume();
+ }
+
// This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot.
- if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == triggerEpoch)
+ if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) ==
+ triggerEpoch)
{
var triggerAction = drainList[i].action;
drainList[i].action = onDrain;
- drainList[i].epoch = PriorEpoch;
+ drainList[i].epoch = priorEpoch;
triggerAction();
break;
}
@@ -279,14 +304,36 @@ public void BumpCurrentEpoch(Action onDrain)
if (++i == kDrainListSize)
{
// We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots.
- ProtectAndDrain();
+ if (!acquiredProtection && !ThisInstanceProtected())
+ {
+ acquiredProtection = true;
+ Resume();
+ }
+ else
+ {
+ ProtectAndDrain();
+ }
i = 0;
Thread.Yield();
}
}
- // Now ProtectAndDrain, which may execute the action we just added.
- ProtectAndDrain();
+
+ if (!acquiredProtection && !ThisInstanceProtected())
+ {
+ acquiredProtection = true;
+ Resume();
+ }
+ else
+ {
+ // Now ProtectAndDrain, which may execute the action we just added.
+ ProtectAndDrain();
+ }
+
+ if (acquiredProtection)
+ Release();
+
+ return nextEpoch;
}
///
diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs
index 82f444149..b29cccafc 100644
--- a/cs/src/core/FasterLog/FasterLog.cs
+++ b/cs/src/core/FasterLog/FasterLog.cs
@@ -42,6 +42,7 @@ public sealed class FasterLog : IDisposable
readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests;
readonly List coveredCommits = new();
long commitNum, commitCoveredAddress;
+ private bool logClosing = false;
readonly LogCommitPolicy commitPolicy;
@@ -74,6 +75,7 @@ public sealed class FasterLog : IDisposable
///
public long SafeTailAddress;
+
///
/// Dictionary of recovered iterators and their committed until addresses
///
@@ -231,6 +233,7 @@ public void Reset()
CommittedUntilAddress = beginAddress;
CommittedBeginAddress = beginAddress;
SafeTailAddress = beginAddress;
+ logClosing = false;
commitNum = 0;
this.beginAddress = beginAddress;
@@ -268,6 +271,8 @@ public void Initialize(long beginAddress, long committedUntilAddress, long lastC
commitNum = lastCommitNum;
this.beginAddress = beginAddress;
+ logClosing = false;
+
if (lastCommitNum > 0) logCommitManager.OnRecovery(lastCommitNum);
}
@@ -322,24 +327,15 @@ public void Dispose()
/// whether to spin until log completion becomes committed
public void CompleteLog(bool spinWait = false)
{
- // Ensure progress even if there is no thread in epoch table. Also, BumpCurrentEpoch must be done on a protected thread.
- bool isProtected = epoch.ThisInstanceProtected();
- if (!isProtected)
- epoch.Resume();
- try
- {
- // Ensure all currently started entries will enqueue before we declare log closed
- epoch.BumpCurrentEpoch(() =>
- {
- CommitInternal(out _, out _, false, Array.Empty(), long.MaxValue, null);
- });
- }
- finally
+ // Use this to signal to future enqueue calls that they should stop as we are closing the log
+ logClosing = true;
+ // use a bump to ensure that any concurrent enqueues that have marched passed the check will finish before
+ // we close the log
+ epoch.BumpCurrentEpoch(() =>
{
- if (!isProtected)
- epoch.Suspend();
- }
-
+ CommitInternal(out _, out _, false, Array.Empty(), long.MaxValue, null);
+ });
+
if (spinWait)
WaitForCommit(TailAddress, long.MaxValue);
}
@@ -480,6 +476,7 @@ public long Enqueue(IEnumerable entries) where T : ILogEnqueueEntry
#endregion
#region TryEnqueue
+
///
/// Try to enqueue entry to log (in memory). If it returns true, we are
/// done. If it returns false, we need to retry.
@@ -497,7 +494,7 @@ public unsafe bool TryEnqueue(T entry, out long logicalAddress) where T : ILo
epoch.Resume();
- if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
@@ -538,7 +535,7 @@ public unsafe bool TryEnqueue(IEnumerable entries, out long logicalAddress
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
- if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
@@ -578,8 +575,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
-
- if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
@@ -619,8 +615,7 @@ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan entryBytes, bool noCom
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
-
- if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
@@ -654,8 +649,7 @@ public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
-
- if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
@@ -689,6 +683,7 @@ public unsafe void Enqueue(THeader userHeader, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = AllocateBlock(allocatedLength);
@@ -715,6 +710,7 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item, out l
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = AllocateBlock(allocatedLength);
@@ -743,6 +739,7 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item1, ref
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = AllocateBlock(allocatedLength);
@@ -772,8 +769,9 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item1, ref
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);
- epoch.Resume();
-
+ epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+
logicalAddress = AllocateBlock(allocatedLength);
var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress);
@@ -801,6 +799,7 @@ public unsafe void Enqueue(byte userHeader, ref SpanByte item, out long logicalA
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = AllocateBlock(allocatedLength);
@@ -862,6 +861,7 @@ public unsafe bool TryEnqueue(THeader userHeader, ref SpanByte item1, r
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
@@ -901,6 +901,7 @@ public unsafe bool TryEnqueue(THeader userHeader, ref SpanByte item1, r
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
@@ -938,6 +939,7 @@ public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logic
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
@@ -2533,7 +2535,7 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log
ValidateAllocatedLength(allocatedLength);
epoch.Resume();
- if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
+ if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
@@ -2779,16 +2781,8 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
}
// Otherwise, move to set read-only tail and flush
- try
- {
- epoch.Resume();
- if (!allocator.ShiftReadOnlyToTail(out _, out _))
- CommitMetadataOnly(ref info);
- }
- finally
- {
- epoch.Suspend();
- }
+ if (!allocator.ShiftReadOnlyToTail(out _, out _))
+ CommitMetadataOnly(ref info);
return true;
}