Skip to content

Commit

Permalink
Merge pull request #831 from supabase-community/realtime-channel
Browse files Browse the repository at this point in the history
Improve behavior for realtime channel creation and improve docs
  • Loading branch information
jan-tennert authored Jan 12, 2025
2 parents d6e29cb + c9ca945 commit 6956309
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
*/
fun disconnect()

@SupabaseInternal
fun Realtime.addChannel(channel: RealtimeChannel)

/**
* Unsubscribes and removes a channel from the [subscriptions]
* @param channel The channel to remove
Expand Down Expand Up @@ -103,6 +100,17 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
*/
suspend fun setAuth(token: String? = null)

/**
* Creates a new [RealtimeChannel] and adds it to the [subscriptions]
*
* - This method does not subscribe to the channel. You have to call [RealtimeChannel.subscribe] to do so.
* - If a channel with the same [channelId] already exists, it will be returned
*
* @param channelId The id of the channel
* @param builder The builder for the channel
*/
fun channel(channelId: String, builder: RealtimeChannelBuilder): RealtimeChannel

/**
* @property websocketConfig Custom configuration for the Ktor Websocket Client. This only applies if [Realtime.Config.websocketFactory] is null.
* @property secure Whether to use wss or ws. Defaults to [SupabaseClient.useHTTPS] when null
Expand Down Expand Up @@ -187,11 +195,15 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
}

/**
* Creates a new [RealtimeChannel]
* Creates a new [RealtimeChannel] and adds it to the [Realtime.subscriptions]
*
* - This method does not subscribe to the channel. You have to call [RealtimeChannel.subscribe] to do so.
* - If a channel with the same [channelId] already exists, it will be returned
*
* @param channelId The id of the channel
* @param builder The builder for the channel
*/
inline fun Realtime.channel(channelId: String, builder: RealtimeChannelBuilder.() -> Unit = {}): RealtimeChannel {
return RealtimeChannelBuilder("realtime:$channelId", this as RealtimeImpl).apply(builder).build()
}
inline fun Realtime.channel(channelId: String, builder: RealtimeChannelBuilder.() -> Unit = {}): RealtimeChannel = channel(channelId, RealtimeChannelBuilder(RealtimeTopic.withChannelId(channelId)).apply(builder))

/**
* Supabase Realtime is a way to listen to changes in the PostgreSQL database via websockets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.github.jan.supabase.realtime.annotations.ChannelDsl
* Used to build a realtime channel
*/
@ChannelDsl
class RealtimeChannelBuilder @PublishedApi internal constructor(private val topic: String, private val realtimeImpl: RealtimeImpl) {
class RealtimeChannelBuilder @PublishedApi internal constructor(private val topic: String) {

private var broadcastJoinConfig = BroadcastJoinConfig(acknowledgeBroadcasts = false, receiveOwnBroadcasts = false)
private var presenceJoinConfig = PresenceJoinConfig("")
Expand All @@ -33,9 +33,9 @@ class RealtimeChannelBuilder @PublishedApi internal constructor(private val topi
}

@SupabaseInternal
fun build(): RealtimeChannel {
fun build(realtime: Realtime): RealtimeChannel {
return RealtimeChannelImpl(
realtimeImpl,
realtime,
topic,
broadcastJoinConfig,
presenceJoinConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ import kotlin.reflect.KClass
import kotlin.reflect.KType

internal class RealtimeChannelImpl(
private val realtimeImpl: RealtimeImpl,
override val realtime: Realtime,
override val topic: String,
private val broadcastJoinConfig: BroadcastJoinConfig,
private val presenceJoinConfig: PresenceJoinConfig,
private val isPrivate: Boolean,
) : RealtimeChannel {

private val realtimeImpl: RealtimeImpl = realtime as RealtimeImpl
private val clientChanges = AtomicMutableList<PostgresJoinConfig>()
@SupabaseInternal
override val callbackManager = CallbackManagerImpl(realtimeImpl.serializer)
private val _status = MutableStateFlow(RealtimeChannel.Status.UNSUBSCRIBED)
override val status = _status.asStateFlow()
override val realtime: Realtime = realtimeImpl
override val supabaseClient = realtimeImpl.supabaseClient

private val broadcastUrl = realtimeImpl.broadcastUrl()
private val subTopic = topic.replaceFirst(Regex("^realtime:", RegexOption.IGNORE_CASE), "")
private val subTopic = topic.replaceFirst(Regex("^${RealtimeTopic.PREFIX}:", RegexOption.IGNORE_CASE), "")
private val httpClient = realtimeImpl.supabaseClient.httpClient

private suspend fun accessToken() = realtimeImpl.config.accessToken(supabaseClient) ?: realtimeImpl.accessToken
Expand All @@ -54,9 +54,6 @@ internal class RealtimeChannelImpl(
if(!realtimeImpl.config.connectOnSubscribe) error("You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?")
realtimeImpl.connect()
}
realtimeImpl.run {
addChannel(this@RealtimeChannelImpl)
}
_status.value = RealtimeChannel.Status.SUBSCRIBING
Realtime.logger.d { "Subscribing to channel $topic" }
val currentJwt = accessToken()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.jan.supabase.realtime

import io.github.jan.supabase.SupabaseClient
import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.auth.Auth
import io.github.jan.supabase.auth.status.SessionStatus
import io.github.jan.supabase.buildUrl
Expand Down Expand Up @@ -162,6 +161,14 @@ import kotlin.io.encoding.ExperimentalEncodingApi
_status.value = Realtime.Status.DISCONNECTED
}

override fun channel(channelId: String, builder: RealtimeChannelBuilder): RealtimeChannel {
val topic = RealtimeTopic.withChannelId(channelId)
if(subscriptions.containsKey(topic)) return subscriptions[topic]!!
val channel = builder.build(this)
_subscriptions[topic] = channel
return channel
}

private suspend fun onMessage(message: RealtimeMessage) {
Realtime.logger.d { "Received message $message" }
val channel = subscriptions[message.topic] as? RealtimeChannelImpl
Expand Down Expand Up @@ -234,11 +241,6 @@ import kotlin.io.encoding.ExperimentalEncodingApi
}
}

@SupabaseInternal
override fun Realtime.addChannel(channel: RealtimeChannel) {
_subscriptions[channel.topic] = channel
}

override suspend fun close() {
disconnect()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.github.jan.supabase.realtime

@PublishedApi
internal object RealtimeTopic {

const val PREFIX = "realtime"

fun withChannelId(channelId: String): String {
return "$PREFIX:$channelId"
}

}
17 changes: 17 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.realtime.RealtimeImpl
import io.github.jan.supabase.realtime.RealtimeMessage
import io.github.jan.supabase.realtime.channel
import io.github.jan.supabase.realtime.realtime
import io.ktor.util.encodeBase64
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -31,6 +32,22 @@ class RealtimeTest {
}
}

@Test
fun testExistingChannelShouldBeReturned() {
runTest {
createTestClient(
wsHandler = { _, _ ->
//Does not matter for this test
},
supabaseHandler = {
val channel = it.realtime.channel("channelId")
val channel2 = it.realtime.channel("channelId")
assertEquals(channel, channel2)
}
)
}
}

@Test
fun testSendingRealtimeMessages() {
val expectedMessage = RealtimeMessage(
Expand Down
7 changes: 4 additions & 3 deletions Realtime/src/commonTest/kotlin/RealtimeTestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.github.jan.supabase.realtime.Presence
import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_PRESENCE_DIFF
import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_SYSTEM
import io.github.jan.supabase.realtime.RealtimeMessage
import io.github.jan.supabase.realtime.RealtimeTopic
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
Expand All @@ -29,18 +30,18 @@ suspend fun Auth.importAuthTokenValid(token: String) {

suspend fun handleSubscribe(incoming: ReceiveChannel<RealtimeMessage>, outgoing: SendChannel<RealtimeMessage>, channelId: String) {
incoming.receive()
outgoing.send(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, ""))
outgoing.send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, ""))
}

suspend fun SendChannel<RealtimeMessage>.sendBroadcast(channelId: String, event: String, message: JsonObject) {
send(RealtimeMessage("realtime:$channelId", "broadcast", buildJsonObject {
send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), "broadcast", buildJsonObject {
put("event", event)
put("payload", message)
}, ""))
}

suspend fun SendChannel<RealtimeMessage>.sendPresence(channelId: String, joins: Map<String, Presence>, leaves: Map<String, Presence>) {
send(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_PRESENCE_DIFF, buildJsonObject {
send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), CHANNEL_EVENT_PRESENCE_DIFF, buildJsonObject {
put("joins", transformPresenceMap(joins))
put("leaves", transformPresenceMap(leaves))
}, ""))
Expand Down
19 changes: 19 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeTopicTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import io.github.jan.supabase.realtime.RealtimeTopic
import kotlin.test.Test
import kotlin.test.assertEquals

class RealtimeTopicTest {

@Test
fun testRealtimeTopic() {
val channelId = "channelId"
val topic = RealtimeTopic.withChannelId(channelId)
assertEquals("realtime:channelId", topic)
}

@Test
fun testRealtimePrefix() {
assertEquals("realtime", RealtimeTopic.PREFIX)
}

}

0 comments on commit 6956309

Please sign in to comment.