-
Notifications
You must be signed in to change notification settings - Fork 9
PgPubSub
Globals / PgPubSub
Implements LISTEN/NOTIFY client for PostgreSQL connections.
It is a basic public interface of this library, so the end-user is going to work with this class directly to solve his/her tasks.
Importing:
import { AnyJson, PgPubSub } from '@imqueue/pg-pubsub';
Instantiation:
const pubSub = new PgPubSub(options)
see
PgPubSubOptions
Connecting and listening:
pubSub.on('connect', async () => {
await pubSub.listen('ChannelOne');
await pubSub.listen('ChannelTwo');
});
// or, even better:
pubSub.on('connect', async () => {
await Promise.all(
['ChannelOne', 'ChannelTwo'].map(channel => channel.listen()),
);
});
// or. less reliable:
await pubSub.connect();
await Promise.all(
['ChannelOne', 'ChannelTwo'].map(channel => channel.listen()),
);
Handle messages:
pubSub.on('message', (channel: string, payload: AnyJson) =>
console.log(channel, payload);
);
// or, using channels
pubSub.channels.on('ChannelOne', (payload: AnyJson) =>
console.log(1, payload),
);
pubSub.channels.on('ChannelTwo', (payload: AnyJson) =>
console.log(2, payload),
);
Destroying:
await pubSub.destroy();
Closing and re-using connection:
await pubSub.close();
await pubSub.connect();
This close/connect technique may be used when doing some heavy message handling, so while you close, another running copy may handle next messages...
-
EventEmitter
↳ PgPubSub
- activeChannels
- addListener
- allChannels
- close
- connect
- destroy
- emit
- eventNames
- getMaxListeners
- inactiveChannels
- isActive
- listen
- listenerCount
- listeners
- notify
- off
- on
- once
- prependListener
- prependOnceListener
- rawListeners
- removeAllListeners
- removeListener
- setMaxListeners
- unlisten
- unlistenAll
- listenerCount
+ new PgPubSub(options
: Partial<PgPubSubOptions>, logger?
: AnyLogger): PgPubSub
Overrides PgChannelEmitter.constructor
Name | Type | Default value | Description |
---|---|---|---|
options |
Partial<PgPubSubOptions> | - | options |
logger |
AnyLogger | console | logger |
Returns: PgPubSub
• close(): void
'close'
event, occurs each time connection closed. Differs from 'end'
event, because 'end'
event may occur many times during re-connectable
connection process, but 'close'
event states that connection was
safely programmatically closed and further re-connections won't happen.
asmemberof
PgPubSub
Returns: void
• connect(): void
'connect'
event, occurs each time database connection is established.
asmemberof
PgPubSub
Returns: void
• end(): void
'end'
event, occurs whenever pg connection ends, so, literally it's simply
proxy to 'end'
event from pg.Client
asmemberof
PgPubSub
Returns: void
• error(err
: Error): void
'error'
event occurs each time connection error is happened
asmemberof
PgPubSub
Name | Type | Description |
---|---|---|
err |
Error | error occurred during connection |
Returns: void
• listen(channels
: string[]): void
'listen'
event occurs each time channel starts being listening
asmemberof
PgPubSub
Name | Type | Description |
---|---|---|
channels |
string[] | list of channels being started listening |
Returns: void
• message(chan
: string, payload
: AnyJson): void
'message'
event occurs each time database connection gets notification
to any listening channel. Fired before channel event emitted.
asmemberof
PgPubSub
Name | Type | Description |
---|---|---|
chan |
string | channel to which notification corresponding to |
payload |
AnyJson | notification message payload |
Returns: void
• notify(chan
: string, payload
: AnyJson): void
'notify'
event occurs each time new message has been published to a
particular channel. Occurs right after database NOTIFY command succeeded.
asmemberof
PgPubSub
Name | Type | Description |
---|---|---|
chan |
string | channel to which notification was sent |
payload |
AnyJson | notification message payload |
Returns: void
• reconnect(retries
: number): void
'reconnect'
event occurs each time, when the connection is successfully
established after connection retry. It is followed by a corresponding
'connect'
event, but after all possible channel locks finished their
attempts to be re-acquired.
asmemberof
PgPubSub
Name | Type | Description |
---|---|---|
retries |
number | number of retries made before re-connect succeeded |
Returns: void
• unlisten(channels
: string[]): void
'unlisten'
event occurs each time channel ends being listening
asmemberof
PgPubSub
Name | Type | Description |
---|---|---|
channels |
string[] | list of channels being stopped listening |
Returns: void
• Readonly
channels: PgChannelEmitter = new PgChannelEmitter()
• Readonly
logger: AnyLogger
logger
• Readonly
options: PgPubSubOptions
• Readonly
pgClient: PgClient
▪ Static
defaultMaxListeners: number
Inherited from PgChannelEmitter.defaultMaxListeners
▪ Static
Readonly
errorMonitor: unique symbol
Inherited from PgChannelEmitter.errorMonitor
This symbol shall be used to install a listener for only monitoring 'error'
events. Listeners installed using this symbol are called before the regular
'error'
listeners are called.
Installing a listener using this symbol does not change the behavior once an
'error'
event is emitted, therefore the process will still crash if no
regular 'error'
listener is installed.
▸ activeChannels(): string[]
Returns list of all active subscribed channels
Returns: string[]
▸ addListener(event
: string | symbol, listener
: (...args: any[]) => void): this
Inherited from PgClient.addListener
Name | Type |
---|---|
event |
string | symbol |
listener |
(...args: any[]) => void |
Returns: this
▸ allChannels(): string[]
Returns list of all known channels, despite the fact they are listening (active) or not (inactive).
Returns: string[]
▸ close(): Promise<void>
Safely closes this database connection
Returns: Promise<void>
▸ connect(): Promise<void>
Establishes re-connectable database connection
Returns: Promise<void>
▸ destroy(): Promise<void>
Destroys this object properly, destroying all locks, closing all connections and removing all event listeners to avoid memory leaking. So whenever you need to destroy an object programmatically - use this method. Note, that after destroy it is broken and should be removed from memory.
Returns: Promise<void>
▸ emit(event
: string | symbol, ...args
: any[]): boolean
Name | Type |
---|---|
event |
string | symbol |
...args |
any[] |
Returns: boolean
▸ eventNames(): Array<string | symbol>
Inherited from PgClient.eventNames
Returns: Array<string | symbol>
▸ getMaxListeners(): number
Inherited from PgClient.getMaxListeners
Returns: number
▸ inactiveChannels(): string[]
Returns list of all inactive channels (those which are known, but not actively listening at a time)
Returns: string[]
▸ isActive(channel?
: undefined | string): boolean
If channel argument passed will return true if channel is in active state (listening by this pub/sub), false - otherwise. If channel is not specified - will return true if there is at least one active channel listened by this pub/sub, false - otherwise.
Name | Type |
---|---|
channel? |
undefined | string |
Returns: boolean
▸ listen(channel
: string): Promise<void>
Starts listening given channel. If singleListener option is set to true, it guarantees that only one process would be able to listen this channel at a time.
Name | Type | Description |
---|---|---|
channel |
string | channel name to listen |
Returns: Promise<void>
▸ listenerCount(event
: string | symbol): number
Inherited from PgClient.listenerCount
Name | Type |
---|---|
event |
string | symbol |
Returns: number
▸ listeners(event
: string | symbol): Function[]
Inherited from PgClient.listeners
Name | Type |
---|---|
event |
string | symbol |
Returns: Function[]
▸ notify(channel
: string, payload
: AnyJson): Promise<void>
Performs NOTIFY to a given channel with a given payload to all listening subscribers
Name | Type | Description |
---|---|---|
channel |
string | channel to publish to |
payload |
AnyJson | payload to publish for subscribers |
Returns: Promise<void>
▸ off(event
: string | symbol, listener
: (...args: any[]) => void): this
Name | Type |
---|---|
event |
string | symbol |
listener |
(...args: any[]) => void |
Returns: this
▸ on(event
: "end", listener
: typeof end): this
Overrides void
Sets 'end'
event handler
Name | Type |
---|---|
event |
"end" |
listener |
typeof end |
Returns: this
▸ on(event
: "connect", listener
: typeof connect): this
Overrides void
Sets 'connect'
event handler
Name | Type |
---|---|
event |
"connect" |
listener |
typeof connect |
Returns: this
▸ on(event
: "close", listener
: typeof close): this
Overrides void
Sets 'close'
event handler
Name | Type |
---|---|
event |
"close" |
listener |
typeof close |
Returns: this
▸ on(event
: "listen", listener
: typeof listen): this
Overrides void
Sets 'listen'
event handler
Name | Type |
---|---|
event |
"listen" |
listener |
typeof listen |
Returns: this
▸ on(event
: "unlisten", listener
: typeof unlisten): this
Overrides void
Sets 'unlisten'
event handler
Name | Type |
---|---|
event |
"unlisten" |
listener |
typeof unlisten |
Returns: this
▸ on(event
: "error", listener
: typeof error): this
Overrides void
Sets 'error'
event handler
Name | Type |
---|---|
event |
"error" |
listener |
typeof error |
Returns: this
▸ on(event
: "reconnect", listener
: typeof reconnect): this
Overrides void
Sets 'reconnect'
event handler
Name | Type |
---|---|
event |
"reconnect" |
listener |
typeof reconnect |
Returns: this
▸ on(event
: "message", listener
: typeof message): this
Overrides void
Sets 'message'
event handler
Name | Type |
---|---|
event |
"message" |
listener |
typeof message |
Returns: this
▸ on(event
: "notify", listener
: typeof notify): this
Overrides void
Sets 'notify'
event handler
Name | Type |
---|---|
event |
"notify" |
listener |
typeof notify |
Returns: this
▸ on(event
: string | symbol, listener
: (...args: any[]) => void): this
Overrides void
Sets any unknown or user-defined event handler
Name | Type | Description |
---|---|---|
event |
string | symbol | event name |
listener |
(...args: any[]) => void | event handler |
Returns: this
▸ once(event
: "end", listener
: typeof end): this
Sets 'end'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"end" |
listener |
typeof end |
Returns: this
▸ once(event
: "connect", listener
: typeof connect): this
Sets 'connect'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"connect" |
listener |
typeof connect |
Returns: this
▸ once(event
: "close", listener
: typeof close): this
Sets 'close'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"close" |
listener |
typeof close |
Returns: this
▸ once(event
: "listen", listener
: typeof listen): this
Sets 'listen'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"listen" |
listener |
typeof listen |
Returns: this
▸ once(event
: "unlisten", listener
: typeof unlisten): this
Sets 'unlisten'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"unlisten" |
listener |
typeof unlisten |
Returns: this
▸ once(event
: "error", listener
: typeof error): this
Sets 'error'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"error" |
listener |
typeof error |
Returns: this
▸ once(event
: "reconnect", listener
: typeof reconnect): this
Sets 'reconnect'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"reconnect" |
listener |
typeof reconnect |
Returns: this
▸ once(event
: "message", listener
: typeof message): this
Sets 'message'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"message" |
listener |
typeof message |
Returns: this
▸ once(event
: "notify", listener
: typeof notify): this
Sets 'notify'
event handler, which fired only one single time
Name | Type |
---|---|
event |
"notify" |
listener |
typeof notify |
Returns: this
▸ once(event
: string | symbol, listener
: (...args: any[]) => void): this
Sets any unknown or user-defined event handler, which would fire only one single time
Name | Type | Description |
---|---|---|
event |
string | symbol | event name |
listener |
(...args: any[]) => void | event handler |
Returns: this
▸ prependListener(event
: string | symbol, listener
: (...args: any[]) => void): this
Inherited from PgClient.prependListener
Name | Type |
---|---|
event |
string | symbol |
listener |
(...args: any[]) => void |
Returns: this
▸ prependOnceListener(event
: string | symbol, listener
: (...args: any[]) => void): this
Inherited from PgClient.prependOnceListener
Name | Type |
---|---|
event |
string | symbol |
listener |
(...args: any[]) => void |
Returns: this
▸ rawListeners(event
: string | symbol): Function[]
Inherited from PgClient.rawListeners
Name | Type |
---|---|
event |
string | symbol |
Returns: Function[]
▸ removeAllListeners(event?
: string | symbol): this
Inherited from PgClient.removeAllListeners
Name | Type |
---|---|
event? |
string | symbol |
Returns: this
▸ removeListener(event
: string | symbol, listener
: (...args: any[]) => void): this
Inherited from PgClient.removeListener
Name | Type |
---|---|
event |
string | symbol |
listener |
(...args: any[]) => void |
Returns: this
▸ setMaxListeners(n
: number): this
Inherited from PgClient.setMaxListeners
Name | Type |
---|---|
n |
number |
Returns: this
▸ unlisten(channel
: string): Promise<void>
Stops listening of the given channel, and, if singleListener option is set to true - will release an acquired lock (if it was settled).
Name | Type | Description |
---|---|---|
channel |
string | channel name to unlisten |
Returns: Promise<void>
▸ unlistenAll(): Promise<void>
Stops listening all connected channels, and, if singleListener option is set to true - will release all acquired locks (if any was settled).
Returns: Promise<void>
▸ Static
listenerCount(emitter
: EventEmitter, event
: string | symbol): number
Inherited from PgChannelEmitter.listenerCount
deprecated
since v4.0.0
Name | Type |
---|---|
emitter |
EventEmitter |
event |
string | symbol |
Returns: number