Skip to content

Commit

Permalink
Close connection in job cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
kraftwerk28 committed Jun 29, 2021
1 parent 655ab4b commit a8bf75a
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 39 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ repositories {

val tgBotVersion = "6.0.4"
val retrofitVersion = "2.7.1"
val plugDir = "MinecraftServers/spigot_1.17/plugins/"
val plugDir = "MinecraftServers/spigot_1.16.5/plugins/"
val homeDir = System.getProperty("user.home")

tasks {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.kraftwerk28.spigot_tg_bridge

import com.google.gson.annotations.SerializedName as Name
import retrofit2.Call
import retrofit2.http.*

interface TgApiService {
Expand Down Expand Up @@ -56,11 +57,11 @@ interface TgApiService {
): TgResponse<Message>

@GET("getUpdates")
suspend fun getUpdates(
fun getUpdates(
@Query("offset") offset: Long,
@Query("limit") limit: Int = 100,
@Query("timeout") timeout: Int = 0,
): TgResponse<List<Update>>
): Call<TgResponse<List<Update>>>

@GET("getMe")
suspend fun getMe(): TgResponse<User>
Expand Down
16 changes: 11 additions & 5 deletions src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/Plugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ class Plugin : JavaPlugin() {
return

val cmdHandler = CommandHandler(this)
tgBot = TgBot(this, config)
loadBot()
tgBot?.let { bot ->
val eventHandler = EventHandler(bot, config)
server.pluginManager.registerEvents(eventHandler, this)
}
getCommand(C.COMMANDS.PLUGIN_RELOAD)?.setExecutor(cmdHandler)
val eventHandler = EventHandler(tgBot!!, config)
server.pluginManager.registerEvents(eventHandler, this)

// Notify Telegram groups about server start
config.serverStartMessage?.let { message ->
Expand All @@ -34,6 +36,11 @@ class Plugin : JavaPlugin() {
logger.info("Plugin started.")
}

fun loadBot() {
tgBot?.let { it.stop() }
tgBot = TgBot(this, config)
}

override fun onDisable() {
if (!config.isEnabled) return
config.serverStopMessage?.let {
Expand All @@ -55,8 +62,7 @@ class Plugin : JavaPlugin() {
fun reload() {
logger.info(C.INFO.reloading)
config.reload(this)
tgBot?.stop()
tgBot = TgBot(this, config)
loadBot()
logger.info(C.INFO.reloadComplete)
}
}
63 changes: 32 additions & 31 deletions src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/TgBot.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlinx.coroutines.channels.*
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor
import retrofit2.Retrofit
import retrofit2.Call
import retrofit2.converter.gson.GsonConverterFactory
import java.time.Duration

Expand All @@ -32,13 +33,8 @@ class TgBot(
}

init {
val interceptor = HttpLoggingInterceptor().apply {
level = HttpLoggingInterceptor.Level.NONE;
}

client = OkHttpClient
.Builder()
.addInterceptor(interceptor)
.readTimeout(Duration.ZERO)
.build();

Expand All @@ -65,40 +61,45 @@ class TgBot(
api.setMyCommands(commands)
}

pollJob = scope.launch {
try {
while (true) {
try {
pollUpdates()
} catch (e: Exception) {
e.printStackTrace()
}
}
} catch (e: CancellationException) {}
}
pollJob = initPolling()
handlerJob = initHandler()
}

handlerJob = scope.launch {
try {
while (true) {
handleUpdate()
private fun initPolling() = scope.launch {
var request:
Call<TgApiService.TgResponse<List<TgApiService.Update>>>? = null
try {
while (true) {
try {
request = api.getUpdates(
offset = currentOffset,
timeout = pollTimeout,
)
val response = request.execute().body()
response?.result?.let { updates ->
if (!updates.isEmpty()) {
updates.forEach { updateChan.send(it) }
currentOffset = updates.last().updateId + 1
}
}
} catch (e: Exception) {
e.printStackTrace()
}
} catch (e: CancellationException) {}
}
} catch (e: CancellationException) {
request?.cancel()
}
}

private suspend fun pollUpdates() {
val updatesResponse = api
.getUpdates(offset = currentOffset, timeout = pollTimeout)
updatesResponse.result?.let { updates ->
if (!updates.isEmpty()) {
updates.forEach { updateChan.send(it) }
currentOffset = updates.last().updateId + 1
private fun initHandler() = scope.launch {
try {
while (true) {
handleUpdate(updateChan.receive())
}
}
} catch (e: CancellationException) {}
}

private suspend fun handleUpdate() {
val update = updateChan.receive()
suspend fun handleUpdate(update: TgApiService.Update) {
update.message?.text?.let {
commandRegex.matchEntire(it)?.groupValues?.let {
commandMap[it[1]]?.let { it(update) }
Expand Down

0 comments on commit a8bf75a

Please sign in to comment.