Skip to content

Commit

Permalink
Improve atomicity of dom updates
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmacdonald committed Feb 8, 2025
1 parent 8c17a63 commit fdda03c
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 107 deletions.
2 changes: 0 additions & 2 deletions frontend/src/api/playerqueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ export type PurgePayload = {
message_ids: number[];
};

export type pingPayload = QueueRequest<{ created_on: Date }>;

export type clientQueueState = {
steam_id: string;
};
Expand Down
21 changes: 9 additions & 12 deletions frontend/src/component/QueueProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
LeaveQueuePayload,
Operation,
PermissionLevel,
pingPayload,
PurgePayload,
QueueMember,
QueueRequest,
Expand All @@ -33,7 +32,6 @@ export const QueueProvider = ({ children }: { children: ReactNode }) => {
const [messages, setMessages] = useState<ServerQueueMessage[]>([]);
const [showChat, setShowChat] = useState(false);
const [servers, setServers] = useState<ServerQueueState[]>([]);
const [lastPong, setLastPong] = useState(new Date());
const { profile } = useAuth();
const [chatStatus, setChatStatus] = useState<ChatStatus>(profile.playerqueue_chat_status);
const [reason, setReason] = useState<string>('');
Expand All @@ -43,16 +41,14 @@ export const QueueProvider = ({ children }: { children: ReactNode }) => {
const { readyState, sendJsonMessage, lastJsonMessage } = useWebSocket(websocketURL(), {
queryParams: { token: readAccessToken() },
share: false,
// heartbeat: true,
reconnectInterval: 10,
//reconnectInterval: 10,
shouldReconnect: () => true
});

useEffect(() => {
switch (readyState) {
case ReadyState.OPEN:
setIsReady(true);
sendJsonMessage({ op: Operation.Ping, payload: { created_on: new Date() } } as pingPayload);
setMessages((prevState) => [
...prevState,
{
Expand Down Expand Up @@ -94,18 +90,20 @@ export const QueueProvider = ({ children }: { children: ReactNode }) => {

const handleIncomingOperation = async (request: QueueRequest<never>) => {
switch (request.op) {
case Operation.Pong: {
setLastPong(new Date());
break;
}

case Operation.StateUpdate: {
updateState(request.payload as ClientStatePayload);
break;
}

case Operation.Message: {
setMessages((prev) => [...prev, transformCreatedOnDate(request.payload as ServerQueueMessage)]);
setMessages((prev) => {
const messages = (request.payload as ServerQueueMessage[]).map(transformCreatedOnDate);
let all = [...prev, ...messages];
if (all.length > 100) {
all = all.slice(all.length - 100, 100);
}
return all;
});
break;
}

Expand Down Expand Up @@ -189,7 +187,6 @@ export const QueueProvider = ({ children }: { children: ReactNode }) => {
sendMessage,
joinQueue,
leaveQueue,
lastPong,
showChat,
setShowChat,
chatStatus,
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/component/ServerList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export const ServerList = () => {
<Tooltip title="Join/Leave server queue. Number indicates actively queued players. (in testing)">
<IconButton
disabled={!hasPermission(PermissionLevel.Moderator)}
color={queued ? 'success' : undefined}
color={queued ? 'success' : 'primary'}
onClick={() => {
if (queued) {
leaveQueue([String(info.row.original.server_id)]);
Expand Down
2 changes: 0 additions & 2 deletions frontend/src/hooks/useQueueCtx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { ChatStatus, QueueMember, ServerQueueMessage, ServerQueueState } from '.
import { noop } from '../util/lists.ts';

type QueueCtxProps = {
lastPong: Date;
showChat: boolean;
setShowChat: (showChat: boolean) => void;
isReady: boolean;
Expand All @@ -18,7 +17,6 @@ type QueueCtxProps = {
};

export const QueueCtx = createContext<QueueCtxProps>({
lastPong: new Date(),
showChat: false,
isReady: false,
chatStatus: 'noaccess',
Expand Down
32 changes: 19 additions & 13 deletions frontend/src/routes/_guest.servers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import Typography from '@mui/material/Typography';
import Grid from '@mui/material/Unstable_Grid2';
import { createFileRoute } from '@tanstack/react-router';
import { LatLngLiteral } from 'leaflet';
import { apiGetServerStates, BaseServer } from '../api';
import { apiGetServerStates, BaseServer, PermissionLevel } from '../api';
import { ContainerWithHeaderAndButtons } from '../component/ContainerWithHeaderAndButtons.tsx';
import { LoadingPlaceholder } from '../component/LoadingPlaceholder.tsx';
import { QueueHelp } from '../component/QueueHelp.tsx';
Expand All @@ -23,6 +23,7 @@ import { ServerList } from '../component/ServerList.tsx';
import { ServerMap } from '../component/ServerMap.tsx';
import { Title } from '../component/Title.tsx';
import { MapStateCtx } from '../contexts/MapStateCtx.tsx';
import { useAuth } from '../hooks/useAuth.ts';
import { useMapStateCtx } from '../hooks/useMapStateCtx.ts';
import { ensureFeatureEnabled } from '../util/features.ts';
import { sum } from '../util/lists.ts';
Expand Down Expand Up @@ -122,6 +123,7 @@ export const ServerStats = () => {

function Servers() {
const [servers, setServers] = useState<BaseServer[]>([]);
const { hasPermission } = useAuth();
const [pos, setPos] = useState<LatLngLiteral>({
lat: 0.0,
lng: 0.0
Expand Down Expand Up @@ -195,18 +197,22 @@ function Servers() {
{showHelp && <QueueHelp />}
<ContainerWithHeaderAndButtons
title={`Servers (${selectedServers.length}/${servers.length})`}
buttons={[
<Tooltip title={'Toggle server queue help'} key={'help-queue-button'}>
<IconButton
color={'default'}
onClick={() => {
setShowHelp((prevState) => !prevState);
}}
>
{showHelp ? <HelpIcon /> : <HelpOutlineIcon />}
</IconButton>
</Tooltip>
]}
buttons={
!hasPermission(PermissionLevel.Moderator)
? []
: [
<Tooltip title={'Toggle server queue help'} key={'help-queue-button'}>
<IconButton
color={'default'}
onClick={() => {
setShowHelp((prevState) => !prevState);
}}
>
{showHelp ? <HelpIcon /> : <HelpOutlineIcon />}
</IconButton>
</Tooltip>
]
}
iconLeft={<StorageIcon />}
>
<ServerList />
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func serveCmd() *cobra.Command { //nolint:maintidx
chatlogs = []domain.ChatLog{}
}
playerqueueUC := playerqueue.NewPlayerqueueUsecase(playerqueueRepo, personUsecase, serversUC, stateUsecase, chatlogs, notificationUsecase)
go playerqueueUC.Start(ctx)
playerqueue.NewPlayerqueueHandler(router, authUsecase, configUsecase, playerqueueUC)

if conf.Debug.AddRCONLogAddress != "" {
Expand Down
3 changes: 1 addition & 2 deletions internal/domain/playerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type PlayerqueueUsecase interface {
Disconnect(client QueueClient)
JoinLobbies(client QueueClient, servers []int) error
LeaveLobbies(client QueueClient, servers []int) error
Start(ctx context.Context)
}

type PlayerqueueQueryOpts struct {
Expand All @@ -48,8 +49,6 @@ type QueueClient interface {
ID() string
// Next handles the incoming operation request
Next(r *Request) error
// Ping performs a ping/pong relay with the client
Ping()
SteamID() steamid.SteamID
Name() string
Avatarhash() string
Expand Down
12 changes: 5 additions & 7 deletions internal/playerqueue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
ErrFindLobby = errors.New("failed to find a Lobby")
ErrHostname = errors.New("failed to resolve hostname")
ErrReadRequest = errors.New("failed to read/decode request")
ErrClosedConnection = errors.New("closed connection")
)

func newClient(steamID steamid.SteamID, name string, avatarHash string, conn *websocket.Conn) domain.QueueClient {
Expand Down Expand Up @@ -74,19 +75,16 @@ func (c *Client) Avatarhash() string {

func (c *Client) Next(request *domain.Request) error {
if err := c.conn.ReadJSON(request); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
return ErrClosedConnection
}

return errors.Join(err, ErrReadRequest)
}

return nil
}

func (c *Client) Ping() {
c.Send(domain.Response{
Op: domain.Pong,
Payload: pongPayload{CreatedOn: time.Now()},
})
}

func (c *Client) ID() string {
return c.conn.RemoteAddr().String()
}
Expand Down
68 changes: 26 additions & 42 deletions internal/playerqueue/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package playerqueue
import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/leighmacdonald/steamid/v4/steamid"
"golang.org/x/exp/slices"
"log/slog"
"sync"
)

// TODO Track a users desired minimum size for them to be counted towards play.
Expand All @@ -24,7 +22,7 @@ type Coordinator struct {
clients []domain.QueueClient
chatLogs []domain.ChatLog
mu *sync.RWMutex
currentState func() ([]Lobby, error)
validLobbies func() ([]Lobby, error)
}

func New(chatLogHistorySize int, minQueueSize int, chatlogs []domain.ChatLog, currentStateFunc func() ([]Lobby, error)) *Coordinator {
Expand All @@ -35,36 +33,22 @@ func New(chatLogHistorySize int, minQueueSize int, chatlogs []domain.ChatLog, cu
lobbies: []*Lobby{},
mu: &sync.RWMutex{},
chatLogHistorySize: chatLogHistorySize,
currentState: currentStateFunc,
validLobbies: currentStateFunc,
}
}

func (q *Coordinator) Start(ctx context.Context) {
cleanupTicker := time.NewTicker(time.Second * 30)
refreshState := time.NewTicker(time.Second * 2)
func (q *Coordinator) updateState() {
lobbies, errUpdate := q.validLobbies()
if errUpdate != nil {
slog.Error("Failed to update state", log.ErrAttr(errUpdate))

for {
select {
case <-cleanupTicker.C:
q.removeZombies()
case <-refreshState.C:
state, errUpdate := q.currentState()
if errUpdate != nil {
slog.Error("Failed to update state", log.ErrAttr(errUpdate))

continue
}
return
}

q.UpdateState(state)
q.replaceLobbies(lobbies)

if err := q.checkQueueCompat(); err != nil {
slog.Error("Failed to check queue compatibility", log.ErrAttr(err))
}
case <-ctx.Done():
q.broadcast(domain.Response{Op: domain.Bye, Payload: byePayload{Message: "Server shutting down... run!!!"}})

return
}
if err := q.checkQueueCompat(); err != nil {
slog.Error("Failed to check queue compatibility", log.ErrAttr(err))
}
}

Expand Down Expand Up @@ -119,7 +103,7 @@ func (q *Coordinator) updateClientStates(fullUpdate bool) {
update.Users = players
}

q.broadcast(domain.Response{Op: domain.StateUpdate, Payload: update})
go q.broadcast(domain.Response{Op: domain.StateUpdate, Payload: update})
}

func (q *Coordinator) Leave(client domain.QueueClient, servers []int) error {
Expand Down Expand Up @@ -148,7 +132,7 @@ func (q *Coordinator) Leave(client domain.QueueClient, servers []int) error {
q.mu.Unlock()

if changed {
q.updateClientStates(false)
go q.updateClientStates(false)
}

return nil
Expand Down Expand Up @@ -218,7 +202,7 @@ func (q *Coordinator) Connect(ctx context.Context, steamID steamid.SteamID, name
return client
}

func (q *Coordinator) UpdateState(lobbies []Lobby) {
func (q *Coordinator) replaceLobbies(lobbies []Lobby) {
q.mu.Lock()
defer q.mu.Unlock()

Expand Down Expand Up @@ -427,7 +411,7 @@ func (q *Coordinator) Message(message domain.ChatLog) {
q.mu.RLock()
q.broadcast(domain.Response{
Op: domain.Message,
Payload: message,
Payload: []domain.ChatLog{message},
})
q.mu.RUnlock()
}
Expand Down Expand Up @@ -466,19 +450,19 @@ func (q *Coordinator) removeFromQueues(client domain.QueueClient) {

// sendClientChatHistory sends the last N messages of the chatLogs history to the client provided.
func (q *Coordinator) sendClientChatHistory(client domain.QueueClient) {

var payload []domain.ChatLog

q.mu.RLock()
msgs := make([]domain.Response, len(q.chatLogs))
for i, cl := range q.chatLogs {
msgs[i] = domain.Response{
Op: domain.Message,
Payload: cl,
}
for _, cl := range q.chatLogs {
payload = append(payload, cl)
}
q.mu.RUnlock()

for _, msg := range msgs {
client.Send(msg)
}
go client.Send(domain.Response{
Op: domain.Message,
Payload: payload,
})
}

// UpdateChatStatus updates a client with their new ChatStatus.
Expand Down
8 changes: 1 addition & 7 deletions internal/playerqueue/messages.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package playerqueue

import (
"time"

"github.com/leighmacdonald/steamid/v4/steamid"
)

type byePayload struct {
Message string `json:"message"`
}

type pingPayload struct {
CreatedOn time.Time `json:"created_on"`
}

type pongPayload = pingPayload
type emptyPayload struct{}

type JoinPayload struct {
Servers []int `json:"servers"`
Expand Down
Loading

0 comments on commit fdda03c

Please sign in to comment.