diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 01ca58c06..989650d84 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -181,7 +181,7 @@ public bool ThisInstanceProtected() } /// - /// Enter the thread into the protected code region + /// Enter the thread into the protected code region. /// /// Current epoch [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -218,42 +218,59 @@ public void Resume() Acquire(); ProtectAndDrain(); } - + /// - /// Increment global current epoch + /// Increment current epoch. /// - /// - long BumpCurrentEpoch() + /// new epoch of the system + public long BumpCurrentEpoch() { - Debug.Assert(this.ThisInstanceProtected(), "BumpCurrentEpoch must be called on a protected thread"); - long nextEpoch = Interlocked.Increment(ref CurrentEpoch); - + var nextEpoch = Interlocked.Increment(ref CurrentEpoch); + if (drainCount > 0) - Drain(nextEpoch); - + { + // track whether we acquired protection when calling from unprotected thread, so we restore the thread to + // its pre-call protection status after we are done + if (!ThisInstanceProtected()) + { + Resume(); + Release(); + } + else + { + ProtectAndDrain(); + } + } + return nextEpoch; } /// - /// Increment current epoch and associate trigger action - /// with the prior epoch + /// Increment current epoch and associate trigger action with the prior epoch. 
The trigger action will execute + /// on a protected thread only after the prior epoch is safe (i.e., after all active threads have advanced past it) /// /// Trigger action - /// - public void BumpCurrentEpoch(Action onDrain) + /// new epoch of the system + public long BumpCurrentEpoch(Action onDrain) { - long PriorEpoch = BumpCurrentEpoch() - 1; - - int i = 0; - while (true) + Debug.Assert(onDrain != null); + + var nextEpoch = Interlocked.Increment(ref CurrentEpoch); + var priorEpoch = nextEpoch - 1; + // track whether we acquired protection when calling from unprotected thread, so we restore the thread to + // its pre-call protection status after we are done + var acquiredProtection = false; + + for (int i = 0;;) { if (drainList[i].epoch == long.MaxValue) { // This was an empty slot. If it still is, assign this action/epoch to the slot. - if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == long.MaxValue) + if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == + long.MaxValue) { drainList[i].action = onDrain; - drainList[i].epoch = PriorEpoch; + drainList[i].epoch = priorEpoch; Interlocked.Increment(ref drainCount); break; } @@ -264,12 +281,20 @@ public void BumpCurrentEpoch(Action onDrain) if (triggerEpoch <= SafeToReclaimEpoch) { + // Protection is required whenever we may execute a trigger action + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } + // This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot. 
- if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == triggerEpoch) + if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == + triggerEpoch) { var triggerAction = drainList[i].action; drainList[i].action = onDrain; - drainList[i].epoch = PriorEpoch; + drainList[i].epoch = priorEpoch; triggerAction(); break; } @@ -279,14 +304,36 @@ public void BumpCurrentEpoch(Action onDrain) if (++i == kDrainListSize) { // We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots. - ProtectAndDrain(); + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } + else + { + ProtectAndDrain(); + } i = 0; Thread.Yield(); } } - // Now ProtectAndDrain, which may execute the action we just added. - ProtectAndDrain(); + + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } + else + { + // Now ProtectAndDrain, which may execute the action we just added. 
+ ProtectAndDrain(); + } + + if (acquiredProtection) + Release(); + + return nextEpoch; } /// diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index 82f444149..b29cccafc 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -42,6 +42,7 @@ public sealed class FasterLog : IDisposable readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests; readonly List coveredCommits = new(); long commitNum, commitCoveredAddress; + private bool logClosing = false; readonly LogCommitPolicy commitPolicy; @@ -74,6 +75,7 @@ public sealed class FasterLog : IDisposable /// public long SafeTailAddress; + /// /// Dictionary of recovered iterators and their committed until addresses /// @@ -231,6 +233,7 @@ public void Reset() CommittedUntilAddress = beginAddress; CommittedBeginAddress = beginAddress; SafeTailAddress = beginAddress; + logClosing = false; commitNum = 0; this.beginAddress = beginAddress; @@ -268,6 +271,8 @@ public void Initialize(long beginAddress, long committedUntilAddress, long lastC commitNum = lastCommitNum; this.beginAddress = beginAddress; + logClosing = false; + if (lastCommitNum > 0) logCommitManager.OnRecovery(lastCommitNum); } @@ -322,24 +327,15 @@ public void Dispose() /// whether to spin until log completion becomes committed public void CompleteLog(bool spinWait = false) { - // Ensure progress even if there is no thread in epoch table. Also, BumpCurrentEpoch must be done on a protected thread. 
- bool isProtected = epoch.ThisInstanceProtected(); - if (!isProtected) - epoch.Resume(); - try - { - // Ensure all currently started entries will enqueue before we declare log closed - epoch.BumpCurrentEpoch(() => - { - CommitInternal(out _, out _, false, Array.Empty(), long.MaxValue, null); - }); - } - finally + // Use this to signal to future enqueue calls that they should stop as we are closing the log + logClosing = true; + // use a bump to ensure that any concurrent enqueues that have marched past the check will finish before + // we close the log + epoch.BumpCurrentEpoch(() => { - if (!isProtected) - epoch.Suspend(); - } - + CommitInternal(out _, out _, false, Array.Empty(), long.MaxValue, null); + }); + if (spinWait) WaitForCommit(TailAddress, long.MaxValue); } @@ -480,6 +476,7 @@ public long Enqueue(IEnumerable entries) where T : ILogEnqueueEntry #endregion #region TryEnqueue + /// /// Try to enqueue entry to log (in memory). If it returns true, we are /// done. If it returns false, we need to retry. 
@@ -497,7 +494,7 @@ public unsafe bool TryEnqueue(T entry, out long logicalAddress) where T : ILo epoch.Resume(); - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -538,7 +535,7 @@ public unsafe bool TryEnqueue(IEnumerable entries, out long logicalAddress ValidateAllocatedLength(allocatedLength); epoch.Resume(); - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); @@ -578,8 +575,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) ValidateAllocatedLength(allocatedLength); epoch.Resume(); - - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -619,8 +615,7 @@ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan entryBytes, bool noCom ValidateAllocatedLength(allocatedLength); epoch.Resume(); - - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -654,8 +649,7 @@ public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress) ValidateAllocatedLength(allocatedLength); epoch.Resume(); - - if (commitNum == 
long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -689,6 +683,7 @@ public unsafe void Enqueue(THeader userHeader, out long logicalAddress) ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = AllocateBlock(allocatedLength); @@ -715,6 +710,7 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item, out l ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = AllocateBlock(allocatedLength); @@ -743,6 +739,7 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item1, ref ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = AllocateBlock(allocatedLength); @@ -772,8 +769,9 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item1, ref int allocatedLength = headerSize + Align(length); ValidateAllocatedLength(allocatedLength); - epoch.Resume(); - + epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + logicalAddress = AllocateBlock(allocatedLength); var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); @@ -801,6 +799,7 @@ public unsafe void Enqueue(byte userHeader, ref SpanByte item, out long logicalA ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed 
log"); logicalAddress = AllocateBlock(allocatedLength); @@ -862,6 +861,7 @@ public unsafe bool TryEnqueue(THeader userHeader, ref SpanByte item1, r ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -901,6 +901,7 @@ public unsafe bool TryEnqueue(THeader userHeader, ref SpanByte item1, r ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -938,6 +939,7 @@ public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logic ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -2533,7 +2535,7 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log ValidateAllocatedLength(allocatedLength); epoch.Resume(); - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); @@ -2779,16 +2781,8 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool } // Otherwise, move to set read-only tail and flush - try - { - epoch.Resume(); - if (!allocator.ShiftReadOnlyToTail(out _, out _)) - CommitMetadataOnly(ref info); - } - finally - { - epoch.Suspend(); - } + if (!allocator.ShiftReadOnlyToTail(out _, out _)) + CommitMetadataOnly(ref info); 
return true; }