> [Globals](../globals) / PgPubSub # Class: PgPubSub Implements LISTEN/NOTIFY client for PostgreSQL connections. It is a basic public interface of this library, so the end-user is going to work with this class directly to solve his/her tasks. Importing: ~~~typescript import { AnyJson, PgPubSub } from '@imqueue/pg-pubsub'; ~~~ Instantiation: ~~~typescript const pubSub = new PgPubSub(options) ~~~ **`see`** PgPubSubOptions Connecting and listening: ~~~typescript pubSub.on('connect', async () => { await pubSub.listen('ChannelOne'); await pubSub.listen('ChannelTwo'); }); // or, even better: pubSub.on('connect', async () => { await Promise.all( ['ChannelOne', 'ChannelTwo'].map(channel => channel.listen()), ); }); // or. less reliable: await pubSub.connect(); await Promise.all( ['ChannelOne', 'ChannelTwo'].map(channel => channel.listen()), ); ~~~ Handle messages: ~~~typescript pubSub.on('message', (channel: string, payload: AnyJson) => console.log(channel, payload); ); // or, using channels pubSub.channels.on('ChannelOne', (payload: AnyJson) => console.log(1, payload), ); pubSub.channels.on('ChannelTwo', (payload: AnyJson) => console.log(2, payload), ); ~~~ Destroying: ~~~typescript await pubSub.destroy(); ~~~ Closing and re-using connection: ~~~typescript await pubSub.close(); await pubSub.connect(); ~~~ This close/connect technique may be used when doing some heavy message handling, so while you close, another running copy may handle next messages... ## Hierarchy * EventEmitter ↳ **PgPubSub** ## Index ### Constructors * [constructor](PgPubSub#constructor) ### Events * [close](PgPubSub#close) * [connect](PgPubSub#connect) * [end](PgPubSub#end) * [error](PgPubSub#error) * [listen](PgPubSub#listen) * [message](PgPubSub#message) * [notify](PgPubSub#notify) * [reconnect](PgPubSub#reconnect) * [unlisten](PgPubSub#unlisten) ### Properties * [channels](PgPubSub#channels) * [logger](PgPubSub#logger) * [options](PgPubSub#options) * [pgClient](PgPubSub#pgclient) * [defaultMaxListeners](PgPubSub#defaultmaxlisteners) * [errorMonitor](PgPubSub#errormonitor) ### Methods * [activeChannels](PgPubSub#activechannels) * [addListener](PgPubSub#addlistener) * [allChannels](PgPubSub#allchannels) * [close](PgPubSub#close) * [connect](PgPubSub#connect) * [destroy](PgPubSub#destroy) * [emit](PgPubSub#emit) * [eventNames](PgPubSub#eventnames) * [getMaxListeners](PgPubSub#getmaxlisteners) * [inactiveChannels](PgPubSub#inactivechannels) * [isActive](PgPubSub#isactive) * [listen](PgPubSub#listen) * [listenerCount](PgPubSub#listenercount) * [listeners](PgPubSub#listeners) * [notify](PgPubSub#notify) * [off](PgPubSub#off) * [on](PgPubSub#on) * [once](PgPubSub#once) * [prependListener](PgPubSub#prependlistener) * [prependOnceListener](PgPubSub#prependoncelistener) * [rawListeners](PgPubSub#rawlisteners) * [removeAllListeners](PgPubSub#removealllisteners) * [removeListener](PgPubSub#removelistener) * [setMaxListeners](PgPubSub#setmaxlisteners) * [unlisten](PgPubSub#unlisten) * [unlistenAll](PgPubSub#unlistenall) * [listenerCount](PgPubSub#listenercount) ## Constructors ### constructor \+ **new PgPubSub**(`options`: Partial\<[PgPubSubOptions](../PgPubSubOptions)>, `logger?`: [AnyLogger](../AnyLogger.md)): [PgPubSub](PgPubSub.md) *Overrides [PgChannelEmitter](PgChannelEmitter).[constructor](PgChannelEmitter.md#constructor)* #### Parameters: Name | Type | Default value | Description | ------ | ------ | ------ | ------ | `options` | Partial\<[PgPubSubOptions](../PgPubSubOptions)> | - | options | `logger` | [AnyLogger](../AnyLogger) | console | logger | **Returns:** [PgPubSub](PgPubSub) ## Events ### close • **close**(): void `'close'` event, occurs each time connection closed. Differs from `'end'` event, because `'end'` event may occur many times during re-connectable connection process, but `'close'` event states that connection was safely programmatically closed and further re-connections won't happen. **`asmemberof`** PgPubSub **Returns:** void ___ ### connect • **connect**(): void `'connect'` event, occurs each time database connection is established. **`asmemberof`** PgPubSub **Returns:** void ___ ### end • **end**(): void `'end'` event, occurs whenever pg connection ends, so, literally it's simply proxy to `'end'` event from `pg.Client` **`asmemberof`** PgPubSub **Returns:** void ___ ### error • **error**(`err`: Error): void `'error'` event occurs each time connection error is happened **`asmemberof`** PgPubSub #### Parameters: Name | Type | Description | ------ | ------ | ------ | `err` | Error | error occurred during connection | **Returns:** void ___ ### listen • **listen**(`channels`: string[]): void `'listen'` event occurs each time channel starts being listening **`asmemberof`** PgPubSub #### Parameters: Name | Type | Description | ------ | ------ | ------ | `channels` | string[] | list of channels being started listening | **Returns:** void ___ ### message • **message**(`chan`: string, `payload`: [AnyJson](../globals#anyjson)): void `'message'` event occurs each time database connection gets notification to any listening channel. Fired before channel event emitted. **`asmemberof`** PgPubSub #### Parameters: Name | Type | Description | ------ | ------ | ------ | `chan` | string | channel to which notification corresponding to | `payload` | [AnyJson](../globals#anyjson) | notification message payload | **Returns:** void ___ ### notify • **notify**(`chan`: string, `payload`: [AnyJson](../globals#anyjson)): void `'notify'` event occurs each time new message has been published to a particular channel. Occurs right after database NOTIFY command succeeded. **`asmemberof`** PgPubSub #### Parameters: Name | Type | Description | ------ | ------ | ------ | `chan` | string | channel to which notification was sent | `payload` | [AnyJson](../globals#anyjson) | notification message payload | **Returns:** void ___ ### reconnect • **reconnect**(`retries`: number): void `'reconnect'` event occurs each time, when the connection is successfully established after connection retry. It is followed by a corresponding `'connect'` event, but after all possible channel locks finished their attempts to be re-acquired. **`asmemberof`** PgPubSub #### Parameters: Name | Type | Description | ------ | ------ | ------ | `retries` | number | number of retries made before re-connect succeeded | **Returns:** void ___ ### unlisten • **unlisten**(`channels`: string[]): void `'unlisten'` event occurs each time channel ends being listening **`asmemberof`** PgPubSub #### Parameters: Name | Type | Description | ------ | ------ | ------ | `channels` | string[] | list of channels being stopped listening | **Returns:** void ## Properties ### channels • `Readonly` **channels**: [PgChannelEmitter](PgChannelEmitter) = new PgChannelEmitter() ___ ### logger • `Readonly` **logger**: [AnyLogger](../AnyLogger) logger ___ ### options • `Readonly` **options**: [PgPubSubOptions](../PgPubSubOptions) ___ ### pgClient • `Readonly` **pgClient**: [PgClient](../PgClient) ___ ### defaultMaxListeners ▪ `Static` **defaultMaxListeners**: number *Inherited from [PgChannelEmitter](PgChannelEmitter).[defaultMaxListeners](PgChannelEmitter.md#defaultmaxlisteners)* ___ ### errorMonitor ▪ `Static` `Readonly` **errorMonitor**: unique symbol *Inherited from [PgChannelEmitter](PgChannelEmitter).[errorMonitor](PgChannelEmitter.md#errormonitor)* This symbol shall be used to install a listener for only monitoring `'error'` events. Listeners installed using this symbol are called before the regular `'error'` listeners are called. Installing a listener using this symbol does not change the behavior once an `'error'` event is emitted, therefore the process will still crash if no regular `'error'` listener is installed. ## Methods ### activeChannels ▸ **activeChannels**(): string[] Returns list of all active subscribed channels **Returns:** string[] ___ ### addListener ▸ **addListener**(`event`: string \| symbol, `listener`: (...args: any[]) => void): this *Inherited from [PgClient](../PgClient).[addListener](../interfaces/PgClient.md#addlistener)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | `listener` | (...args: any[]) => void | **Returns:** this ___ ### allChannels ▸ **allChannels**(): string[] Returns list of all known channels, despite the fact they are listening (active) or not (inactive). **Returns:** string[] ___ ### close ▸ **close**(): Promise\ Safely closes this database connection **Returns:** Promise\ ___ ### connect ▸ **connect**(): Promise\ Establishes re-connectable database connection **Returns:** Promise\ ___ ### destroy ▸ **destroy**(): Promise\ Destroys this object properly, destroying all locks, closing all connections and removing all event listeners to avoid memory leaking. So whenever you need to destroy an object programmatically - use this method. Note, that after destroy it is broken and should be removed from memory. **Returns:** Promise\ ___ ### emit ▸ **emit**(`event`: string \| symbol, ...`args`: any[]): boolean *Inherited from [PgClient](../PgClient).[emit](../interfaces/PgClient.md#emit)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | `...args` | any[] | **Returns:** boolean ___ ### eventNames ▸ **eventNames**(): Array\ *Inherited from [PgClient](../PgClient).[eventNames](../interfaces/PgClient.md#eventnames)* **Returns:** Array\ ___ ### getMaxListeners ▸ **getMaxListeners**(): number *Inherited from [PgClient](../PgClient).[getMaxListeners](../interfaces/PgClient.md#getmaxlisteners)* **Returns:** number ___ ### inactiveChannels ▸ **inactiveChannels**(): string[] Returns list of all inactive channels (those which are known, but not actively listening at a time) **Returns:** string[] ___ ### isActive ▸ **isActive**(`channel?`: undefined \| string): boolean If channel argument passed will return true if channel is in active state (listening by this pub/sub), false - otherwise. If channel is not specified - will return true if there is at least one active channel listened by this pub/sub, false - otherwise. #### Parameters: Name | Type | ------ | ------ | `channel?` | undefined \| string | **Returns:** boolean ___ ### listen ▸ **listen**(`channel`: string): Promise\ Starts listening given channel. If singleListener option is set to true, it guarantees that only one process would be able to listen this channel at a time. #### Parameters: Name | Type | Description | ------ | ------ | ------ | `channel` | string | channel name to listen | **Returns:** Promise\ ___ ### listenerCount ▸ **listenerCount**(`event`: string \| symbol): number *Inherited from [PgClient](../PgClient).[listenerCount](../interfaces/PgClient.md#listenercount)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | **Returns:** number ___ ### listeners ▸ **listeners**(`event`: string \| symbol): Function[] *Inherited from [PgClient](../PgClient).[listeners](../interfaces/PgClient.md#listeners)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | **Returns:** Function[] ___ ### notify ▸ **notify**(`channel`: string, `payload`: [AnyJson](../globals#anyjson)): Promise\ Performs NOTIFY to a given channel with a given payload to all listening subscribers #### Parameters: Name | Type | Description | ------ | ------ | ------ | `channel` | string | channel to publish to | `payload` | [AnyJson](../globals#anyjson) | payload to publish for subscribers | **Returns:** Promise\ ___ ### off ▸ **off**(`event`: string \| symbol, `listener`: (...args: any[]) => void): this *Inherited from [PgClient](../PgClient).[off](../interfaces/PgClient.md#off)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | `listener` | (...args: any[]) => void | **Returns:** this ___ ### on ▸ **on**(`event`: \"end\", `listener`: *typeof* end): this *Overrides void* Sets `'end'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"end\" | `listener` | *typeof* end | **Returns:** this ▸ **on**(`event`: \"connect\", `listener`: *typeof* connect): this *Overrides void* Sets `'connect'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"connect\" | `listener` | *typeof* connect | **Returns:** this ▸ **on**(`event`: \"close\", `listener`: *typeof* close): this *Overrides void* Sets `'close'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"close\" | `listener` | *typeof* close | **Returns:** this ▸ **on**(`event`: \"listen\", `listener`: *typeof* listen): this *Overrides void* Sets `'listen'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"listen\" | `listener` | *typeof* listen | **Returns:** this ▸ **on**(`event`: \"unlisten\", `listener`: *typeof* unlisten): this *Overrides void* Sets `'unlisten'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"unlisten\" | `listener` | *typeof* unlisten | **Returns:** this ▸ **on**(`event`: \"error\", `listener`: *typeof* error): this *Overrides void* Sets `'error'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"error\" | `listener` | *typeof* error | **Returns:** this ▸ **on**(`event`: \"reconnect\", `listener`: *typeof* reconnect): this *Overrides void* Sets `'reconnect'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"reconnect\" | `listener` | *typeof* reconnect | **Returns:** this ▸ **on**(`event`: \"message\", `listener`: *typeof* message): this *Overrides void* Sets `'message'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"message\" | `listener` | *typeof* message | **Returns:** this ▸ **on**(`event`: \"notify\", `listener`: *typeof* notify): this *Overrides void* Sets `'notify'` event handler #### Parameters: Name | Type | ------ | ------ | `event` | \"notify\" | `listener` | *typeof* notify | **Returns:** this ▸ **on**(`event`: string \| symbol, `listener`: (...args: any[]) => void): this *Overrides void* Sets any unknown or user-defined event handler #### Parameters: Name | Type | Description | ------ | ------ | ------ | `event` | string \| symbol | event name | `listener` | (...args: any[]) => void | event handler | **Returns:** this ___ ### once ▸ **once**(`event`: \"end\", `listener`: *typeof* end): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'end'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"end\" | `listener` | *typeof* end | **Returns:** this ▸ **once**(`event`: \"connect\", `listener`: *typeof* connect): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'connect'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"connect\" | `listener` | *typeof* connect | **Returns:** this ▸ **once**(`event`: \"close\", `listener`: *typeof* close): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'close'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"close\" | `listener` | *typeof* close | **Returns:** this ▸ **once**(`event`: \"listen\", `listener`: *typeof* listen): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'listen'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"listen\" | `listener` | *typeof* listen | **Returns:** this ▸ **once**(`event`: \"unlisten\", `listener`: *typeof* unlisten): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'unlisten'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"unlisten\" | `listener` | *typeof* unlisten | **Returns:** this ▸ **once**(`event`: \"error\", `listener`: *typeof* error): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'error'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"error\" | `listener` | *typeof* error | **Returns:** this ▸ **once**(`event`: \"reconnect\", `listener`: *typeof* reconnect): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'reconnect'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"reconnect\" | `listener` | *typeof* reconnect | **Returns:** this ▸ **once**(`event`: \"message\", `listener`: *typeof* message): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'message'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"message\" | `listener` | *typeof* message | **Returns:** this ▸ **once**(`event`: \"notify\", `listener`: *typeof* notify): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets `'notify'` event handler, which fired only one single time #### Parameters: Name | Type | ------ | ------ | `event` | \"notify\" | `listener` | *typeof* notify | **Returns:** this ▸ **once**(`event`: string \| symbol, `listener`: (...args: any[]) => void): this *Overrides [PgClient](../PgClient).[once](../interfaces/PgClient.md#once)* Sets any unknown or user-defined event handler, which would fire only one single time #### Parameters: Name | Type | Description | ------ | ------ | ------ | `event` | string \| symbol | event name | `listener` | (...args: any[]) => void | event handler | **Returns:** this ___ ### prependListener ▸ **prependListener**(`event`: string \| symbol, `listener`: (...args: any[]) => void): this *Inherited from [PgClient](../PgClient).[prependListener](../interfaces/PgClient.md#prependlistener)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | `listener` | (...args: any[]) => void | **Returns:** this ___ ### prependOnceListener ▸ **prependOnceListener**(`event`: string \| symbol, `listener`: (...args: any[]) => void): this *Inherited from [PgClient](../PgClient).[prependOnceListener](../interfaces/PgClient.md#prependoncelistener)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | `listener` | (...args: any[]) => void | **Returns:** this ___ ### rawListeners ▸ **rawListeners**(`event`: string \| symbol): Function[] *Inherited from [PgClient](../PgClient).[rawListeners](../interfaces/PgClient.md#rawlisteners)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | **Returns:** Function[] ___ ### removeAllListeners ▸ **removeAllListeners**(`event?`: string \| symbol): this *Inherited from [PgClient](../PgClient).[removeAllListeners](../interfaces/PgClient.md#removealllisteners)* #### Parameters: Name | Type | ------ | ------ | `event?` | string \| symbol | **Returns:** this ___ ### removeListener ▸ **removeListener**(`event`: string \| symbol, `listener`: (...args: any[]) => void): this *Inherited from [PgClient](../PgClient).[removeListener](../interfaces/PgClient.md#removelistener)* #### Parameters: Name | Type | ------ | ------ | `event` | string \| symbol | `listener` | (...args: any[]) => void | **Returns:** this ___ ### setMaxListeners ▸ **setMaxListeners**(`n`: number): this *Inherited from [PgClient](../PgClient).[setMaxListeners](../interfaces/PgClient.md#setmaxlisteners)* #### Parameters: Name | Type | ------ | ------ | `n` | number | **Returns:** this ___ ### unlisten ▸ **unlisten**(`channel`: string): Promise\ Stops listening of the given channel, and, if singleListener option is set to true - will release an acquired lock (if it was settled). #### Parameters: Name | Type | Description | ------ | ------ | ------ | `channel` | string | channel name to unlisten | **Returns:** Promise\ ___ ### unlistenAll ▸ **unlistenAll**(): Promise\ Stops listening all connected channels, and, if singleListener option is set to true - will release all acquired locks (if any was settled). **Returns:** Promise\ ___ ### listenerCount ▸ `Static`**listenerCount**(`emitter`: EventEmitter, `event`: string \| symbol): number *Inherited from [PgChannelEmitter](PgChannelEmitter).[listenerCount](PgChannelEmitter.md#listenercount)* **`deprecated`** since v4.0.0 #### Parameters: Name | Type | ------ | ------ | `emitter` | EventEmitter | `event` | string \| symbol | **Returns:** number