'use strict'; const EventEmitter = require('node:events'); const process = require('node:process'); const { setImmediate } = require('node:timers'); const { Collection } = require('@discordjs/collection'); const { WebSocketManager: WSWebSocketManager, WebSocketShardEvents: WSWebSocketShardEvents, CompressionMethod, CloseCodes, } = require('@discordjs/ws'); const { GatewayCloseCodes, GatewayDispatchEvents } = require('discord-api-types/v10'); const WebSocketShard = require('./WebSocketShard'); const PacketHandlers = require('./handlers'); const { DiscordjsError, ErrorCodes } = require('../../errors'); const Events = require('../../util/Events'); const Status = require('../../util/Status'); const WebSocketShardEvents = require('../../util/WebSocketShardEvents'); let zlib; try { zlib = require('zlib-sync'); } catch {} // eslint-disable-line no-empty const BeforeReadyWhitelist = [ GatewayDispatchEvents.Ready, GatewayDispatchEvents.Resumed, GatewayDispatchEvents.GuildCreate, GatewayDispatchEvents.GuildDelete, GatewayDispatchEvents.GuildMembersChunk, GatewayDispatchEvents.GuildMemberAdd, GatewayDispatchEvents.GuildMemberRemove, ]; const WaitingForGuildEvents = [GatewayDispatchEvents.GuildCreate, GatewayDispatchEvents.GuildDelete]; const UNRESUMABLE_CLOSE_CODES = [ CloseCodes.Normal, GatewayCloseCodes.AlreadyAuthenticated, GatewayCloseCodes.InvalidSeq, ]; const reasonIsDeprecated = 'the reason property is deprecated, use the code property to determine the reason'; let deprecationEmittedForInvalidSessionEvent = false; let deprecationEmittedForDestroyedEvent = false; /** * The WebSocket manager for this client. * This class forwards raw dispatch events, * read more about it here {@link https://discord.com/developers/docs/topics/gateway} * @extends {EventEmitter} */ class WebSocketManager extends EventEmitter { constructor(client) { super(); /** * The client that instantiated this WebSocketManager * @type {Client} * @readonly * @name WebSocketManager#client */ Object.defineProperty(this, 'client', { value: client }); /** * The gateway this manager uses * @type {?string} */ this.gateway = null; /** * A collection of all shards this manager handles * @type {Collection} */ this.shards = new Collection(); /** * An array of queued events before this WebSocketManager became ready * @type {Object[]} * @private * @name WebSocketManager#packetQueue */ Object.defineProperty(this, 'packetQueue', { value: [] }); /** * The current status of this WebSocketManager * @type {Status} */ this.status = Status.Idle; /** * If this manager was destroyed. It will prevent shards from reconnecting * @type {boolean} * @private */ this.destroyed = false; /** * The internal WebSocketManager from `@discordjs/ws`. * @type {WSWebSocketManager} * @private */ this._ws = null; } /** * The average ping of all WebSocketShards * @type {number} * @readonly */ get ping() { const sum = this.shards.reduce((a, b) => a + b.ping, 0); return sum / this.shards.size; } /** * Emits a debug message. * @param {string} message The debug message * @param {?number} [shardId] The id of the shard that emitted this message, if any * @private */ debug(message, shardId) { this.client.emit( Events.Debug, `[WS => ${typeof shardId === 'number' ? `Shard ${shardId}` : 'Manager'}] ${message}`, ); } /** * Connects this manager to the gateway. * @private */ async connect() { const invalidToken = new DiscordjsError(ErrorCodes.TokenInvalid); const { shards, shardCount, intents, ws } = this.client.options; if (this._ws && this._ws.options.token !== this.client.token) { await this._ws.destroy({ code: CloseCodes.Normal, reason: 'Login with differing token requested' }); this._ws = null; } if (!this._ws) { const wsOptions = { intents: intents.bitfield, rest: this.client.rest, token: this.client.token, largeThreshold: ws.large_threshold, version: ws.version, shardIds: shards === 'auto' ? null : shards, shardCount: shards === 'auto' ? null : shardCount, initialPresence: ws.presence, retrieveSessionInfo: shardId => this.shards.get(shardId).sessionInfo, updateSessionInfo: (shardId, sessionInfo) => { this.shards.get(shardId).sessionInfo = sessionInfo; }, compression: zlib ? CompressionMethod.ZlibStream : null, }; if (ws.buildIdentifyThrottler) wsOptions.buildIdentifyThrottler = ws.buildIdentifyThrottler; if (ws.buildStrategy) wsOptions.buildStrategy = ws.buildStrategy; this._ws = new WSWebSocketManager(wsOptions); this.attachEvents(); } const { url: gatewayURL, shards: recommendedShards, session_start_limit: sessionStartLimit, } = await this._ws.fetchGatewayInformation().catch(error => { throw error.status === 401 ? invalidToken : error; }); const { total, remaining } = sessionStartLimit; this.debug(`Fetched Gateway Information URL: ${gatewayURL} Recommended Shards: ${recommendedShards}`); this.debug(`Session Limit Information Total: ${total} Remaining: ${remaining}`); this.gateway = `${gatewayURL}/`; this.client.options.shardCount = await this._ws.getShardCount(); this.client.options.shards = await this._ws.getShardIds(); this.totalShards = this.client.options.shards.length; for (const id of this.client.options.shards) { if (!this.shards.has(id)) { const shard = new WebSocketShard(this, id); this.shards.set(id, shard); shard.on(WebSocketShardEvents.AllReady, unavailableGuilds => { /** * Emitted when a shard turns ready. * @event Client#shardReady * @param {number} id The shard id that turned ready * @param {?Set} unavailableGuilds Set of unavailable guild ids, if any */ this.client.emit(Events.ShardReady, shard.id, unavailableGuilds); this.checkShardsReady(); }); shard.status = Status.Connecting; } } await this._ws.connect(); this.shards.forEach(shard => { if (shard.listenerCount(WebSocketShardEvents.InvalidSession) > 0 && !deprecationEmittedForInvalidSessionEvent) { process.emitWarning( 'The WebSocketShard#invalidSession event is deprecated and will never emit.', 'DeprecationWarning', ); deprecationEmittedForInvalidSessionEvent = true; } if (shard.listenerCount(WebSocketShardEvents.Destroyed) > 0 && !deprecationEmittedForDestroyedEvent) { process.emitWarning( 'The WebSocketShard#destroyed event is deprecated and will never emit.', 'DeprecationWarning', ); deprecationEmittedForDestroyedEvent = true; } }); } /** * Attaches event handlers to the internal WebSocketShardManager from `@discordjs/ws`. * @private */ attachEvents() { this._ws.on(WSWebSocketShardEvents.Debug, ({ message, shardId }) => this.debug(message, shardId)); this._ws.on(WSWebSocketShardEvents.Dispatch, ({ data, shardId }) => { this.client.emit(Events.Raw, data, shardId); this.emit(data.t, data.d, shardId); const shard = this.shards.get(shardId); this.handlePacket(data, shard); if (shard.status === Status.WaitingForGuilds && WaitingForGuildEvents.includes(data.t)) { shard.gotGuild(data.d.id); } }); this._ws.on(WSWebSocketShardEvents.Ready, ({ data, shardId }) => { this.shards.get(shardId).onReadyPacket(data); }); this._ws.on(WSWebSocketShardEvents.Closed, ({ code, shardId }) => { const shard = this.shards.get(shardId); shard.emit(WebSocketShardEvents.Close, { code, reason: reasonIsDeprecated, wasClean: true }); if (UNRESUMABLE_CLOSE_CODES.includes(code) && this.destroyed) { shard.status = Status.Disconnected; /** * Emitted when a shard's WebSocket disconnects and will no longer reconnect. * @event Client#shardDisconnect * @param {CloseEvent} event The WebSocket close event * @param {number} id The shard id that disconnected */ this.client.emit(Events.ShardDisconnect, { code, reason: reasonIsDeprecated, wasClean: true }, shardId); this.debug(GatewayCloseCodes[code], shardId); return; } this.shards.get(shardId).status = Status.Connecting; /** * Emitted when a shard is attempting to reconnect or re-identify. * @event Client#shardReconnecting * @param {number} id The shard id that is attempting to reconnect */ this.client.emit(Events.ShardReconnecting, shardId); }); this._ws.on(WSWebSocketShardEvents.Hello, ({ shardId }) => { const shard = this.shards.get(shardId); if (shard.sessionInfo) { shard.closeSequence = shard.sessionInfo.sequence; shard.status = Status.Resuming; } else { shard.status = Status.Identifying; } }); this._ws.on(WSWebSocketShardEvents.Resumed, ({ shardId }) => { const shard = this.shards.get(shardId); shard.status = Status.Ready; /** * Emitted when the shard resumes successfully * @event WebSocketShard#resumed */ shard.emit(WebSocketShardEvents.Resumed); }); this._ws.on(WSWebSocketShardEvents.HeartbeatComplete, ({ heartbeatAt, latency, shardId }) => { this.debug(`Heartbeat acknowledged, latency of ${latency}ms.`, shardId); const shard = this.shards.get(shardId); shard.lastPingTimestamp = heartbeatAt; shard.ping = latency; }); this._ws.on(WSWebSocketShardEvents.Error, ({ error, shardId }) => { /** * Emitted whenever a shard's WebSocket encounters a connection error. * @event Client#shardError * @param {Error} error The encountered error * @param {number} shardId The shard that encountered this error */ this.client.emit(Events.ShardError, error, shardId); }); } /** * Broadcasts a packet to every shard this manager handles. * @param {Object} packet The packet to send * @private */ broadcast(packet) { for (const shardId of this.shards.keys()) this._ws.send(shardId, packet); } /** * Destroys this manager and all its shards. * @private */ async destroy() { if (this.destroyed) return; // TODO: Make a util for getting a stack this.debug(`Manager was destroyed. Called by:\n${new Error().stack}`); this.destroyed = true; await this._ws?.destroy({ code: CloseCodes.Normal }); } /** * Processes a packet and queues it if this WebSocketManager is not ready. * @param {Object} [packet] The packet to be handled * @param {WebSocketShard} [shard] The shard that will handle this packet * @returns {boolean} * @private */ handlePacket(packet, shard) { if (packet && this.status !== Status.Ready) { if (!BeforeReadyWhitelist.includes(packet.t)) { this.packetQueue.push({ packet, shard }); return false; } } if (this.packetQueue.length) { const item = this.packetQueue.shift(); setImmediate(() => { this.handlePacket(item.packet, item.shard); }).unref(); } if (packet && PacketHandlers[packet.t]) { PacketHandlers[packet.t](this.client, packet, shard); } return true; } /** * Checks whether the client is ready to be marked as ready. * @private */ checkShardsReady() { if (this.status === Status.Ready) return; if (this.shards.size !== this.totalShards || this.shards.some(s => s.status !== Status.Ready)) { return; } this.triggerClientReady(); } /** * Causes the client to be marked as ready and emits the ready event. * @private */ triggerClientReady() { this.status = Status.Ready; this.client.readyTimestamp = Date.now(); /** * Emitted when the client becomes ready to start working. * @event Client#ready * @param {Client} client The client */ this.client.emit(Events.ClientReady, this.client); this.handlePacket(); } } module.exports = WebSocketManager;