Skip to content

Commit

Permalink
fix potential deadlock in BlockingIOThreadPool (#634)
Browse files Browse the repository at this point in the history
Motivation:

BlockingIOThreadPool used to call out with a lock held and on top of
that on the wrong thread...

Modifications:

Make BlockingIOThreadPool call out on the supplied `queue`

Result:

fewer deadlock and surprises
  • Loading branch information
weissi authored Oct 19, 2018
1 parent 9bae007 commit 176dd6e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 2 deletions.
6 changes: 4 additions & 2 deletions Sources/NIO/BlockingIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public final class BlockingIOThreadPool {
self.lock.withLock {
switch self.state {
case .running(let items):
items.forEach { $0(.cancelled) }
queue.async {
items.forEach { $0(.cancelled) }
}
self.state = .shuttingDown(Array(repeating: true, count: numberOfThreads))
(0..<numberOfThreads).forEach { _ in
self.semaphore.signal()
Expand Down Expand Up @@ -162,7 +164,7 @@ public final class BlockingIOThreadPool {
self.queues.enumerated().forEach { idAndQueue in
let id = idAndQueue.0
let q = idAndQueue.1
q.async { [unowned self] in
q.async {
self.process(identifier: id)
}
}
Expand Down
3 changes: 3 additions & 0 deletions Tests/NIOTests/BlockingIOThreadPoolTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ extension BlockingIOThreadPoolTest {
("testDoubleShutdownWorks", testDoubleShutdownWorks),
("testStateCancelled", testStateCancelled),
("testStateActive", testStateActive),
("testLoseLastReferenceAndShutdownWhileTaskStillRunning", testLoseLastReferenceAndShutdownWhileTaskStillRunning),
("testDeadLockIfCalledOutWithLockHeld", testDeadLockIfCalledOutWithLockHeld),
("testPoolDoesGetReleasedWhenStoppedAndReferencedDropped", testPoolDoesGetReleasedWhenStoppedAndReferencedDropped),
]
}
}
Expand Down
77 changes: 77 additions & 0 deletions Tests/NIOTests/BlockingIOThreadPoolTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import XCTest
import NIO
import Dispatch
import Foundation

class BlockingIOThreadPoolTest: XCTestCase {
func testDoubleShutdownWorks() throws {
Expand Down Expand Up @@ -48,4 +49,80 @@ class BlockingIOThreadPoolTest: XCTestCase {
group.wait()
try threadPool.syncShutdownGracefully()
}

func testLoseLastReferenceAndShutdownWhileTaskStillRunning() throws {
let blockThreadSem = DispatchSemaphore(value: 0)
let allDoneSem = DispatchSemaphore(value: 0)

({
let threadPool = BlockingIOThreadPool(numberOfThreads: 2)
threadPool.start()
threadPool.submit { _ in
Foundation.Thread.sleep(forTimeInterval: 0.1)
}
threadPool.submit { _ in
blockThreadSem.wait()
}
threadPool.shutdownGracefully { error in
XCTAssertNil(error)
allDoneSem.signal()
}
})()
blockThreadSem.signal()
allDoneSem.wait()
}

func testDeadLockIfCalledOutWithLockHeld() throws {
let blockRunningSem = DispatchSemaphore(value: 0)
let blockOneThreadSem = DispatchSemaphore(value: 0)
let threadPool = BlockingIOThreadPool(numberOfThreads: 1)
let allDone = DispatchSemaphore(value: 0)
threadPool.start()
// enqueue one that'll block the whole pool (1 thread only)
threadPool.submit { state in
XCTAssertEqual(state, .active)
blockRunningSem.signal()
blockOneThreadSem.wait()
}
blockRunningSem.wait()
// enqueue one that will be cancelled and then calls shutdown again which needs the lock
threadPool.submit { state in
XCTAssertEqual(state, .cancelled)
threadPool.shutdownGracefully { error in
XCTAssertNil(error)
}
}
threadPool.shutdownGracefully { error in
XCTAssertNil(error)
allDone.signal()
}
blockOneThreadSem.signal() // that'll unblock the thread in the pool
allDone.wait()
}

func testPoolDoesGetReleasedWhenStoppedAndReferencedDropped() throws {
let taskRunningSem = DispatchSemaphore(value: 0)
let doneSem = DispatchSemaphore(value: 0)
let shutdownDoneSem = DispatchSemaphore(value: 0)
weak var weakThreadPool: BlockingIOThreadPool? = nil
({
let threadPool = BlockingIOThreadPool(numberOfThreads: 1)
weakThreadPool = threadPool
threadPool.start()
threadPool.submit { state in
XCTAssertEqual(.active, state)
taskRunningSem.signal()
doneSem.wait()
}
taskRunningSem.wait()
threadPool.shutdownGracefully { error in
XCTAssertNil(error)
shutdownDoneSem.signal()
}
})()
XCTAssertNotNil(weakThreadPool)
doneSem.signal()
shutdownDoneSem.wait()
assert(weakThreadPool == nil, within: .seconds(1))
}
}

0 comments on commit 176dd6e

Please sign in to comment.