Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add web socket server #407

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,36 @@
# GraphQL Subscription Server

A subscription server for GraphQL subscriptions. Supports streaming over plain web sockets
or Socket.IO, and integrates with Redis or any other Pub/Sub service.

## Setup

### Socket.IO

```js
import http from 'http';
import {
SocketIOSubscriptionServer, // or WebSocketSubscriptionServer
JwtCredentialManager,
RedisSubscriber,
} from '@4c/graphql-subscription-server';

const server = http.createServer();

const subscriptionServer = new SocketIOSubscriptionServer({
schema,
path: '/socket.io/graphql',
subscriber: new RedisSubscriber(),
hasPermission: (message, credentials) => {
authorize(message, credentials);
},
createCredentialsManager: (req) => new JwtCredentialManager(),
createLogger: () => console.debug,
});

subscriptionServer.attach(server);

server.listen(4000, () => {
console.log('server running');
});
```
8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"tdd": "jest --watch",
"test": "yarn lint && yarn typecheck && jest",
"testonly": "jest",
"typecheck": "tsc --noEmit && tsc -p test --noEmit"
"typecheck": "tsc --noEmit && tsc -p test --noEmit",
"update-schema": "NODE_ENV=test babel-node ./update-schema.js"
},
"gitHooks": {
"pre-commit": "lint-staged"
Expand All @@ -43,8 +44,11 @@
"conventionalCommits": true
},
"dependencies": {
"@types/ws": "^7.4.1",
"express": "^4.17.1",
"redis": "^3.1.2"
"graphql-ws": "^4.3.2",
"redis": "^3.1.2",
"ws": "^7.4.5"
},
"peerDependencies": {
"graphql": ">=0.12.3",
Expand Down
21 changes: 10 additions & 11 deletions src/AuthorizedSocketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import {
validate,
} from 'graphql';
import { ExecutionResult } from 'graphql/execution/execute';
import io from 'socket.io';

import * as AsyncUtils from './AsyncUtils';
import { CredentialsManager } from './CredentialsManager';
import { CreateLogger, Logger } from './Logger';
import { Subscriber } from './Subscriber';
import SubscriptionContext from './SubscriptionContext';
import { WebSocket } from './types';

export type CreateValidationRules = ({
query,
Expand Down Expand Up @@ -62,7 +62,7 @@ const acknowledge = (cb?: () => void) => {
* - Rudimentary connection constraints (max connections)
*/
export default class AuthorizedSocketConnection<TContext, TCredentials> {
socket: io.Socket;
socket: WebSocket;

config: AuthorizedSocketOptions<TContext, TCredentials>;

Expand All @@ -76,7 +76,7 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
readonly clientId: string;

constructor(
socket: io.Socket,
socket: WebSocket,
config: AuthorizedSocketOptions<TContext, TCredentials>,
) {
this.socket = socket;
Expand All @@ -85,14 +85,13 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
this.log = config.createLogger('AuthorizedSocket');
this.subscriptionContexts = new Map();

this.clientId = this.socket.id;
this.clientId = this.socket.id!;

this.socket
.on('authenticate', this.handleAuthenticate)
.on('subscribe', this.handleSubscribe)
.on('unsubscribe', this.handleUnsubscribe)
.on('connect', this.handleConnect)
.on('disconnect', this.handleDisconnect);
this.socket.on('authenticate', this.handleAuthenticate);
this.socket.on('subscribe', this.handleSubscribe);
this.socket.on('unsubscribe', this.handleUnsubscribe);
this.socket.on('connect', this.handleConnect);
this.socket.on('disconnect', this.handleDisconnect);
}

emitError(error: { code: string; data?: any }) {
Expand Down Expand Up @@ -125,7 +124,7 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
});

await this.config.credentialsManager.authenticate(authorization);
} catch (error) {
} catch (error: any) {
this.log('error', error.message, { error, clientId: this.clientId });
this.emitError({ code: 'invalid_authorization' });
}
Expand Down
84 changes: 84 additions & 0 deletions src/SocketIOSubscriptionServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { promisify } from 'util';

import express from 'express';
import type io from 'socket.io';

import SubscriptionServer, {
SubscriptionServerConfig,
} from './SubscriptionServer';

export interface SocketIOSubscriptionServerConfig<TContext, TCredentials>
extends SubscriptionServerConfig<TContext, TCredentials> {
socketIoServer?: io.Server;
}

export default class SocketIOSubscriptionServer<
TContext,
TCredentials,
> extends SubscriptionServer<TContext, TCredentials> {
io: io.Server;

constructor({
socketIoServer,
...config
}: SocketIOSubscriptionServerConfig<TContext, TCredentials>) {
super(config);

this.io = socketIoServer!;
if (!this.io) {
// eslint-disable-next-line global-require, @typescript-eslint/no-var-requires
const IoServer = require('socket.io').Server;
this.io = new IoServer({
serveClient: false,
path: this.config.path,
transports: ['websocket'],
allowEIO3: true,
});
}

this.io.on('connection', (socket: io.Socket) => {
const clientId = socket.id;

const request = Object.create((express as any).request);
Object.assign(request, socket.request);

this.log('debug', 'new socket connection', {
clientId,
numClients: this.io.engine?.clientsCount ?? 0,
});

this.initConnection(
{
id: clientId,
protocol: '4c-subscription-server',
on: socket.on.bind(socket),
emit(event: string, data: any) {
socket.emit(event, data);
},
close() {
socket.disconnect();
},
},
request,
);

// add after so the logs happen in order
socket.once('disconnect', (reason) => {
this.log('debug', 'socket disconnected', {
reason,
clientId,
numClients: (this.io.engine.clientsCount ?? 0) - 1, // number hasn't decremented at this point for this client
});
});
});
}

attach(httpServer: any) {
this.io.attach(httpServer);
}

async close() {
// @ts-ignore
await promisify((...args) => this.io.close(...args))();
}
}
64 changes: 11 additions & 53 deletions src/SubscriptionServer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import { promisify } from 'util';

import express from 'express';
import { Request } from 'express';
import type { GraphQLSchema } from 'graphql';
import type { Server, Socket } from 'socket.io';

import AuthorizedSocketConnection from './AuthorizedSocketConnection';
import type { CreateValidationRules } from './AuthorizedSocketConnection';
import type { CredentialsManager } from './CredentialsManager';
import { CreateLogger, Logger, noopCreateLogger } from './Logger';
import type { Subscriber } from './Subscriber';
import { WebSocket } from './types';

export type SubscriptionServerConfig<TContext, TCredentials> = {
export interface SubscriptionServerConfig<TContext, TCredentials> {
path: string;
schema: GraphQLSchema;
subscriber: Subscriber<any>;
Expand All @@ -23,52 +21,24 @@ export type SubscriptionServerConfig<TContext, TCredentials> = {
maxSubscriptionsPerConnection?: number;
createValidationRules?: CreateValidationRules;
createLogger?: CreateLogger;
socketIoServer?: Server;
};
}

export default class SubscriptionServer<TContext, TCredentials> {
export default abstract class SubscriptionServer<TContext, TCredentials> {
config: SubscriptionServerConfig<TContext, TCredentials>;

log: Logger;

io: Server;

constructor(config: SubscriptionServerConfig<TContext, TCredentials>) {
this.config = config;

const createLogger = config.createLogger || noopCreateLogger;
this.log = createLogger('SubscriptionServer');

this.io = config.socketIoServer!;
if (!this.io) {
// eslint-disable-next-line global-require, @typescript-eslint/no-var-requires
const IoServer = require('socket.io').Server;
this.io = new IoServer({
serveClient: false,
path: this.config.path,
transports: ['websocket'],
allowEIO3: true,
});
}

this.io.on('connection', this.handleConnection);
}
const createLogger: CreateLogger = config.createLogger || noopCreateLogger;

attach(httpServer: any) {
this.io.attach(httpServer);
this.log = createLogger('SubscriptionServer');
}

handleConnection = (socket: Socket) => {
const clientId = socket.id;

this.log('debug', 'new socket connection', {
clientId,
numClients: this.io.engine?.clientsCount ?? 0,
});

const request = Object.create((express as any).request);
Object.assign(request, socket.request);
public abstract attach(httpServer: any): void;

protected initConnection(socket: WebSocket, request: Request) {
const { createContext } = this.config;

// eslint-disable-next-line no-new
Expand All @@ -85,19 +55,7 @@ export default class SubscriptionServer<TContext, TCredentials> {
createValidationRules: this.config.createValidationRules,
createLogger: this.config.createLogger || noopCreateLogger,
});

// add after so the logs happen in order
socket.once('disconnect', (reason) => {
this.log('debug', 'socket disconnected', {
reason,
clientId,
numClients: (this.io.engine.clientsCount ?? 0) - 1, // number hasn't decremented at this point for this client
});
});
};

async close() {
// @ts-ignore
await promisify((...args) => this.io.close(...args))();
}

abstract close(): void | Promise<void>;
}
Loading