From 064720f00d326893edc24f935cbab34517d60d6e Mon Sep 17 00:00:00 2001 From: Walter Huf Date: Sat, 2 Nov 2024 21:46:41 -0700 Subject: [PATCH] Split out BclMultiplexer --- .../io/bimmergestalt/bcl/BclOutputStream.kt | 6 +- .../io/bimmergestalt/bcl/BclPacketSender.kt | 8 +- .../bimmergestalt/bcl/android/BtConnection.kt | 18 ++- .../bcl/client/BclClientTransport.kt | 127 ++++-------------- .../bcl/multiplex/BclMultiplexer.kt | 126 +++++++++++++++++ 5 files changed, 172 insertions(+), 113 deletions(-) create mode 100644 app/src/main/java/io/bimmergestalt/bcl/multiplex/BclMultiplexer.kt diff --git a/app/src/main/java/io/bimmergestalt/bcl/BclOutputStream.kt b/app/src/main/java/io/bimmergestalt/bcl/BclOutputStream.kt index 82185e9..2ebc986 100644 --- a/app/src/main/java/io/bimmergestalt/bcl/BclOutputStream.kt +++ b/app/src/main/java/io/bimmergestalt/bcl/BclOutputStream.kt @@ -4,12 +4,12 @@ import java.io.OutputStream class BclOutputStream(private val src: Short, private val dest: Short, private val output: BclPacketSender): OutputStream() { override fun write(b: Int) { - output.write(BclPacket.Specialized.Data(src, dest, ByteArray(b))) + output.writePacket(BclPacket.Specialized.Data(src, dest, ByteArray(b))) } override fun write(b: ByteArray?) { b ?: return - output.write(BclPacket.Specialized.Data(src, dest, b)) + output.writePacket(BclPacket.Specialized.Data(src, dest, b)) } override fun write(b: ByteArray?, off: Int, len: Int) { @@ -22,6 +22,6 @@ class BclOutputStream(private val src: Short, private val dest: Short, private v } override fun close() { - output.write(BclPacket.Specialized.Close(src, dest)) + output.writePacket(BclPacket.Specialized.Close(src, dest)) } } \ No newline at end of file diff --git a/app/src/main/java/io/bimmergestalt/bcl/BclPacketSender.kt b/app/src/main/java/io/bimmergestalt/bcl/BclPacketSender.kt index de7feb4..25db8ac 100644 --- a/app/src/main/java/io/bimmergestalt/bcl/BclPacketSender.kt +++ b/app/src/main/java/io/bimmergestalt/bcl/BclPacketSender.kt @@ -3,9 +3,11 @@ package io.bimmergestalt.bcl import org.tinylog.Logger import java.io.OutputStream -class BclPacketSender(private val output: OutputStream) { - @Suppress("BlockingMethodInNonBlockingContext") // Why does linter think this context is nonblocking - fun write(packet: BclPacket) { +interface BclPacketSender { + fun writePacket(packet: BclPacket) +} +class BclPacketSenderConcrete(private val output: OutputStream): BclPacketSender { + override fun writePacket(packet: BclPacket) { if (packet.dest == 5001.toShort()) { Logger.debug {"Sending $packet"} } diff --git a/app/src/main/java/io/bimmergestalt/bcl/android/BtConnection.kt b/app/src/main/java/io/bimmergestalt/bcl/android/BtConnection.kt index e031202..2aa0306 100644 --- a/app/src/main/java/io/bimmergestalt/bcl/android/BtConnection.kt +++ b/app/src/main/java/io/bimmergestalt/bcl/android/BtConnection.kt @@ -5,6 +5,7 @@ import android.bluetooth.BluetoothSocket import io.bimmergestalt.bcl.client.BclClientTransport import io.bimmergestalt.bcl.ConnectionState import io.bimmergestalt.bcl.MutableConnectionState +import io.bimmergestalt.bcl.multiplex.BclMultiplexer import io.bimmergestalt.bcl.protocols.DestProtocolFactory import org.tinylog.kotlin.Logger import java.io.IOException @@ -26,6 +27,8 @@ class BtConnection(val device: BluetoothDevice, val connectionState: MutableConn private var socket: BluetoothSocket? = null var bclConnection: BclClientTransport? = null private set + var bclMultiplexer: BclMultiplexer? = null + private set val isConnected: Boolean get() = bclConnection?.isConnected == true @@ -69,16 +72,21 @@ class BtConnection(val device: BluetoothDevice, val connectionState: MutableConn private fun connectBcl(socket: BluetoothSocket) { while (socket.isConnected) { try { - connectionState.transportState = ConnectionState.TransportState.ACTIVE - val bclConnection = BclClientTransport(socket.inputStream, socket.outputStream, connectionState, destProtocolFactories) - this.bclConnection = bclConnection - bclConnection.connect() - bclConnection.run() + connectionState.transportState = ConnectionState.TransportState.ACTIVE + val bclConnection = BclClientTransport(socket.inputStream, socket.outputStream, connectionState) + this.bclConnection = bclConnection + bclConnection.connect() + val bclMultiplexer = BclMultiplexer(bclConnection, destProtocolFactories) + this.bclMultiplexer = bclMultiplexer + bclMultiplexer.openProtocols() + bclMultiplexer.run() } catch (_: SecurityException) { } catch (e: IOException) { Logger.warn(e) { "IOException communicating BCL" } } catch (_: InterruptedException) { } finally { + bclMultiplexer?.shutdown() + bclMultiplexer = null bclConnection?.shutdown() bclConnection = null } diff --git a/app/src/main/java/io/bimmergestalt/bcl/client/BclClientTransport.kt b/app/src/main/java/io/bimmergestalt/bcl/client/BclClientTransport.kt index ade9e7b..ce7e947 100644 --- a/app/src/main/java/io/bimmergestalt/bcl/client/BclClientTransport.kt +++ b/app/src/main/java/io/bimmergestalt/bcl/client/BclClientTransport.kt @@ -13,8 +13,7 @@ import java.io.OutputStream /** * Runs the BCL protocol over the given socket streams */ -class BclClientTransport(input: InputStream, output: OutputStream, val connectionState: MutableConnectionState, - val destProtocolFactories: Iterable) { +class BclClientTransport(input: InputStream, output: OutputStream, val connectionState: MutableConnectionState): BclPacketSender { companion object { private const val SESSION_INIT_WAIT = 1000L } @@ -22,14 +21,15 @@ class BclClientTransport(input: InputStream, output: OutputStream, val connectio @Suppress("UnstableApiUsage") private val input = CountingInputStream(input) private val output = CountingOutputStream(output) - private val packetOutput = BclPacketSender(output) + private val packetOutput = BclPacketSenderConcrete(output) + @Suppress("UnstableApiUsage") val bytesRead: Long get() = input.count val bytesWritten: Long get() = output.count + var openConnectionCount: Int = 0 - var running = true var state: ConnectionState.BclState private set(value) { connectionState.bclState = value } get() = connectionState.bclState @@ -40,16 +40,12 @@ class BclClientTransport(input: InputStream, output: OutputStream, val connectio val instanceId: Int get() = connectionHandshake?.instanceId?.toInt() ?: -1 - val destProtocols: MutableList = ArrayList() - val openConnections: MutableMap, ProxyClientConnection> = HashMap() - @Throws(IOException::class) fun connect() { state = ConnectionState.BclState.OPENING waitForHandshake() selectProtocol() state = ConnectionState.BclState.ACTIVE - openProtocols() } private fun waitForHandshake() { @@ -86,14 +82,14 @@ class BclClientTransport(input: InputStream, output: OutputStream, val connectio val version = connectionHandshake.version.toByte() if (version > 3) { // hardcoded to version 3, version 4 has an unknown watchdog behavior - packetOutput.write(BclPacket.Specialized.SelectProto(3.toShort())) + packetOutput.writePacket(BclPacket.Specialized.SelectProto(3.toShort())) doKnock() } } private fun doKnock() { // i think empty values work fine here - packetOutput.write(BclPacket.Specialized.Knock( + packetOutput.writePacket(BclPacket.Specialized.Knock( ByteArray(0), ByteArray(0), ByteArray(0), ByteArray(0), 0 /*A4A*/, 1 @@ -102,95 +98,30 @@ class BclClientTransport(input: InputStream, output: OutputStream, val connectio // otherwise 1 /*TouchCommand*/ and 7 } - private fun openProtocols() { - destProtocols.add(WatchdogProtocol.Factory().onConnect( - this, - BclClientConnectionOpener() - )) - destProtocolFactories.forEach { factory -> - try { - destProtocols.add(factory.onConnect(this, BclClientConnectionOpener())) - } catch (e: Exception) { - shutdown() - throw IOException("Failed to initialize protocol $factory") - } - } - } - - inner class BclClientConnectionOpener: ProxyConnectionOpener { - override fun openConnection( - srcPort: Short, - destPort: Short, - client: OutputStream - ): ProxyClientConnection { - val connection = synchronized(openConnections) { - val key = Pair(srcPort, destPort) - Logger.info {"Opening BCL Connection $srcPort:$destPort"} - if (openConnections.containsKey(key)) { - throw IllegalArgumentException("Duplicate to/from connection") - } - val connection = ProxyClientConnection( - srcPort, - destPort, - client, - BclOutputStream(srcPort, destPort, packetOutput) - ) { - closeConnection(it) - } - - openConnections[key] = connection - connection - } - packetOutput.write(BclPacket.Specialized.Open(srcPort, destPort)) - return connection - } - - fun closeConnection(connection: ProxyClientConnection) { - Logger.info {"Closing BCL Connection ${connection.srcPort}:${connection.destPort}"} - synchronized(openConnections) { openConnections.remove(connection.key) } - } - } - - fun run() { - while (running) { - readPacket() - } - } - - private fun readPacket() { + fun readPacket(): BclPacket? { val packet = BclPacket.readFrom(input) - packetOutput.write(BclPacket.Specialized.DataAck(8 + packet.data.size)) + packetOutput.writePacket(BclPacket.Specialized.DataAck(8 + packet.data.size)) if (packet.dest == 5001.toShort() || (packet.command != BclPacket.COMMAND.DATA && packet.command != BclPacket.COMMAND.DATAACK)) { Logger.info {"Received support packet $packet"} } - if (packet.command == BclPacket.COMMAND.DATA) { - val key = Pair(packet.src, packet.dest) - val connection = synchronized(openConnections) { openConnections[key] } - if (connection == null) { - // manually send, which normally the connection.close() handles - packetOutput.write(BclPacket.Specialized.Close(packet.src, packet.dest)) - } - try { - connection?.toClient?.write(packet.data) - } catch (e: IOException) { - synchronized(openConnections) { openConnections.remove(key)?.close() } - } - } - else if (packet.command == BclPacket.COMMAND.CLOSE) { - val key = Pair(packet.src, packet.dest) - val connection = synchronized(openConnections) { openConnections[key] } - connection?.toClient?.close() - } - else if (packet.command == BclPacket.COMMAND.HANGUP) { - shutdown() + when (packet.command) { + BclPacket.COMMAND.DATA -> return packet + BclPacket.COMMAND.CLOSE -> return packet + BclPacket.COMMAND.HANGUP -> shutdown() + else -> Logger.warn {"Received unhandled packet $packet"} } + return null + } + + override fun writePacket(packet: BclPacket) { + packetOutput.writePacket(packet) } fun getReport() = BclConnectionReport( startupTimestamp, bytesRead, bytesWritten, - openConnections.size, + openConnectionCount, instanceId.toShort(), 0, connectionHandshake?.bufferSize ?: -1, @@ -199,20 +130,12 @@ class BclClientTransport(input: InputStream, output: OutputStream, val connectio ) fun shutdown() { - if (running) { - running = false - try { - destProtocols.forEach { protocol -> - protocol.shutdown() - } - openConnections.values.toMutableList().forEach { - it.close() - } - packetOutput.write(BclPacket(BclPacket.COMMAND.HANGUP, 0, 0, ByteArray(0))) - output.flush() - } finally { - state = ConnectionState.BclState.SHUTDOWN - } + try { + packetOutput.writePacket(BclPacket(BclPacket.COMMAND.HANGUP, 0, 0, ByteArray(0))) + output.flush() + state = ConnectionState.BclState.SHUTDOWN + } catch (e: Exception) { + Logger.warn(e) {"Error while shutting down BCL"} } } } \ No newline at end of file diff --git a/app/src/main/java/io/bimmergestalt/bcl/multiplex/BclMultiplexer.kt b/app/src/main/java/io/bimmergestalt/bcl/multiplex/BclMultiplexer.kt new file mode 100644 index 0000000..ecefb20 --- /dev/null +++ b/app/src/main/java/io/bimmergestalt/bcl/multiplex/BclMultiplexer.kt @@ -0,0 +1,126 @@ +package io.bimmergestalt.bcl.multiplex + +import io.bimmergestalt.bcl.BclOutputStream +import io.bimmergestalt.bcl.BclPacket +import io.bimmergestalt.bcl.client.BclClientTransport +import io.bimmergestalt.bcl.client.ProxyConnectionOpener +import io.bimmergestalt.bcl.protocols.DestProtocolFactory +import io.bimmergestalt.bcl.protocols.Protocol +import io.bimmergestalt.bcl.protocols.ProxyClientConnection +import io.bimmergestalt.bcl.protocols.WatchdogProtocol +import org.tinylog.Logger +import java.io.IOException +import java.io.OutputStream + +class BclMultiplexer(val bclTransport: BclClientTransport, + val destProtocolFactories: Iterable) { + + var running = true + + val destProtocols: MutableList = ArrayList() + val openConnections: MutableMap, ProxyClientConnection> = HashMap() + + fun openProtocols() { + destProtocols.add( + WatchdogProtocol.Factory().onConnect( + bclTransport, + BclClientConnectionOpener() + )) + destProtocolFactories.forEach { factory -> + try { + destProtocols.add(factory.onConnect(bclTransport, BclClientConnectionOpener())) + } catch (e: Exception) { + bclTransport.shutdown() + throw IOException("Failed to initialize protocol $factory") + } + } + } + + fun run() { + while (running) { + val packet = bclTransport.readPacket() + if (packet != null) { + handlePacket(packet) + } + } + } + + private fun handlePacket(packet: BclPacket) { + if (packet.command == BclPacket.COMMAND.DATA) { + val key = Pair(packet.src, packet.dest) + val connection = synchronized(openConnections) { openConnections[key] } + if (connection == null) { + // manually send, which normally the connection.close() handles + bclTransport.writePacket(BclPacket.Specialized.Close(packet.src, packet.dest)) + } + try { + connection?.toClient?.write(packet.data) + } catch (e: IOException) { + synchronized(openConnections) { + openConnections.remove(key)?.close() + bclTransport.openConnectionCount = openConnections.size + } + } + } + else if (packet.command == BclPacket.COMMAND.CLOSE) { + val key = Pair(packet.src, packet.dest) + val connection = synchronized(openConnections) { openConnections[key] } + connection?.toClient?.close() + } + } + + inner class BclClientConnectionOpener: ProxyConnectionOpener { + override fun openConnection( + srcPort: Short, + destPort: Short, + client: OutputStream + ): ProxyClientConnection { + val connection = synchronized(openConnections) { + val key = Pair(srcPort, destPort) + Logger.info {"Opening BCL Connection $srcPort:$destPort"} + if (openConnections.containsKey(key)) { + throw IllegalArgumentException("Duplicate to/from connection") + } + val connection = ProxyClientConnection( + srcPort, + destPort, + client, + BclOutputStream(srcPort, destPort, bclTransport) + ) { + closeConnection(it) + } + + openConnections[key] = connection + bclTransport.openConnectionCount = openConnections.size + connection + } + bclTransport.writePacket(BclPacket.Specialized.Open(srcPort, destPort)) + return connection + } + + fun closeConnection(connection: ProxyClientConnection) { + Logger.info {"Closing BCL Connection ${connection.srcPort}:${connection.destPort}"} + synchronized(openConnections) { + openConnections.remove(connection.key) + bclTransport.openConnectionCount = openConnections.size + } + } + } + + fun shutdown() { + if (running) { + running = false + try { + destProtocols.forEach { protocol -> + protocol.shutdown() + } + openConnections.values.toMutableList().forEach { + it.close() + } + bclTransport.shutdown() + } catch (e: Exception) { + Logger.warn(e) { "Error while shutting down BclMultiplexer" } + } + } + } +} \ No newline at end of file