PgPubSub

Mykhailo Stadnyk edited this page Jan 2, 2020 · 6 revisions

Class: PgPubSub

Implements LISTEN/NOTIFY client for PostgreSQL connections.

This is the main public interface of the library, so end users work with this class directly.

Importing:

import { AnyJson, PgPubSub } from '@imqueue/pg-pubsub';

Instantiation:

const pubSub = new PgPubSub(options)

See PgPubSubOptions for the available options.

Connecting and listening:

pubSub.on('connect', async () => {
    await pubSub.listen('ChannelOne');
    await pubSub.listen('ChannelTwo');
});
// or, even better:
pubSub.on('connect', async () => {
    await Promise.all(
        ['ChannelOne', 'ChannelTwo'].map(channel => pubSub.listen(channel)),
    );
});
// or, less reliable:
await pubSub.connect();
await Promise.all(
    ['ChannelOne', 'ChannelTwo'].map(channel => pubSub.listen(channel)),
);
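
The 'connect'-handler pattern can be wrapped in a small helper. This is a minimal sketch: `ChannelListener` and `listenOnConnect` are hypothetical names, not part of the library, and the interface only assumes the `on('connect', ...)` and `listen(channel)` members documented on this page.

```typescript
// Hypothetical structural type: only the members this sketch relies on.
interface ChannelListener {
    on(event: 'connect', listener: () => void): unknown;
    listen(channel: string): Promise<void>;
}

// Subscribes to every channel each time a connection is established.
// Failures go to onError instead of becoming unhandled rejections.
function listenOnConnect(
    pubSub: ChannelListener,
    channels: string[],
    onError: (err: unknown) => void = console.error,
): void {
    pubSub.on('connect', () => {
        Promise.all(channels.map(channel => pubSub.listen(channel)))
            .catch(onError);
    });
}
```

With a real instance this would presumably be called as `listenOnConnect(pubSub, ['ChannelOne', 'ChannelTwo'])` before `pubSub.connect()`. Because 'connect' is emitted each time a connection is established, the handler runs again after a reconnect.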

Handle messages:

pubSub.on('message', (channel: string, payload: AnyJson) =>
    console.log(channel, payload),
);
// or, using channels
pubSub.channels.on('ChannelOne', (payload: AnyJson) =>
    console.log(1, payload),
);
pubSub.channels.on('ChannelTwo', (payload: AnyJson) =>
    console.log(2, payload),
);
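
Since handlers receive the payload as `AnyJson`, it usually has to be narrowed before use. A minimal sketch of a runtime type guard follows. The local `AnyJson` alias is an assumed stand-in for the type exported by the library, and `OrderEvent` and `isOrderEvent` are hypothetical application-level names.

```typescript
// Local stand-in for the library's AnyJson type (assumed shape).
type AnyJson =
    | null | boolean | number | string
    | AnyJson[]
    | { [key: string]: AnyJson };

// Hypothetical application-level message shape.
type OrderEvent = { orderId: number; status: string };

// Narrows an arbitrary JSON payload to OrderEvent at runtime.
function isOrderEvent(payload: AnyJson): payload is OrderEvent {
    return typeof payload === 'object'
        && payload !== null
        && !Array.isArray(payload)
        && typeof payload['orderId'] === 'number'
        && typeof payload['status'] === 'string';
}
```

Inside a channel handler, checking `isOrderEvent(payload)` first lets the compiler treat `payload.orderId` and `payload.status` as typed fields.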

Destroying:

await pubSub.destroy();

Closing and re-using connection:

await pubSub.close();
await pubSub.connect();

This close/connect technique can be useful during heavy message handling: while this connection is closed, another running copy of the process may handle the next messages.
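
The close/connect technique can be sketched as a wrapper that releases the connection for the duration of a task. `Reconnectable` and `withConnectionClosed` are hypothetical names; the sketch only assumes the documented `close()` and `connect()` methods.

```typescript
// Hypothetical structural type covering only close() and connect().
interface Reconnectable {
    close(): Promise<void>;
    connect(): Promise<void>;
}

// Closes the connection, runs the heavy task, then reconnects,
// even if the task throws.
async function withConnectionClosed<T>(
    pubSub: Reconnectable,
    task: () => Promise<T>,
): Promise<T> {
    await pubSub.close();
    try {
        return await task();
    } finally {
        await pubSub.connect();
    }
}
```

The `finally` block is a design choice: it keeps the process from staying disconnected when the task fails.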

Hierarchy

  • EventEmitter

    PgPubSub

Index

Constructors

Events

Properties

Methods

Constructors

constructor

+ new PgPubSub(options: Partial‹PgPubSubOptions›, logger: AnyLogger): PgPubSub

Parameters:

Name Type Default Description
options Partial‹PgPubSubOptions› - options
logger AnyLogger console logger

Returns: PgPubSub

Events

close

close(): void

The 'close' event occurs each time the connection is closed. It differs from the 'end' event: 'end' may occur many times during the reconnection process, whereas 'close' states that the connection was safely closed programmatically and no further reconnections will happen.

Returns: void


connect

connect(): void

The 'connect' event occurs each time a database connection is established.

Returns: void


end

end(): void

The 'end' event occurs whenever the pg connection ends; it is simply a proxy for the 'end' event of pg.Client.

Returns: void


error

error(err: Error): void

The 'error' event occurs each time a connection error happens.

Parameters:

Name Type Description
err Error error that occurred during connection

Returns: void


listen

listen(channels: string[]): void

The 'listen' event occurs each time one or more channels start being listened to.

Parameters:

Name Type Description
channels string[] list of channels that started being listened to

Returns: void


message

message(channel: string, payload: AnyJson): void

The 'message' event occurs each time the database connection receives a notification on any listened channel. It is fired before the channel event is emitted.

Parameters:

Name Type Description
channel string channel the notification belongs to
payload AnyJson notification message payload

Returns: void
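
The ordering described for 'message' (generic event first, channel event second) can be illustrated with two plain Node.js emitters. This is a simplified stand-in, not the library's code: `dispatch` is a hypothetical function, `main` plays the role of the PgPubSub instance and `channels` the role of its `channels` emitter.

```typescript
import { EventEmitter } from 'events';

// Hypothetical model of the documented dispatch order.
function dispatch(
    main: EventEmitter,
    channels: EventEmitter,
    channel: string,
    payload: unknown,
): void {
    main.emit('message', channel, payload); // generic event first
    channels.emit(channel, payload);        // then the channel-specific event
}

const main = new EventEmitter();
const channels = new EventEmitter();
const order: string[] = [];

main.on('message', (channel: string) => {
    order.push(`message:${channel}`);
});
channels.on('ChannelOne', () => {
    order.push('channel:ChannelOne');
});

dispatch(main, channels, 'ChannelOne', { id: 1 });
// order is now ['message:ChannelOne', 'channel:ChannelOne']
```

A practical consequence is that a 'message' handler can serve as a cross-channel hook (logging, metrics) that runs before any per-channel handler.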


notify

notify(channel: string, payload: AnyJson): void

The 'notify' event occurs each time a new message has been published to a particular channel, right after the database NOTIFY command succeeds.

Parameters:

Name Type Description
channel string channel to which notification was sent
payload AnyJson notification message payload

Returns: void


reconnect

reconnect(retries: number): void

The 'reconnect' event occurs each time the connection is successfully re-established after a connection retry. It is followed by the corresponding 'connect' event, but only after all possible channel locks have finished their attempts to be re-acquired.

Parameters:

Name Type Description
retries number number of retries made before re-connect succeeded

Returns: void


unlisten

unlisten(channels: string[]): void

The 'unlisten' event occurs each time one or more channels stop being listened to.

Parameters:

Name Type Description
channels string[] list of channels that stopped being listened to

Returns: void
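
The difference between 'connect', 'end' and 'close' can be made concrete with a small state tracker. This is a sketch under assumptions: `trackState` and `ConnectionState` are hypothetical names, and the relative order of 'end' and 'close' on shutdown is not stated on this page, so the tracker tolerates either order.

```typescript
import { EventEmitter } from 'events';

type ConnectionState = 'idle' | 'connected' | 'disconnected' | 'closed';

// Tracks connection state from the events documented above.
// Accepts any EventEmitter, so a PgPubSub instance should fit as well.
function trackState(pubSub: EventEmitter): () => ConnectionState {
    let state: ConnectionState = 'idle';
    pubSub.on('connect', () => { state = 'connected'; });
    // 'end' may fire repeatedly while reconnecting; ignore it once closed.
    pubSub.on('end', () => {
        if (state !== 'closed') {
            state = 'disconnected';
        }
    });
    // 'close' means a deliberate shutdown with no further reconnects.
    pubSub.on('close', () => { state = 'closed'; });
    return () => state;
}
```

The returned getter could back a health check: 'disconnected' is expected to be transient, while 'closed' is final.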

Properties

channels

channels: PgChannelEmitter = new PgChannelEmitter()


logger

logger: AnyLogger

logger


options

options: PgPubSubOptions


pgClient

pgClient: PgClient


Static defaultMaxListeners

defaultMaxListeners: number

Inherited from EventEmitter

Methods

activeChannels

activeChannels(): string[]

Returns the list of all actively subscribed channels.

Returns: string[]


addListener

addListener(event: string | symbol, listener: function): this

Inherited from EventEmitter

Parameters:

event: string | symbol

listener: function

▸ (...args: any[]): void

Parameters:

Name Type
...args any[]

Returns: this


allChannels

allChannels(): string[]

Returns the list of all known channels, regardless of whether they are being listened to (active) or not (inactive).

Returns: string[]


close

close(): Promise‹void›

Safely closes this database connection

Returns: Promise‹void›


connect

connect(): Promise‹void›

Establishes a re-connectable database connection.

Returns: Promise‹void›


destroy

destroy(): Promise‹void›

Properly destroys this object: releases all locks, closes all connections, and removes all event listeners to avoid memory leaks. Use this method whenever you need to destroy the object programmatically. Note that after destroy() the object is no longer usable and should be dereferenced so that it can be garbage-collected.

Returns: Promise‹void›
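
Because a destroyed object must not be used again, it can help to guard against calling destroy() twice, for example when several shutdown signals arrive. A minimal sketch, where `Destroyable` and `createShutdown` are hypothetical names and only the documented `destroy()` method is assumed:

```typescript
// Hypothetical structural type covering only destroy().
interface Destroyable {
    destroy(): Promise<void>;
}

// Returns a shutdown function that calls destroy() at most once,
// no matter how often or how concurrently it is invoked.
function createShutdown(pubSub: Destroyable): () => Promise<void> {
    let pending: Promise<void> | undefined;
    return () => {
        if (!pending) {
            pending = pubSub.destroy();
        }
        return pending;
    };
}
```

The resulting function could be registered for process signals, for example `process.once('SIGTERM', shutdown)`.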


emit

emit(event: string | symbol, ...args: any[]): boolean

Inherited from EventEmitter

Parameters:

Name Type
event string | symbol
...args any[]

Returns: boolean


eventNames

eventNames(): Array‹string | symbol›

Inherited from EventEmitter

Returns: Array‹string | symbol›


getMaxListeners

getMaxListeners(): number

Inherited from EventEmitter

Returns: number


inactiveChannels

inactiveChannels(): string[]

Returns the list of all inactive channels (those that are known but not being listened to at the moment).

Returns: string[]


isActive

isActive(channel?: undefined | string): boolean

If the channel argument is passed, returns true if that channel is in the active state (listened to by this pub/sub), and false otherwise. If the channel is not specified, returns true if at least one channel is actively listened to by this pub/sub, and false otherwise.

Parameters:

Name Type
channel? undefined | string

Returns: boolean
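
The relationship between allChannels(), activeChannels(), inactiveChannels() and isActive() can be modeled in memory. This is a hypothetical illustration: `ChannelRegistry` and `setActive` are invented names, and the library's internal bookkeeping may differ.

```typescript
// Simplified in-memory model of the documented channel queries.
class ChannelRegistry {
    // channel name -> whether it is currently being listened to
    private readonly known = new Map<string, boolean>();

    public setActive(channel: string, active: boolean): void {
        this.known.set(channel, active);
    }

    public allChannels(): string[] {
        return Array.from(this.known.keys());
    }

    public activeChannels(): string[] {
        return this.allChannels().filter(name => this.known.get(name) === true);
    }

    public inactiveChannels(): string[] {
        return this.allChannels().filter(name => this.known.get(name) !== true);
    }

    // With a channel: is that channel active? Without: is any channel active?
    public isActive(channel?: string): boolean {
        return channel === undefined
            ? this.activeChannels().length > 0
            : this.known.get(channel) === true;
    }
}
```

In this model every known channel is either active or inactive, so allChannels() is the union of the other two lists.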


listen

listen(channel: string): Promise‹void›

Starts listening to the given channel. If the singleListener option is set to true, it guarantees that only one process can listen to this channel at a time.

Parameters:

Name Type Description
channel string channel name to listen

Returns: Promise‹void›


listenerCount

listenerCount(type: string | symbol): number

Inherited from EventEmitter

Parameters:

Name Type
type string | symbol

Returns: number


listeners

listeners(event: string | symbol): Function[]

Inherited from EventEmitter

Parameters:

Name Type
event string | symbol

Returns: Function[]


notify

notify(channel: string, payload: AnyJson): Promise‹void›

Performs NOTIFY on the given channel, delivering the given payload to all listening subscribers.

Parameters:

Name Type Description
channel string channel to publish to
payload AnyJson payload to publish for subscribers

Returns: Promise‹void›
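
In PostgreSQL's default configuration a NOTIFY payload must be shorter than 8000 bytes, so it may be worth checking the size before publishing. A minimal sketch follows; `fitsInNotify` is a hypothetical helper. It assumes the payload travels as plain JSON text, which this page does not confirm, so any extra encoding applied by the library would reduce the usable size.

```typescript
// PostgreSQL's default limit: a NOTIFY payload must be shorter than 8000 bytes.
const MAX_NOTIFY_BYTES = 8000;

// Returns true if the payload, serialized as JSON (an assumption about the
// wire format), is small enough for a single NOTIFY.
function fitsInNotify(payload: unknown): boolean {
    const serialized = JSON.stringify(payload);
    return serialized !== undefined
        && Buffer.byteLength(serialized, 'utf8') < MAX_NOTIFY_BYTES;
}
```

The size is measured in bytes, not characters, so multi-byte UTF-8 text reaches the limit sooner. For larger messages a common approach is to store the data in a table and publish only its identifier.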


off

off(event: string | symbol, listener: function): this

Inherited from EventEmitter

Parameters:

event: string | symbol

listener: function

▸ (...args: any[]): void

Parameters:

Name Type
...args any[]

Returns: this


on

on(event: "end", listener: end): this

Overrides EventEmitter.on

Sets 'end' event handler

Parameters:

Name Type
event "end"
listener end

Returns: this

on(event: "connect", listener: connect): this

Overrides EventEmitter.on

Sets 'connect' event handler

Parameters:

Name Type
event "connect"
listener connect

Returns: this

on(event: "close", listener: close): this

Overrides EventEmitter.on

Sets 'close' event handler

Parameters:

Name Type
event "close"
listener close

Returns: this

on(event: "listen", listener: listen): this

Overrides EventEmitter.on

Sets 'listen' event handler

Parameters:

Name Type
event "listen"
listener listen

Returns: this

on(event: "unlisten", listener: unlisten): this

Overrides EventEmitter.on

Sets 'unlisten' event handler

Parameters:

Name Type
event "unlisten"
listener unlisten

Returns: this

on(event: "error", listener: error): this

Overrides EventEmitter.on

Sets 'error' event handler

Parameters:

Name Type
event "error"
listener error

Returns: this

on(event: "reconnect", listener: reconnect): this

Overrides EventEmitter.on

Sets 'reconnect' event handler

Parameters:

Name Type
event "reconnect"
listener reconnect

Returns: this

on(event: "message", listener: message): this

Overrides EventEmitter.on

Sets 'message' event handler

Parameters:

Name Type
event "message"
listener message

Returns: this

on(event: "notify", listener: notify): this

Overrides EventEmitter.on

Sets 'notify' event handler

Parameters:

Name Type
event "notify"
listener notify

Returns: this

on(event: string | symbol, listener: function): this

Overrides EventEmitter.on

Sets any unknown or user-defined event handler

Parameters:

event: string | symbol

event name

listener: function

event handler

▸ (...args: any[]): void

Parameters:

Name Type
...args any[]

Returns: this


once

once(event: "end", listener: end): this

Overrides EventEmitter.once

Sets an 'end' event handler that fires only once

Parameters:

Name Type
event "end"
listener end

Returns: this

once(event: "connect", listener: connect): this

Overrides EventEmitter.once

Sets a 'connect' event handler that fires only once

Parameters:

Name Type
event "connect"
listener connect

Returns: this

once(event: "close", listener: close): this

Overrides EventEmitter.once

Sets a 'close' event handler that fires only once

Parameters:

Name Type
event "close"
listener close

Returns: this

once(event: "listen", listener: listen): this

Overrides EventEmitter.once

Sets a 'listen' event handler that fires only once

Parameters:

Name Type
event "listen"
listener listen

Returns: this

once(event: "unlisten", listener: unlisten): this

Overrides EventEmitter.once

Sets an 'unlisten' event handler that fires only once

Parameters:

Name Type
event "unlisten"
listener unlisten

Returns: this

once(event: "error", listener: error): this

Overrides EventEmitter.once

Sets an 'error' event handler that fires only once

Parameters:

Name Type
event "error"
listener error

Returns: this

once(event: "reconnect", listener: reconnect): this

Overrides EventEmitter.once

Sets a 'reconnect' event handler that fires only once

Parameters:

Name Type
event "reconnect"
listener reconnect

Returns: this

once(event: "message", listener: message): this

Overrides EventEmitter.once

Sets a 'message' event handler that fires only once

Parameters:

Name Type
event "message"
listener message

Returns: this

once(event: "notify", listener: notify): this

Overrides EventEmitter.once

Sets a 'notify' event handler that fires only once

Parameters:

Name Type
event "notify"
listener notify

Returns: this

once(event: string | symbol, listener: function): this

Overrides EventEmitter.once

Sets a handler for any unknown or user-defined event that fires only once

Parameters:

event: string | symbol

event name

listener: function

event handler

▸ (...args: any[]): void

Parameters:

Name Type
...args any[]

Returns: this


prependListener

prependListener(event: string | symbol, listener: function): this

Inherited from EventEmitter

Parameters:

event: string | symbol

listener: function

▸ (...args: any[]): void

Parameters:

Name Type
...args any[]

Returns: this


prependOnceListener

prependOnceListener(event: string | symbol, listener: function): this

Inherited from EventEmitter

Parameters:

event: string | symbol

listener: function

▸ (...args: any[]): void

Parameters:

Name Type
...args any[]

Returns: this


rawListeners

rawListeners(event: string | symbol): Function[]

Inherited from EventEmitter

Parameters:

Name Type
event string | symbol

Returns: Function[]


removeAllListeners

removeAllListeners(event?: string | symbol): this

Inherited from EventEmitter

Parameters:

Name Type
event? string | symbol

Returns: this


removeListener

removeListener(event: string | symbol, listener: function): this

Inherited from EventEmitter

Parameters:

event: string | symbol

listener: function

▸ (...args: any[]): void

Parameters:

Name Type
...args any[]

Returns: this


setMaxListeners

setMaxListeners(n: number): this

Inherited from EventEmitter

Parameters:

Name Type
n number

Returns: this


unlisten

unlisten(channel: string): Promise‹void›

Stops listening to the given channel and, if the singleListener option is set to true, releases the acquired lock (if one was acquired).

Parameters:

Name Type Description
channel string channel name to unlisten

Returns: Promise‹void›


unlistenAll

unlistenAll(): Promise‹void›

Stops listening to all connected channels and, if the singleListener option is set to true, releases all acquired locks (if any).

Returns: Promise‹void›


Static listenerCount

listenerCount(emitter: EventEmitter, event: string | symbol): number

Inherited from EventEmitter

deprecated since v4.0.0

Parameters:

Name Type
emitter EventEmitter
event string | symbol

Returns: number
