
PgPubSub

Mykhailo Stadnyk edited this page Nov 30, 2020 · 6 revisions

Class: PgPubSub

Implements LISTEN/NOTIFY client for PostgreSQL connections.

This is the library's main public interface; end users work with this class directly.

Importing:

import { AnyJson, PgPubSub } from '@imqueue/pg-pubsub';

Instantiation:

const pubSub = new PgPubSub(options)

see PgPubSubOptions

Connecting and listening:

pubSub.on('connect', async () => {
    await pubSub.listen('ChannelOne');
    await pubSub.listen('ChannelTwo');
});
// or, even better:
pubSub.on('connect', async () => {
    await Promise.all(
        ['ChannelOne', 'ChannelTwo'].map(channel => pubSub.listen(channel)),
    );
});
// or, less reliably:
await pubSub.connect();
await Promise.all(
    ['ChannelOne', 'ChannelTwo'].map(channel => pubSub.listen(channel)),
);
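
The 'connect'-handler variant above can be wrapped in a small helper. The sketch below is written against a hypothetical structural type, ConnectAwareListener, which mirrors only the documented on('connect', ...) and listen(channel) signatures; subscribeOnConnect and FakePubSub are illustrative names and are not part of the library. Since the 'connect' event is documented as occurring each time a connection is established, the handler runs again after a reconnect. Whether the library also restores subscriptions on its own is not stated on this page, so the repeated listen() calls are an assumption about safe usage.

```typescript
// Hypothetical structural type: only the two members this sketch needs.
interface ConnectAwareListener {
    on(event: 'connect', handler: () => void): unknown;
    listen(channel: string): Promise<void>;
}

// Subscribes to every channel each time a connection is established.
function subscribeOnConnect(
    pubSub: ConnectAwareListener,
    channels: string[],
): void {
    pubSub.on('connect', () => {
        Promise.all(channels.map(channel => pubSub.listen(channel)))
            .catch(err => console.error('listen failed', err));
    });
}

// In-memory stand-in for PgPubSub, used only to make the sketch runnable.
class FakePubSub implements ConnectAwareListener {
    public readonly listened: string[] = [];
    private handlers: Array<() => void> = [];

    public on(_event: 'connect', handler: () => void): this {
        this.handlers.push(handler);
        return this;
    }

    public async listen(channel: string): Promise<void> {
        this.listened.push(channel);
    }

    // Simulates the database connection being (re-)established.
    public simulateConnect(): void {
        this.handlers.forEach(handler => handler());
    }
}

const fake = new FakePubSub();
subscribeOnConnect(fake, ['ChannelOne', 'ChannelTwo']);
fake.simulateConnect(); // initial connection
fake.simulateConnect(); // connection restored after a drop
console.log(fake.listened);
```

In this sketch each simulated connection triggers one listen() call per channel, which is the behavior the 'connect'-handler pattern relies on.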

Handle messages:

pubSub.on('message', (channel: string, payload: AnyJson) =>
    console.log(channel, payload),
);
// or, using channels
pubSub.channels.on('ChannelOne', (payload: AnyJson) =>
    console.log(1, payload),
);
pubSub.channels.on('ChannelTwo', (payload: AnyJson) =>
    console.log(2, payload),
);
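
Handlers receive payloads typed as AnyJson, so it is usually worth validating the shape before using it. The sketch below shows one way to do that with a type guard. Json is a local stand-in that is only assumed to resemble the library's AnyJson type, and OrderEvent, isOrderEvent, and handleOrder are hypothetical names invented for this example.

```typescript
// Local stand-in, assumed to resemble the library's AnyJson type.
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Hypothetical message shape for a hypothetical channel.
type OrderEvent = { orderId: number; status: string };

// Narrows an arbitrary value to OrderEvent by checking its fields at runtime.
function isOrderEvent(payload: unknown): payload is OrderEvent {
    if (typeof payload !== 'object' || payload === null || Array.isArray(payload)) {
        return false;
    }
    const candidate = payload as { [key: string]: unknown };
    return typeof candidate['orderId'] === 'number'
        && typeof candidate['status'] === 'string';
}

// Returns a description instead of logging so the result is easy to inspect;
// a real channel handler would typically return void.
function handleOrder(payload: Json): string {
    if (!isOrderEvent(payload)) {
        return 'ignored malformed payload';
    }
    return `order ${payload.orderId} is ${payload.status}`;
}

console.log(handleOrder({ orderId: 7, status: 'paid' }));
console.log(handleOrder('not an order'));
```

With a real instance, such a function could be attached as pubSub.channels.on('orders', payload => console.log(handleOrder(payload))), where 'orders' is a hypothetical channel name.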

Destroying:

await pubSub.destroy();

Closing and re-using connection:

await pubSub.close();
await pubSub.connect();

This close/connect technique may be useful during heavy message handling: while this connection is closed, another running copy of the process can handle the next messages.
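
The close/connect technique can be sketched as a small wrapper that releases the connection, runs the heavy job, and reconnects afterwards even if the job fails. Reconnectable, withConnectionReleased, and fakePubSub are hypothetical names; the interface mirrors only the documented close() and connect() signatures. The sketch also assumes that channel subscriptions are re-established elsewhere, for example in a 'connect' handler.

```typescript
// Hypothetical structural type covering only the documented close()/connect().
interface Reconnectable {
    close(): Promise<void>;
    connect(): Promise<void>;
}

// Closes the connection, runs the job, then reconnects even if the job throws.
async function withConnectionReleased<T>(
    pubSub: Reconnectable,
    job: () => Promise<T>,
): Promise<T> {
    await pubSub.close();
    try {
        return await job();
    } finally {
        await pubSub.connect();
    }
}

// Recording stand-in for PgPubSub so the call order can be observed.
const calls: string[] = [];
const fakePubSub: Reconnectable = {
    close: async () => { calls.push('close'); },
    connect: async () => { calls.push('connect'); },
};

withConnectionReleased(fakePubSub, async () => {
    calls.push('job'); // the heavy message handling would happen here
    return 42;
}).then(result => console.log(calls.join(' -> '), result));
```

Putting the reconnect in a finally block is a design choice: a failed job should not leave the process permanently disconnected.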

Hierarchy

  • EventEmitter

    PgPubSub

Index

Constructors

Events

Properties

Methods

Constructors

constructor

+ new PgPubSub(options: Partial<PgPubSubOptions>, logger?: AnyLogger): PgPubSub

Overrides PgChannelEmitter.constructor

Parameters:

Name Type Default value Description
options Partial<PgPubSubOptions> - options
logger AnyLogger console logger

Returns: PgPubSub

Events

close

close(): void

The 'close' event occurs each time the connection is closed. It differs from the 'end' event: 'end' may fire many times while a re-connectable connection is being re-established, whereas 'close' means the connection was closed programmatically and safely, and no further reconnection attempts will happen.

Returns: void


connect

connect(): void

The 'connect' event occurs each time a database connection is established.

Returns: void


end

end(): void

The 'end' event occurs whenever the pg connection ends; it is simply a proxy for the 'end' event from pg.Client.

Returns: void


error

error(err: Error): void

The 'error' event occurs each time a connection error happens.

Parameters:

Name Type Description
err Error the error that occurred during connection

Returns: void


listen

listen(channels: string[]): void

The 'listen' event occurs each time listening starts on one or more channels.

Parameters:

Name Type Description
channels string[] list of channels on which listening has started

Returns: void


message

message(chan: string, payload: AnyJson): void

The 'message' event occurs each time the database connection receives a notification on any channel being listened to. It is fired before the corresponding channel event is emitted.

Parameters:

Name Type Description
chan string channel the notification was received on
payload AnyJson notification message payload

Returns: void


notify

notify(chan: string, payload: AnyJson): void

The 'notify' event occurs each time a new message has been published to a particular channel. It occurs right after the database NOTIFY command succeeds.

Parameters:

Name Type Description
chan string channel to which notification was sent
payload AnyJson notification message payload

Returns: void


reconnect

reconnect(retries: number): void

The 'reconnect' event occurs each time the connection is successfully re-established after a connection retry. It is followed by a corresponding 'connect' event, but only after all channel locks have finished their attempts to be re-acquired.

Parameters:

Name Type Description
retries number number of retries made before re-connect succeeded

Returns: void


unlisten

unlisten(channels: string[]): void

The 'unlisten' event occurs each time listening stops on one or more channels.

Parameters:

Name Type Description
channels string[] list of channels on which listening has stopped

Returns: void
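
The lifecycle events documented above can be wired to a logger in one place. The following is a minimal sketch that uses only the event names and handler signatures listed in this section. LifecycleSource, attachLifecycleLogging, and TinyEmitter are hypothetical names; TinyEmitter is an in-memory stand-in for PgPubSub so that the example runs without a database.

```typescript
// Hypothetical structural type: the generic on() overload documented for PgPubSub.
interface LifecycleSource {
    on(event: string, listener: (...args: any[]) => void): unknown;
}

// Registers one logging handler per documented lifecycle event.
function attachLifecycleLogging(
    source: LifecycleSource,
    log: (line: string) => void,
): void {
    source.on('connect', () => log('connected'));
    source.on('reconnect', (retries: number) =>
        log(`reconnected after ${retries} retries`));
    source.on('end', () => log('pg connection ended'));
    source.on('close', () => log('closed, no further reconnects'));
    source.on('error', (err: Error) => log(`error: ${err.message}`));
    source.on('listen', (channels: string[]) =>
        log(`listening: ${channels.join(', ')}`));
    source.on('unlisten', (channels: string[]) =>
        log(`stopped listening: ${channels.join(', ')}`));
}

// Minimal in-memory emitter standing in for PgPubSub in this sketch.
class TinyEmitter implements LifecycleSource {
    private registry: { [event: string]: Array<(...args: any[]) => void> } = {};

    public on(event: string, listener: (...args: any[]) => void): this {
        (this.registry[event] = this.registry[event] || []).push(listener);
        return this;
    }

    public emit(event: string, ...args: any[]): void {
        (this.registry[event] || []).forEach(listener => listener(...args));
    }
}

const lines: string[] = [];
const emitter = new TinyEmitter();
attachLifecycleLogging(emitter, line => lines.push(line));

// Simulated sequence: listening starts, the connection drops, then recovers.
emitter.emit('listen', ['ChannelOne']);
emitter.emit('end');
emitter.emit('reconnect', 3);
emitter.emit('connect');
console.log(lines);
```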

Properties

channels

Readonly channels: PgChannelEmitter = new PgChannelEmitter()


logger

Readonly logger: AnyLogger

logger


options

Readonly options: PgPubSubOptions


pgClient

Readonly pgClient: PgClient


defaultMaxListeners

Static defaultMaxListeners: number

Inherited from PgChannelEmitter.defaultMaxListeners


errorMonitor

Static Readonly errorMonitor: unique symbol

Inherited from PgChannelEmitter.errorMonitor

This symbol shall be used to install a listener for only monitoring 'error' events. Listeners installed using this symbol are called before the regular 'error' listeners are called.

Installing a listener using this symbol does not change the behavior once an 'error' event is emitted, therefore the process will still crash if no regular 'error' listener is installed.
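
The behavior described above can be observed with a plain Node.js EventEmitter, which stands in for PgPubSub here because the property is inherited. This is an illustrative sketch rather than library code; the seen array and the error messages are made up for the example.

```typescript
import { EventEmitter } from 'node:events';

// A plain EventEmitter stands in for PgPubSub, which inherits errorMonitor.
const emitter = new EventEmitter();
const seen: string[] = [];

// Monitoring listener: observes the error but does not count as handling it.
emitter.on(EventEmitter.errorMonitor, (err: Error) => {
    seen.push(`monitor: ${err.message}`);
});

// Without a regular 'error' listener, emit() still throws the error.
try {
    emitter.emit('error', new Error('connection lost'));
} catch (err) {
    seen.push(`thrown: ${(err as Error).message}`);
}

// With a regular listener installed, the monitor runs first, then the listener.
emitter.on('error', (err: Error) => {
    seen.push(`handler: ${err.message}`);
});
emitter.emit('error', new Error('connection lost again'));

console.log(seen);
```

A monitoring listener is therefore suited to metrics or logging, while a regular 'error' listener is still needed to keep the process alive.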

Methods

activeChannels

activeChannels(): string[]

Returns a list of all actively subscribed channels.

Returns: string[]


addListener

addListener(event: string | symbol, listener: (...args: any[]) => void): this

Inherited from PgClient.addListener

Parameters:

Name Type
event string | symbol
listener (...args: any[]) => void

Returns: this


allChannels

allChannels(): string[]

Returns a list of all known channels, regardless of whether they are currently being listened to (active) or not (inactive).

Returns: string[]


close

close(): Promise<void>

Safely closes this database connection

Returns: Promise<void>


connect

connect(): Promise<void>

Establishes re-connectable database connection

Returns: Promise<void>


destroy

destroy(): Promise<void>

Destroys this object properly: releases all locks, closes all connections, and removes all event listeners to avoid memory leaks. Use this method whenever you need to destroy the object programmatically. Note that after destroy() the object is no longer usable and should be dereferenced so it can be garbage-collected.

Returns: Promise<void>


emit

emit(event: string | symbol, ...args: any[]): boolean

Inherited from PgClient.emit

Parameters:

Name Type
event string | symbol
...args any[]

Returns: boolean


eventNames

eventNames(): Array<string | symbol>

Inherited from PgClient.eventNames

Returns: Array<string | symbol>


getMaxListeners

getMaxListeners(): number

Inherited from PgClient.getMaxListeners

Returns: number


inactiveChannels

inactiveChannels(): string[]

Returns a list of all inactive channels (those that are known but not actively listened to at the moment).

Returns: string[]


isActive

isActive(channel?: undefined | string): boolean

If the channel argument is passed, returns true if that channel is in the active state (being listened to by this pub/sub) and false otherwise. If channel is not specified, returns true if at least one channel is actively listened to by this pub/sub and false otherwise.

Parameters:

Name Type
channel? undefined | string

Returns: boolean


listen

listen(channel: string): Promise<void>

Starts listening to the given channel. If the singleListener option is set to true, it guarantees that only one process is able to listen to this channel at a time.

Parameters:

Name Type Description
channel string channel name to listen

Returns: Promise<void>


listenerCount

listenerCount(event: string | symbol): number

Inherited from PgClient.listenerCount

Parameters:

Name Type
event string | symbol

Returns: number


listeners

listeners(event: string | symbol): Function[]

Inherited from PgClient.listeners

Parameters:

Name Type
event string | symbol

Returns: Function[]


notify

notify(channel: string, payload: AnyJson): Promise<void>

Performs NOTIFY on the given channel, delivering the given payload to all listening subscribers.

Parameters:

Name Type Description
channel string channel to publish to
payload AnyJson payload to publish for subscribers

Returns: Promise<void>
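
PostgreSQL limits NOTIFY payloads: in the default configuration a payload must be shorter than 8000 bytes. A publisher may therefore want to check the size before calling notify(). The sketch below assumes the payload travels as plain JSON text, which this page does not confirm; the library may add its own envelope, so leave some headroom. Json, MAX_NOTIFY_BYTES, serializedSize, and fitsInNotify are hypothetical names, and Json is only assumed to resemble the library's AnyJson type.

```typescript
// Local stand-in, assumed to resemble the library's AnyJson type.
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// PostgreSQL default: a NOTIFY payload must be shorter than 8000 bytes.
const MAX_NOTIFY_BYTES = 8000;

// Returns the UTF-8 size of the payload once serialized as JSON
// (assumption: this approximates what is sent to the database).
function serializedSize(payload: Json): number {
    return new TextEncoder().encode(JSON.stringify(payload)).length;
}

// True when the serialized payload is expected to fit into a single NOTIFY.
function fitsInNotify(payload: Json): boolean {
    return serializedSize(payload) < MAX_NOTIFY_BYTES;
}

console.log(fitsInNotify({ id: 1, text: 'hello' }));
console.log(fitsInNotify({ blob: 'x'.repeat(8000) }));
```

When a payload does not fit, a common workaround is to store the data in a table and publish only its identifier.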


off

off(event: string | symbol, listener: (...args: any[]) => void): this

Inherited from PgClient.off

Parameters:

Name Type
event string | symbol
listener (...args: any[]) => void

Returns: this


on

on(event: "end", listener: typeof end): this

Overrides void

Sets 'end' event handler

Parameters:

Name Type
event "end"
listener typeof end

Returns: this

on(event: "connect", listener: typeof connect): this

Overrides void

Sets 'connect' event handler

Parameters:

Name Type
event "connect"
listener typeof connect

Returns: this

on(event: "close", listener: typeof close): this

Overrides void

Sets 'close' event handler

Parameters:

Name Type
event "close"
listener typeof close

Returns: this

on(event: "listen", listener: typeof listen): this

Overrides void

Sets 'listen' event handler

Parameters:

Name Type
event "listen"
listener typeof listen

Returns: this

on(event: "unlisten", listener: typeof unlisten): this

Overrides void

Sets 'unlisten' event handler

Parameters:

Name Type
event "unlisten"
listener typeof unlisten

Returns: this

on(event: "error", listener: typeof error): this

Overrides void

Sets 'error' event handler

Parameters:

Name Type
event "error"
listener typeof error

Returns: this

on(event: "reconnect", listener: typeof reconnect): this

Overrides void

Sets 'reconnect' event handler

Parameters:

Name Type
event "reconnect"
listener typeof reconnect

Returns: this

on(event: "message", listener: typeof message): this

Overrides void

Sets 'message' event handler

Parameters:

Name Type
event "message"
listener typeof message

Returns: this

on(event: "notify", listener: typeof notify): this

Overrides void

Sets 'notify' event handler

Parameters:

Name Type
event "notify"
listener typeof notify

Returns: this

on(event: string | symbol, listener: (...args: any[]) => void): this

Overrides void

Sets any unknown or user-defined event handler

Parameters:

Name Type Description
event string | symbol event name
listener (...args: any[]) => void event handler

Returns: this


once

once(event: "end", listener: typeof end): this

Overrides PgClient.once

Sets an 'end' event handler that fires only once.

Parameters:

Name Type
event "end"
listener typeof end

Returns: this

once(event: "connect", listener: typeof connect): this

Overrides PgClient.once

Sets a 'connect' event handler that fires only once.

Parameters:

Name Type
event "connect"
listener typeof connect

Returns: this

once(event: "close", listener: typeof close): this

Overrides PgClient.once

Sets a 'close' event handler that fires only once.

Parameters:

Name Type
event "close"
listener typeof close

Returns: this

once(event: "listen", listener: typeof listen): this

Overrides PgClient.once

Sets a 'listen' event handler that fires only once.

Parameters:

Name Type
event "listen"
listener typeof listen

Returns: this

once(event: "unlisten", listener: typeof unlisten): this

Overrides PgClient.once

Sets an 'unlisten' event handler that fires only once.

Parameters:

Name Type
event "unlisten"
listener typeof unlisten

Returns: this

once(event: "error", listener: typeof error): this

Overrides PgClient.once

Sets an 'error' event handler that fires only once.

Parameters:

Name Type
event "error"
listener typeof error

Returns: this

once(event: "reconnect", listener: typeof reconnect): this

Overrides PgClient.once

Sets a 'reconnect' event handler that fires only once.

Parameters:

Name Type
event "reconnect"
listener typeof reconnect

Returns: this

once(event: "message", listener: typeof message): this

Overrides PgClient.once

Sets a 'message' event handler that fires only once.

Parameters:

Name Type
event "message"
listener typeof message

Returns: this

once(event: "notify", listener: typeof notify): this

Overrides PgClient.once

Sets a 'notify' event handler that fires only once.

Parameters:

Name Type
event "notify"
listener typeof notify

Returns: this

once(event: string | symbol, listener: (...args: any[]) => void): this

Overrides PgClient.once

Sets a handler for any unknown or user-defined event that fires only once.

Parameters:

Name Type Description
event string | symbol event name
listener (...args: any[]) => void event handler

Returns: this


prependListener

prependListener(event: string | symbol, listener: (...args: any[]) => void): this

Inherited from PgClient.prependListener

Parameters:

Name Type
event string | symbol
listener (...args: any[]) => void

Returns: this


prependOnceListener

prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this

Inherited from PgClient.prependOnceListener

Parameters:

Name Type
event string | symbol
listener (...args: any[]) => void

Returns: this


rawListeners

rawListeners(event: string | symbol): Function[]

Inherited from PgClient.rawListeners

Parameters:

Name Type
event string | symbol

Returns: Function[]


removeAllListeners

removeAllListeners(event?: string | symbol): this

Inherited from PgClient.removeAllListeners

Parameters:

Name Type
event? string | symbol

Returns: this


removeListener

removeListener(event: string | symbol, listener: (...args: any[]) => void): this

Inherited from PgClient.removeListener

Parameters:

Name Type
event string | symbol
listener (...args: any[]) => void

Returns: this


setMaxListeners

setMaxListeners(n: number): this

Inherited from PgClient.setMaxListeners

Parameters:

Name Type
n number

Returns: this


unlisten

unlisten(channel: string): Promise<void>

Stops listening to the given channel and, if the singleListener option is set to true, releases the acquired lock (if one was acquired).

Parameters:

Name Type Description
channel string channel name to unlisten

Returns: Promise<void>


unlistenAll

unlistenAll(): Promise<void>

Stops listening to all connected channels and, if the singleListener option is set to true, releases all acquired locks (if any were acquired).

Returns: Promise<void>


listenerCount

Static listenerCount(emitter: EventEmitter, event: string | symbol): number

Inherited from PgChannelEmitter.listenerCount

deprecated since v4.0.0

Parameters:

Name Type
emitter EventEmitter
event string | symbol

Returns: number