diff --git a/lib/adapter.ts b/lib/adapter.ts new file mode 100644 index 0000000..90e2847 --- /dev/null +++ b/lib/adapter.ts @@ -0,0 +1,133 @@ +import { type Pool } from "pg"; +import { ClusterAdapterWithHeartbeat } from "socket.io-adapter"; +import type { + ClusterAdapterOptions, + ClusterMessage, + ClusterResponse, + Offset, + ServerId, +} from "socket.io-adapter"; +import debugModule from "debug"; +import { PubSubClient } from "./util"; + +const debug = debugModule("socket.io-postgres-adapter"); + +export interface PostgresAdapterOptions { + /** + * The prefix of the notification channel + * @default "socket.io" + */ + channelPrefix?: string; + /** + * The name of the table for payloads over the 8000 bytes limit or containing binary data + * @default "socket_io_attachments" + */ + tableName?: string; + /** + * The threshold for the payload size in bytes (see https://www.postgresql.org/docs/current/sql-notify.html) + * @default 8000 + */ + payloadThreshold?: number; + /** + * Number of ms between two cleanup queries + * @default 30000 + */ + cleanupInterval?: number; + /** + * Handler for errors. If undefined, the errors will be simply logged. + * + * @default undefined + */ + errorHandler?: (err: Error) => void; +} + +/** + * Returns a function that will create a PostgresAdapter instance. + * + * @param pool - a pg.Pool instance + * @param opts - additional options + * + * @public + */ +export function createAdapter( + pool: Pool, + opts: PostgresAdapterOptions & ClusterAdapterOptions = {} +) { + const options = Object.assign( + { + channelPrefix: "socket.io", + tableName: "socket_io_attachments", + payloadThreshold: 8_000, + cleanupInterval: 30_000, + errorHandler: (err: Error) => debug(err), + }, + opts + ); + + const namespaces = new Map(); + const client = new PubSubClient( + pool, + options, + (msg) => { + // @ts-expect-error uid is protected + return namespaces.get(msg.nsp)?.uid === msg.uid; + }, + (msg) => { + namespaces.get(msg.nsp)?.onMessage(msg); + } + ); + + return function (nsp: any) { + let adapter = new PostgresAdapter(nsp, opts, client); + + namespaces.set(nsp.name, adapter); + client.addNamespace(nsp.name); + + const defaultClose = adapter.close; + + adapter.close = () => { + namespaces.delete(nsp.name); + + if (namespaces.size === 0) { + client.close(); + } + + defaultClose.call(adapter); + }; + + return adapter; + }; +} + +export class PostgresAdapter extends ClusterAdapterWithHeartbeat { + /** + * Adapter constructor. + * + * @param nsp - the namespace + * @param opts - additional options + * @param client + * + * @public + */ + constructor( + nsp: any, + opts: ClusterAdapterOptions, + private readonly client: PubSubClient + ) { + super(nsp, opts); + } + + protected override doPublish(message: ClusterMessage): Promise { + return this.client.publish(message).then(() => { + // connection state recovery is not currently supported + return ""; + }); + } + + protected override doPublishResponse( + _requesterUid: ServerId, + response: ClusterResponse + ) { + return this.client.publish(response); + } +} diff --git a/lib/index.ts b/lib/index.ts index 5acc7e4..8f292f3 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,310 +1,6 @@ -import { encode, decode } from "@msgpack/msgpack"; -import { type Pool, type PoolClient, type Notification } from "pg"; -import { ClusterAdapterWithHeartbeat, MessageType } from "socket.io-adapter"; -import type { - ClusterAdapterOptions, - ClusterMessage, - ClusterResponse, - Offset, - ServerId, -} from "socket.io-adapter"; -import debugModule from "debug"; - -const debug = debugModule("socket.io-postgres-adapter"); - -const hasBinary = (obj: any, toJSON?: boolean): boolean => { - if (!obj || typeof obj !== "object") { - return false; - } - - if (obj instanceof ArrayBuffer || ArrayBuffer.isView(obj)) { - return true; - } - - if (Array.isArray(obj)) { - for (let i = 0, l = obj.length; i < l; i++) { - if (hasBinary(obj[i])) { - return true; - } - } - return false; - } - - for (const key in obj) { - if (Object.prototype.hasOwnProperty.call(obj, key) && hasBinary(obj[key])) { - return true; - } - } - - if (obj.toJSON && typeof obj.toJSON === "function" && !toJSON) { - return hasBinary(obj.toJSON(), true); - } - - return false; -}; - -export interface PostgresAdapterOptions { - /** - * The prefix of the notification channel - * @default "socket.io" - */ - channelPrefix: string; - /** - * The name of the table for payloads over the 8000 bytes limit or containing binary data - * @default "socket_io_attachments" - */ - tableName: string; - /** - * The threshold for the payload size in bytes (see https://www.postgresql.org/docs/current/sql-notify.html) - * @default 8000 - */ - payloadThreshold: number; - /** - * Number of ms between two cleanup queries - * @default 30000 - */ - cleanupInterval: number; - /** - * Handler for errors. If undefined, the errors will be simply logged. - * - * @default undefined - */ - errorHandler: (err: Error) => void; -} - -const defaultErrorHandler = (err: Error) => debug(err); - -/** - * Returns a function that will create a PostgresAdapter instance. - * - * @param pool - a pg.Pool instance - * @param opts - additional options - * - * @public - */ -export function createAdapter( - pool: Pool, - opts: Partial = {} -) { - const errorHandler = opts.errorHandler || defaultErrorHandler; - const tableName = opts.tableName || "socket_io_attachments"; - const cleanupInterval = opts.cleanupInterval || 30000; - - const channelToAdapters = new Map(); - let isConnectionInProgress = false; - let client: PoolClient | undefined; - let cleanupTimer: NodeJS.Timeout; - let reconnectTimer: NodeJS.Timeout; - - const scheduleReconnection = () => { - const reconnectionDelay = Math.floor(2000 * (0.5 + Math.random())); - reconnectTimer = setTimeout(initClient, reconnectionDelay); - }; - - const initClient = async () => { - try { - debug("fetching client from the pool"); - client = await pool.connect(); - isConnectionInProgress = false; - - for (const [channel] of channelToAdapters) { - debug("client listening to %s", channel); - await client.query(`LISTEN "${channel}"`); - } - - client.on("notification", async (msg: Notification) => { - try { - await channelToAdapters.get(msg.channel)?.onEvent(msg.payload); - } catch (err) { - errorHandler(err as Error); - } - }); - - client.on("error", () => { - debug("client error"); - }); - - client.on("end", () => { - debug("client was closed, scheduling reconnection..."); - scheduleReconnection(); - }); - } catch (err) { - errorHandler(err as Error); - debug("error while initializing client, scheduling reconnection..."); - scheduleReconnection(); - } - }; - - const scheduleCleanup = () => { - cleanupTimer = setTimeout(async () => { - try { - await pool.query( - `DELETE FROM ${tableName} WHERE created_at < now() - interval '${cleanupInterval} milliseconds'` - ); - } catch (err) { - errorHandler(err as Error); - } - scheduleCleanup(); - }, cleanupInterval); - }; - - return function (nsp: any) { - let adapter = new PostgresAdapter(nsp, pool, opts); - - channelToAdapters.set(adapter.channel, adapter); - - if (isConnectionInProgress) { - // nothing to do - } else if (client) { - debug("client listening to %s", adapter.channel); - client.query(`LISTEN "${adapter.channel}"`).catch(errorHandler); - } else { - isConnectionInProgress = true; - initClient(); - - scheduleCleanup(); - } - - const defaultClose = adapter.close; - - adapter.close = () => { - channelToAdapters.delete(adapter.channel); - - if (channelToAdapters.size === 0) { - if (client) { - client.removeAllListeners("end"); - client.release(); - client = undefined; - } - clearTimeout(reconnectTimer); - clearTimeout(cleanupTimer); - } - - defaultClose.call(adapter); - }; - - return adapter; - }; -} - -export class PostgresAdapter extends ClusterAdapterWithHeartbeat { - public readonly channel: string; - public readonly tableName: string; - public payloadThreshold: number; - public errorHandler: (err: Error) => void; - - private readonly pool: Pool; - /** - * Adapter constructor. - * - * @param nsp - the namespace - * @param pool - a pg.Pool instance - * @param opts - additional options - * - * @public - */ - constructor( - nsp: any, - pool: Pool, - opts: Partial = {} - ) { - super(nsp, opts); - this.pool = pool; - const channelPrefix = opts.channelPrefix || "socket.io"; - this.channel = `${channelPrefix}#${nsp.name}`; - this.tableName = opts.tableName || "socket_io_attachments"; - this.payloadThreshold = opts.payloadThreshold || 8000; - this.errorHandler = opts.errorHandler || defaultErrorHandler; - } - - public async onEvent(event: any) { - let document = JSON.parse(event); - - if (document.uid === this.uid) { - return debug("ignore message from self"); - } - - if (document.attachmentId) { - const result = await this.pool.query( - `SELECT payload FROM ${this.tableName} WHERE id = $1`, - [document.attachmentId] - ); - document = decode(result.rows[0].payload); - } - - this.onMessage(document as ClusterMessage); - } - - protected doPublish(message: ClusterMessage): Promise { - return this._publish(message).then(() => { - // connection state recovery is not currently supported - return ""; - }); - } - - protected doPublishResponse( - requesterUid: ServerId, - response: ClusterResponse - ) { - return this._publish(response); - } - - private async _publish(document: any) { - document.uid = this.uid; - try { - if ( - [ - MessageType.BROADCAST, - MessageType.BROADCAST_ACK, - MessageType.SERVER_SIDE_EMIT, - MessageType.SERVER_SIDE_EMIT_RESPONSE, - ].includes(document.type) && - hasBinary(document) - ) { - return await this.publishWithAttachment(document); - } - - const payload = JSON.stringify(document); - if (Buffer.byteLength(payload) > this.payloadThreshold) { - return await this.publishWithAttachment(document); - } - - debug( - "sending event of type %s to channel %s", - document.type, - this.channel - ); - await this.pool.query(`SELECT pg_notify($1, $2)`, [ - this.channel, - payload, - ]); - } catch (err) { - this.errorHandler(err as Error); - } - } - - private async publishWithAttachment(document: any) { - const payload = encode(document); - - debug( - "sending event of type %s with attachment to channel %s", - document.type, - this.channel - ); - const result = await this.pool.query( - `INSERT INTO ${this.tableName} (payload) VALUES ($1) RETURNING id;`, - [payload] - ); - const attachmentId = result.rows[0].id; - const headerPayload = JSON.stringify({ - uid: document.uid, - type: document.type, - attachmentId, - }); - - await this.pool.query(`SELECT pg_notify($1, $2)`, [ - this.channel, - headerPayload, - ]); - } -} +export { + createAdapter, + PostgresAdapter, + type PostgresAdapterOptions, +} from "./adapter"; +export { setupPrimary } from "./node-cluster"; diff --git a/lib/node-cluster.ts b/lib/node-cluster.ts new file mode 100644 index 0000000..181ab68 --- /dev/null +++ b/lib/node-cluster.ts @@ -0,0 +1,74 @@ +import cluster from "cluster"; +import type { Pool } from "pg"; +import { + ClusterAdapterOptions, + ClusterMessage, + MessageType, +} from "socket.io-adapter"; +import { PostgresAdapterOptions } from "./adapter"; +import debugModule from "debug"; +import { ExtendedClusterMessage, randomId, PubSubClient } from "./util"; + +const debug = debugModule("socket.io-postgres-adapter"); + +function ignoreError() {} + +export function setupPrimary( + pool: Pool, + opts: PostgresAdapterOptions & ClusterAdapterOptions = {} +) { + if (!cluster.isPrimary) { + throw "not primary"; + } + + const options = Object.assign( + { + channelPrefix: "socket.io", + tableName: "socket_io_attachments", + payloadThreshold: 8_000, + cleanupInterval: 30_000, + errorHandler: (err: Error) => debug(err), + }, + opts + ); + + const nodeId = randomId(); + + const client = new PubSubClient( + pool, + options, + (msg) => { + return msg.nodeId === nodeId; + }, + (msg) => { + debug("forwarding message %s to all workers", msg.type); + for (const workerId in cluster.workers) { + cluster.workers[workerId]?.send(msg, ignoreError); + } + } + ); + + cluster.on("message", async (worker, msg: ClusterMessage) => { + if (msg.type === MessageType.INITIAL_HEARTBEAT) { + client.addNamespace(msg.nsp); + } + + const emitterId = String(worker.id); + debug("[%s] forwarding message %s to the other workers", nodeId, msg.type); + for (const workerId in cluster.workers) { + if (workerId !== emitterId) { + cluster.workers[workerId]?.send(msg, ignoreError); + } + } + + debug("[%s] forwarding message %s to the other nodes", nodeId, msg.type); + (msg as ExtendedClusterMessage).nodeId = nodeId; + await client.publish(msg); + }); + + return { + close() { + client.close(); + }, + }; +} diff --git a/lib/util.ts b/lib/util.ts new file mode 100644 index 0000000..08f7fb0 --- /dev/null +++ b/lib/util.ts @@ -0,0 +1,220 @@ +import { randomBytes } from "node:crypto"; +import { type Notification, type Pool, type PoolClient } from "pg"; +import debugModule from "debug"; +import { type PostgresAdapterOptions } from "./adapter"; +import { + type ClusterMessage, + ClusterResponse, + MessageType, +} from "socket.io-adapter"; +import { decode, encode } from "@msgpack/msgpack"; + +const debug = debugModule("socket.io-postgres-adapter"); + +export function hasBinary(obj: any, toJSON?: boolean): boolean { + if (!obj || typeof obj !== "object") { + return false; + } + + if (obj instanceof ArrayBuffer || ArrayBuffer.isView(obj)) { + return true; + } + + if (Array.isArray(obj)) { + for (let i = 0, l = obj.length; i < l; i++) { + if (hasBinary(obj[i])) { + return true; + } + } + return false; + } + + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key) && hasBinary(obj[key])) { + return true; + } + } + + if (obj.toJSON && typeof obj.toJSON === "function" && !toJSON) { + return hasBinary(obj.toJSON(), true); + } + + return false; +} + +export function randomId() { + return randomBytes(8).toString("hex"); +} + +export type ExtendedClusterMessage = ClusterMessage & { + // the ID of the primary process, to skip messages from itself + nodeId: string; + // the ID of the attachment in the DB, in case the payload contains binary or is above the size limit + attachmentId?: string; +}; + +export class PubSubClient { + private channels = new Set(); + private client?: PoolClient; + private reconnectTimer?: NodeJS.Timeout; + private readonly cleanupTimer?: NodeJS.Timeout; + + constructor( + private readonly pool: Pool, + private readonly opts: Required, + private readonly isFromSelf: (msg: ExtendedClusterMessage) => boolean, + private readonly onMessage: (msg: ClusterMessage) => void + ) { + this.initClient().then(() => {}); // ignore error + + this.cleanupTimer = setInterval(async () => { + try { + debug("removing old events"); + await pool.query( + `DELETE FROM ${opts.tableName} WHERE created_at < now() - interval '${opts.cleanupInterval} milliseconds'` + ); + } catch (err) { + opts.errorHandler(err as Error); + } + }, opts.cleanupInterval); + } + + private scheduleReconnection() { + const reconnectionDelay = Math.floor(2000 * (0.5 + Math.random())); + debug("reconnection in %d ms", reconnectionDelay); + this.reconnectTimer = setTimeout( + () => this.initClient(), + reconnectionDelay + ); + } + + private async initClient() { + try { + debug("acquiring client from the pool"); + const client = await this.pool.connect(); + debug("client acquired"); + + client.on("notification", async (msg: Notification) => { + try { + let message = JSON.parse( + msg.payload as string + ) as ExtendedClusterMessage; + + if (this.isFromSelf(message)) { + return; + } + + if (message.attachmentId) { + const result = await this.pool.query( + `SELECT payload FROM ${this.opts.tableName} WHERE id = $1`, + [message.attachmentId] + ); + const fullMessage = decode( + result.rows[0].payload + ) as ExtendedClusterMessage; + this.onMessage(fullMessage); + } else { + this.onMessage(message); + } + } catch (e) { + this.opts.errorHandler(e as Error); + } + }); + + client.on("end", () => { + debug("client was closed, scheduling reconnection..."); + + this.client = undefined; + this.scheduleReconnection(); + }); + + for (const channel of this.channels) { + debug("client listening to %s", channel); + await client.query(`LISTEN "${channel}"`); + } + + this.client = client; + } catch (e) { + debug("error while initializing client, scheduling reconnection..."); + this.scheduleReconnection(); + } + } + + addNamespace(namespace: string) { + const channel = `${this.opts.channelPrefix}#${namespace}`; + + if (this.channels.has(channel)) { + return; + } + + this.channels.add(channel); + + if (this.client) { + debug("client listening to %s", channel); + this.client.query(`LISTEN "${channel}"`).catch(() => {}); + } + } + + async publish(message: ClusterMessage | ClusterResponse) { + try { + if ( + [ + MessageType.BROADCAST, + MessageType.BROADCAST_ACK, + MessageType.SERVER_SIDE_EMIT, + MessageType.SERVER_SIDE_EMIT_RESPONSE, + ].includes(message.type) && + hasBinary(message) + ) { + return this.publishWithAttachment(message); + } + + const payload = JSON.stringify(message); + if (Buffer.byteLength(payload) > this.opts.payloadThreshold) { + return this.publishWithAttachment(message); + } + + const channel = `${this.opts.channelPrefix}#${message.nsp}`; + + debug("sending event of type %s to channel %s", message.type, channel); + await this.pool.query("SELECT pg_notify($1, $2)", [channel, payload]); + } catch (err) { + this.opts.errorHandler(err as Error); + } + } + + private async publishWithAttachment( + message: ClusterMessage | ClusterResponse + ) { + const payload = encode(message); + const channel = `${this.opts.channelPrefix}#${message.nsp}`; + + debug( + "sending event of type %s with attachment to channel %s", + message.type, + channel + ); + const result = await this.pool.query( + `INSERT INTO ${this.opts.tableName} (payload) VALUES ($1) RETURNING id;`, + [payload] + ); + const attachmentId = result.rows[0].id; + const headerPayload = JSON.stringify({ + uid: message.uid, + type: message.type, + attachmentId, + }); + + await this.pool.query("SELECT pg_notify($1, $2)", [channel, headerPayload]); + } + + close() { + if (this.client) { + this.client.removeAllListeners("end"); + this.client.release(); + this.client = undefined; + } + clearTimeout(this.reconnectTimer); + clearInterval(this.cleanupTimer); + } +} diff --git a/package-lock.json b/package-lock.json index b396819..e64129c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,10 +15,11 @@ "pg": "^8.9.0" }, "devDependencies": { + "@socket.io/cluster-adapter": "~0.3.0", "@types/debug": "^4.1.12", "@types/expect.js": "^0.3.29", "@types/mocha": "^10.0.1", - "@types/node": "^14.14.7", + "@types/node": "~20.19.24", "expect.js": "0.3.1", "mocha": "^10.2.0", "prettier": "^2.8.3", @@ -565,6 +566,47 @@ "node": ">=14" } }, + "node_modules/@socket.io/cluster-adapter": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@socket.io/cluster-adapter/-/cluster-adapter-0.3.0.tgz", + "integrity": "sha512-pHtPlQrKCWb1FE49nSaQSqB0xWYZ/OU8ONxWLWU79u6OA92/IHeHolVTcJfrsiK2Zvrh1yvMo3tVFKKjlKkKRQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "debug": "~4.4.1" + }, + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "socket.io-adapter": "~2.5.5" + } + }, + "node_modules/@socket.io/cluster-adapter/node_modules/debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/@socket.io/cluster-adapter/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "dev": true, + "license": "MIT" + }, "node_modules/@socket.io/component-emitter": { "version": "3.1.0", "resolved": "https://npm.knotcity.io/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", @@ -610,9 +652,13 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "14.14.7", - "resolved": "https://registry.npmjs.org/@types/node/-/node-14.14.7.tgz", - "integrity": "sha512-Zw1vhUSQZYw+7u5dAwNbIA9TuTotpzY/OF7sJM9FqPOF3SPjKnxrjoTktXDZgUjybf4cWVBP7O8wvKdSaGHweg==" + "version": "20.19.24", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.19.24.tgz", + "integrity": "sha512-FE5u0ezmi6y9OZEzlJfg37mqqf6ZDSF2V/NLjUyGrR9uTZ7Sb9F7bLNZ03S4XVUNRWGA7Ck4c1kK+YnuWjl+DA==", + "license": "MIT", + "dependencies": { + "undici-types": "~6.21.0" + } }, "node_modules/@types/pg": { "version": "8.6.6", @@ -1598,6 +1644,7 @@ "version": "8.9.0", "resolved": "https://npm.knotcity.io/pg/-/pg-8.9.0.tgz", "integrity": "sha512-ZJM+qkEbtOHRuXjmvBtOgNOXOtLSbxiMiUVMgE4rV6Zwocy03RicCVvDXgx8l4Biwo8/qORUnEqn2fdQzV7KCg==", + "peer": true, "dependencies": { "buffer-writer": "2.0.0", "packet-reader": "1.0.0", @@ -1936,6 +1983,7 @@ "version": "2.5.5", "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.5.tgz", "integrity": "sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==", + "peer": true, "dependencies": { "debug": "~4.3.4", "ws": "~8.17.1" @@ -2110,6 +2158,12 @@ "node": ">=4.2.0" } }, + "node_modules/undici-types": { + "version": "6.21.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", + "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", + "license": "MIT" + }, "node_modules/util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", @@ -2560,6 +2614,32 @@ "dev": true, "optional": true }, + "@socket.io/cluster-adapter": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@socket.io/cluster-adapter/-/cluster-adapter-0.3.0.tgz", + "integrity": "sha512-pHtPlQrKCWb1FE49nSaQSqB0xWYZ/OU8ONxWLWU79u6OA92/IHeHolVTcJfrsiK2Zvrh1yvMo3tVFKKjlKkKRQ==", + "dev": true, + "requires": { + "debug": "~4.4.1" + }, + "dependencies": { + "debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "dev": true, + "requires": { + "ms": "^2.1.3" + } + }, + "ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "dev": true + } + } + }, "@socket.io/component-emitter": { "version": "3.1.0", "resolved": "https://npm.knotcity.io/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", @@ -2603,9 +2683,12 @@ "dev": true }, "@types/node": { - "version": "14.14.7", - "resolved": "https://registry.npmjs.org/@types/node/-/node-14.14.7.tgz", - "integrity": "sha512-Zw1vhUSQZYw+7u5dAwNbIA9TuTotpzY/OF7sJM9FqPOF3SPjKnxrjoTktXDZgUjybf4cWVBP7O8wvKdSaGHweg==" + "version": "20.19.24", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.19.24.tgz", + "integrity": "sha512-FE5u0ezmi6y9OZEzlJfg37mqqf6ZDSF2V/NLjUyGrR9uTZ7Sb9F7bLNZ03S4XVUNRWGA7Ck4c1kK+YnuWjl+DA==", + "requires": { + "undici-types": "~6.21.0" + } }, "@types/pg": { "version": "8.6.6", @@ -3327,6 +3410,7 @@ "version": "8.9.0", "resolved": "https://npm.knotcity.io/pg/-/pg-8.9.0.tgz", "integrity": "sha512-ZJM+qkEbtOHRuXjmvBtOgNOXOtLSbxiMiUVMgE4rV6Zwocy03RicCVvDXgx8l4Biwo8/qORUnEqn2fdQzV7KCg==", + "peer": true, "requires": { "buffer-writer": "2.0.0", "packet-reader": "1.0.0", @@ -3550,6 +3634,7 @@ "version": "2.5.5", "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.5.tgz", "integrity": "sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==", + "peer": true, "requires": { "debug": "~4.3.4", "ws": "~8.17.1" @@ -3674,6 +3759,11 @@ "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", "dev": true }, + "undici-types": { + "version": "6.21.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", + "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==" + }, "util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", diff --git a/package.json b/package.json index f07e3c2..c27fd5a 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "types": "./dist/index.d.ts", "scripts": { "compile": "rimraf ./dist && tsc", - "test": "npm run format:check && mocha --import=tsx test/index.ts", + "test": "npm run format:check && mocha --import=tsx test/**.ts", "format:check": "prettier --parser typescript --check lib/**/*.ts test/**/*.ts", "format:fix": "prettier --parser typescript --write lib/**/*.ts test/**/*.ts", "prepack": "npm run compile" @@ -29,10 +29,11 @@ "socket.io-adapter": "^2.5.4" }, "devDependencies": { + "@socket.io/cluster-adapter": "~0.3.0", "@types/debug": "^4.1.12", "@types/expect.js": "^0.3.29", "@types/mocha": "^10.0.1", - "@types/node": "^14.14.7", + "@types/node": "~20.19.24", "expect.js": "0.3.1", "mocha": "^10.2.0", "prettier": "^2.8.3", diff --git a/test/fixtures/primary.ts b/test/fixtures/primary.ts new file mode 100644 index 0000000..c8046b1 --- /dev/null +++ b/test/fixtures/primary.ts @@ -0,0 +1,65 @@ +import cluster from "node:cluster"; +import * as assert from "node:assert"; +import * as pg from "pg"; +import { setupPrimary } from "../../lib/"; +import { createAdapter } from "@socket.io/cluster-adapter"; +import { Server } from "socket.io"; +import { sleep } from "../util.ts"; + +const WORKERS_COUNT = 3; +const EXPECTED_PEERS_COUNT = 3 * WORKERS_COUNT - 1; +const GRACE_PERIOD_IN_MS = 200; + +if (cluster.isPrimary) { + let count = 0; + + const pgPool = new pg.Pool({ + user: "postgres", + password: "changeit", + }); + + setupPrimary(pgPool, {}); + + cluster.on("exit", (_worker, code) => { + assert.equal(code, 0); + + if (++count === WORKERS_COUNT) { + process.exit(0); + } + }); + + for (let i = 1; i <= WORKERS_COUNT; i++) { + cluster.fork(); + } +} else { + const io = new Server({ + adapter: createAdapter(), + }); + + io.on("test", (cb: (pid: number) => void) => { + cb(process.pid); + }); + + function isInitComplete() { + return io.of("/").adapter.nodesMap.size === EXPECTED_PEERS_COUNT; + } + + async function runTest() { + io.of("/").adapter.init(); + + while (!isInitComplete()) { + await sleep(20); + } + + io.serverSideEmit("test", (err, res) => { + assert.equal(err, null); + assert.equal(res.length, EXPECTED_PEERS_COUNT); + + setTimeout(() => { + process.exit(0); // exit after having responded to other nodes + }, GRACE_PERIOD_IN_MS); + }); + } + + runTest(); +} diff --git a/test/index.ts b/test/index.ts index 648b92c..1ac7e36 100644 --- a/test/index.ts +++ b/test/index.ts @@ -341,7 +341,7 @@ describe("@socket.io/postgres-adapter", () => { }); it("sends an event but timeout if one server does not respond", function (done) { - this.timeout(6000); + this.timeout(6000); // it's not currently possible to provide a timeout delay to the serverSideEmit() method servers[0].serverSideEmit("hello", (err: Error, response: any) => { expect(err.message).to.be("timeout reached: missing 1 responses"); diff --git a/test/node-cluster.ts b/test/node-cluster.ts new file mode 100644 index 0000000..0c8b191 --- /dev/null +++ b/test/node-cluster.ts @@ -0,0 +1,22 @@ +import { fork } from "node:child_process"; +import { join } from "node:path"; +import assert from "node:assert"; +import { times } from "./util.ts"; + +describe("@socket.io/postgres-adapter within Node.js cluster", () => { + it("should work", function (done) { + this.timeout(5000); + const PRIMARY_COUNT = 3; + const partialDone = times(PRIMARY_COUNT, done); + + for (let i = 1; i <= PRIMARY_COUNT; i++) { + const worker = fork(join(__dirname, "fixtures/primary.ts")); + + worker.on("exit", (code) => { + assert.equal(code, 0); + + partialDone(); + }); + } + }); +});