Skip to content

Commit

Permalink
Make --no-server mode also terminate on removal of processId file…
Browse files Browse the repository at this point in the history
…, use it in integration test cleanup (#4587)

This attempts to fix some of the residual flakiness encountered in
#4570

Somehow the recursive process termination we pulled in from upstream
com-lihaoyi/os-lib#359 only works some of the
time, and other times the processes get leaked. This PR extends the
`serverId`-file-deleted shutdown logic we already used for client-server
mode and enables it for `--no-server` mode as well. This lets us put an
additional guardrail in our `IntegrationTester#close` to delete any such
files, to try and force any Mill processes to terminate.

Added an additional integration test to exercise this behavior

Also fixed a bug in `ExampleTester` not honoring the `clientServerMode`
flag, and update `testkit.test` to assert on those behaviors
  • Loading branch information
lihaoyi authored Feb 19, 2025
1 parent c3ec8da commit a16a448
Show file tree
Hide file tree
Showing 16 changed files with 187 additions and 67 deletions.
2 changes: 1 addition & 1 deletion core/constants/src/mill/constants/ServerFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* and documentation about what they do
*/
public class ServerFiles {
public static final String serverId = "serverId";
public static final String processId = "processId";
public static final String sandbox = "sandbox";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import mill.testkit.UtestIntegrationTestSuite

import utest._

object CompileErrorTests extends UtestIntegrationTestSuite {
object FatalErrorTests extends UtestIntegrationTestSuite {
val tests: Tests = Tests {
test - integrationTest { tester =>
val res = tester.eval("fatalTask")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ trait MultiLevelBuildTests extends UtestIntegrationTestSuite {
val Seq(serverFolder) = os.list(tester.workspacePath / "out/mill-server")

// client-server mode should never restart in these tests and preserve the same process,
val currentServerId = os.read(serverFolder / "serverId")
val currentServerId = os.read(serverFolder / "processId")
assert(currentServerId == savedServerId || savedServerId == "")
savedServerId = currentServerId
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package build
import mill._

def foo = Task { 123 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package mill.integration

import mill.testkit.{UtestIntegrationTestSuite, IntegrationTester}
import utest._

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import utest.asserts.{RetryMax, RetryInterval}

/**
* Make sure removing the `mill-server` or `mill-no-server` directory
* kills any running process
*/
object ProcessFileDeletedExit extends UtestIntegrationTestSuite {
implicit val retryMax: RetryMax = RetryMax(10.seconds)
implicit val retryInterval: RetryInterval = RetryInterval(1.seconds)
val tests: Tests = Tests {
integrationTest { tester =>
import tester._

assert(!os.exists(workspacePath / "out/mill-server"))
assert(!os.exists(workspacePath / "out/mill-no-server"))

@volatile var watchTerminated = false
Future {
eval(
("--watch", "foo"),
stdout = os.ProcessOutput.Readlines { println(_) },
stderr = os.ProcessOutput.Readlines { println(_) }
)
watchTerminated = true
}

if (tester.clientServerMode) eventually { os.exists(workspacePath / "out/mill-server") }
else eventually { os.exists(workspacePath / "out/mill-no-server") }

assert(watchTerminated == false)

val processRoot =
if (tester.clientServerMode) workspacePath / "out/mill-server"
else workspacePath / "out/mill-no-server"

os.list(processRoot).map(p => os.remove(p / "processId"))

eventually { watchTerminated == true }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static int launchMillNoServer(String[] args) throws Exception {
interrupted = true;
throw e;
} finally {
if (!interrupted) {
if (!interrupted && Files.exists(processDir)) {
// cleanup if process terminated for sure
Files.walk(processDir)
// depth-first
Expand Down
83 changes: 47 additions & 36 deletions runner/server/src/mill/main/server/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ abstract class Server[T](
var stateCache = stateCache0
def stateCache0: T

val serverId: String = java.lang.Long.toHexString(scala.util.Random.nextLong())
val processId: String = Server.computeProcessId()
def serverLog0(s: String): Unit = {
if (os.exists(serverDir) || testLogEvenWhenServerIdWrong) {
os.write.append(serverDir / ServerFiles.serverLog, s"$s\n", createFolders = true)
}
}

def serverLog(s: String): Unit = serverLog0(s"$serverId $s")
def serverLog(s: String): Unit = serverLog0(s"$processId $s")

def run(): Unit = {
serverLog("running server in " + serverDir)
Expand All @@ -47,7 +47,15 @@ abstract class Server[T](
try {
Server.tryLockBlock(locks.processLock) {
serverLog("server file locked")
watchServerIdFile()
Server.watchProcessIdFile(
serverDir / ServerFiles.processId,
processId,
running = () => running,
exit = msg => {
serverLog(msg)
exitServer()
}
)
val serverSocket = new java.net.ServerSocket(0, 0, InetAddress.getByName(null))
os.write.over(serverDir / ServerFiles.socketPort, serverSocket.getLocalPort.toString)
serverLog("listening on port " + serverSocket.getLocalPort)
Expand Down Expand Up @@ -90,35 +98,6 @@ abstract class Server[T](
pipedInput
}

def watchServerIdFile(): Unit = {
os.write.over(serverDir / ServerFiles.serverId, serverId)

val serverIdThread = new Thread(
() =>
while (running) {
checkServerIdFile() match {
case None => Thread.sleep(100)
case Some(msg) =>
serverLog(msg)
exitServer()
}
},
"Server ID Checker Thread: " + serverDir
)
serverIdThread.start()
}
def checkServerIdFile(): Option[String] = {
Try(os.read(serverDir / ServerFiles.serverId)) match {
case scala.util.Failure(e) => Some(s"serverId file missing")

case scala.util.Success(s) =>
Option.when(s != serverId) {
s"serverId file contents $s does not match serverId $serverId"
}
}

}

def interruptWithTimeout[T](close: () => Unit, t: () => T): Option[T] = {
@volatile var interrupt = true
@volatile var interrupted = false
Expand Down Expand Up @@ -261,11 +240,43 @@ abstract class Server[T](
}

object Server {
def computeProcessId() = {
java.lang.Long.toHexString(scala.util.Random.nextLong()) +
"-pid" +
ProcessHandle.current().pid()
}
def checkProcessIdFile(processIdFile: os.Path, processId: String): Option[String] = {
Try(os.read(processIdFile)) match {
case scala.util.Failure(e) => Some(s"processId file missing")

case scala.util.Success(s) =>
Option.when(s != processId) {
s"processId file contents $s does not match processId $processId"
}
}

}

def watchProcessIdFile(
processIdFile: os.Path,
processId: String,
running: () => Boolean,
exit: String => Unit
): Unit = {
os.write.over(processIdFile, processId, createFolders = true)

def lockBlock[T](lock: Lock)(t: => T): T = {
val l = lock.lock()
try t
finally l.release()
val processIdThread = new Thread(
() =>
while (running()) {
checkProcessIdFile(processIdFile, processId) match {
case None => Thread.sleep(100)
case Some(msg) => exit(msg)
}
},
"Process ID Checker Thread: " + processIdFile
)
processIdThread.setDaemon(true)
processIdThread.start()
}

def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object ClientServerTests extends TestSuite {

val ENDL = System.lineSeparator()
class EchoServer(
override val serverId: String,
override val processId: String,
serverDir: os.Path,
locks: Locks,
testLogEvenWhenServerIdWrong: Boolean
Expand Down Expand Up @@ -97,10 +97,10 @@ object ClientServerTests extends TestSuite {
) {
def preRun(serverDir: Path) = { /*do nothing*/ }
def initServer(serverDir: Path, b: Boolean, locks: Locks) = {
val serverId = "server-" + nextServerId
val processId = "server-" + nextServerId
nextServerId += 1
new Thread(new EchoServer(
serverId,
processId,
os.Path(serverDir, os.pwd),
locks,
testLogEvenWhenServerIdWrong
Expand Down Expand Up @@ -173,7 +173,7 @@ object ClientServerTests extends TestSuite {
os.remove.all(res3.outDir)
Thread.sleep(1000)

assert(res3.logsFor("serverId file missing") == Seq("server-1"))
assert(res3.logsFor("processId file missing") == Seq("server-1"))
assert(res3.logsFor("exiting server") == Seq("server-1", "server-1"))
}
}
Expand Down
12 changes: 12 additions & 0 deletions runner/src/mill/runner/MillMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ object MillMain {
if (Properties.isWin && Util.hasConsole())
io.github.alexarchambault.windowsansi.WindowsAnsi.setup()

val processId = mill.main.server.Server.computeProcessId()
val out = os.Path(OutFiles.out, WorkspaceRoot.workspaceRoot)
mill.main.server.Server.watchProcessIdFile(
out / OutFiles.millNoServer / processId / ServerFiles.processId,
processId,
running = () => true,
exit = msg => {
System.err.println(msg)
System.exit(0)
}
)

val (result, _) =
try main0(
args = args.tail,
Expand Down
17 changes: 10 additions & 7 deletions testkit/src/mill/testkit/ExampleTester.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ object ExampleTester {
millExecutable: os.Path,
bashExecutable: String = defaultBashExecutable(),
workspacePath: os.Path = os.pwd
): Unit = {
new ExampleTester(
): os.Path = {
val tester = new ExampleTester(
clientServerMode,
workspaceSourcePath,
millExecutable,
bashExecutable,
workspacePath
).run()
)
tester.run()
tester.workspacePath
}

def defaultBashExecutable(): String = {
Expand Down Expand Up @@ -96,15 +98,16 @@ class ExampleTester(
}
}
private val millExt = if (isWindows) ".bat" else ""
private val clientServerFlag = if (clientServerMode) "" else "--no-server"

def processCommand(
expectedSnippets: Vector[String],
commandStr0: String,
check: Boolean = true
): Unit = {
val commandStr = commandStr0 match {
case s"mill $rest" => s"./mill$millExt --disable-ticker $rest"
case s"./mill $rest" => s"./mill$millExt --disable-ticker $rest"
case s"mill $rest" => s"./mill$millExt $clientServerFlag --disable-ticker $rest"
case s"./mill $rest" => s"./mill$millExt $clientServerFlag --disable-ticker $rest"
case s"curl $rest" => s"curl --retry 7 --retry-all-errors $rest"
case s => s
}
Expand Down Expand Up @@ -192,7 +195,7 @@ class ExampleTester(
expected.linesIterator.exists(globMatches(_, filtered))
}

def run(): Any = {
def run(): Unit = {
os.makeDir.all(workspacePath)
val parsed = ExampleParser(workspaceSourcePath)
val usageComment = parsed.collect { case ("example", txt) => txt }.mkString("\n\n")
Expand All @@ -204,7 +207,7 @@ class ExampleTester(
for (commandBlock <- commandBlocks) processCommandBlock(commandBlock)
} finally {
if (clientServerMode) processCommand(Vector(), "./mill shutdown", check = false)
removeServerIdFile()
removeProcessIdFile()
}
}
}
2 changes: 1 addition & 1 deletion testkit/src/mill/testkit/IntegrationTester.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object IntegrationTester {
)
}

removeServerIdFile()
removeProcessIdFile()
}
}

Expand Down
14 changes: 8 additions & 6 deletions testkit/src/mill/testkit/IntegrationTesterBase.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package mill.testkit
import mill.constants.OutFiles.{millServer, out}
import mill.constants.ServerFiles.serverId
import mill.constants.OutFiles.{millServer, millNoServer, out}
import mill.constants.ServerFiles.processId
import mill.util.Retry

trait IntegrationTesterBase {
Expand Down Expand Up @@ -58,10 +58,12 @@ trait IntegrationTesterBase {
/**
* Remove any ID files to try and force them to exit
*/
def removeServerIdFile(): Unit = {
val serverPath0 = os.Path(out, workspacePath) / millServer
if (clientServerMode) {
for (serverPath <- os.list.stream(serverPath0)) os.remove(serverPath / serverId)
def removeProcessIdFile(): Unit = {
val outDir = os.Path(out, workspacePath)
if (os.exists(outDir)) {
val serverPath0 = outDir / (if (clientServerMode) millServer else millNoServer)

for (serverPath <- os.list.stream(serverPath0)) os.remove(serverPath / processId)

Thread.sleep(500) // give a moment for the server to notice the file is gone and exit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ def testTask = Task { os.read(testSource().path).toUpperCase() }
> cat out/testTask.json
..."HELLO WORLD SOURCE FILE!!!"...

> ls out/mill-server # not --no-server, make sure `mill-server` is generated

> ls out/mill-no-server # --no-server, make sure `mill-no-server` is generated

*/
21 changes: 19 additions & 2 deletions testkit/test/src/mill/testkit/ExampleTesterTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,30 @@ import utest._
object ExampleTesterTests extends TestSuite {

def tests: Tests = Tests {
test("example") {
ExampleTester.run(
test("fork") {
val workspacePath = ExampleTester.run(
clientServerMode = false,
workspaceSourcePath =
os.Path(sys.env("MILL_TEST_RESOURCE_DIR")) / "example-test-example-project",
millExecutable = os.Path(sys.env("MILL_EXECUTABLE_PATH"))
)

assert(os.exists(workspacePath / "out/mill-no-server"))

assert(TestkitTestUtils.getProcessIdFiles(workspacePath).isEmpty)
}

test("server") {
val workspacePath = ExampleTester.run(
clientServerMode = true,
workspaceSourcePath =
os.Path(sys.env("MILL_TEST_RESOURCE_DIR")) / "example-test-example-project",
millExecutable = os.Path(sys.env("MILL_EXECUTABLE_PATH"))
)

assert(os.exists(workspacePath / "out/mill-server"))

assert(TestkitTestUtils.getProcessIdFiles(workspacePath).isEmpty)
}
}
}
Loading

0 comments on commit a16a448

Please sign in to comment.