2.0.9
Creates a new QlobberPG
object for publishing and subscribing to a
PostgreSQL queue.
Extends EventEmitter
(Object)
Configures the PostgreSQL queue.
Name | Description |
---|---|
options.name String
|
Unique identifier for this
QlobberPG
instance. Every instance connected to the queue at the same time
must have a different name.
|
options.db Object
|
node-postgres configuration
used for communicating with PostgreSQL.
|
options.single_ttl Integer
(default 1h )
|
Default time-to-live (in milliseconds) for messages which should be read by at most one subscriber. This value is added to the current time and the resulting expiry time is put into the message's database row. After the expiry time, the message is ignored and deleted when convenient. |
options.multi_ttl Integer
(default 1m )
|
Default time-to-live (in milliseconds) for messages which can be read by many subscribers. This value is added to the current time and the resulting expiry time is put into the message's database row. After the expiry time, the message is ignored and deleted when convenient. |
options.expire_interval Integer
(default 10s )
|
Number of milliseconds between deleting expired messages from the database. |
options.poll_interval Integer
(default 1s )
|
Number of milliseconds between checking the database for new messages. |
options.notify Boolean
(default true )
|
Whether to use a database trigger
to watch for new messages. Note that this will be done in addition to
polling the database every
poll_interval
milliseconds.
|
options.message_concurrency Integer
(default 1 )
|
The number of messages to process at once. |
options.handler_concurrency Integer
(default 0 )
|
By default (0), a message
is considered handled by a subscriber only when all its data has been
read. If you set
handler_concurrency
to non-zero, a message is
considered handled as soon as a subscriber receives it. The next message
will then be processed straight away. The value of
handler_concurrency
limits the number of messages being handled by subscribers at any one
time.
|
options.order_by_expiry Boolean
(default false )
|
Pass messages to subscribers in order of their expiry time. |
options.dedup Boolean
(default true )
|
Whether to ensure each handler function is called at most once when a message is received. |
options.single Boolean
(default true )
|
Whether to process messages meant
for
at most
one subscriber (across all
QlobberPG
instances), i.e.
work queues.
|
options.separator String
(default '.' )
|
The character to use for separating words in message topics. |
options.wildcard_one String
(default '*' )
|
The character to use for matching exactly one word in a message topic to a subscriber. |
options.wildcard_some String
(default '#' )
|
The character to use for matching zero or more words in a message topic to a subscriber. |
options.filter (Function | Array<Function>)?
|
Function called before each message is processed.
|
options.batch_size Integer
(default 100 )
|
Passed to https://github.com/brianc/node-pg-query-stream[`node-pg-query-stream`] and specifies how many messages to retrieve from the database at a time (using a cursor). |
Check the database for new messages now rather than waiting for the next periodic check to occur.
Same as QlobberPG#refresh_now.
Stop checking for new messages.
(Function?)
Optional function to call once access to the
database has stopped. Alternatively, you can listen for the
QlobberPG#stop
event.
Same as QlobberPG#stop.
(any)
Subscribe to messages in the PostgreSQL queue.
(String)
Which messages you're interested in receiving.
Message topics are split into words using
.
as the separator. You can
use
*
to match exactly one word in a topic or
#
to match zero or more
words. For example,
foo.*
would match
foo.bar
whereas
foo.#
would
match
foo
,
foo.bar
and
foo.bar.wup
. Note you can change the
separator and wildcard characters by specifying the
separator
,
wildcard_one
and
wildcard_some
options when
constructing
QlobberPG
objects. See the
qlobber
documentation
for more information. Valid characters in
topic
are:
A-Za-z0-9_*#.
(Function)
Function to call when a new message is
received on the PostgreSQL queue and its topic matches against
topic
.
handler
will be passed the following arguments:
data
(Readable
| Buffer
)
Message payload as a Readable stream or a Buffer.
By default you'll receive a Buffer. If handler
has a property
accept_stream
set to a truthy value then you'll receive a stream.
Note that all subscribers will receive the same stream or content for
each message. You should take this into account when reading from the
stream. The stream can be piped into multiple
Writable
streams but bear in mind it will go at the rate of the slowest one.info
(Object
) Metadata for the message, with the following
properties:
topic
(String
) Topic the message was published with.expires
(Integer
) When the message expires (number of
milliseconds after January 1970 00:00:00 UTC).single
(Boolean
) Whether this message is being given to at
most one subscriber (across all QlobberPG
instances).size
(Integer
) Message size in bytes.publisher
(String
) Name of the QlobberPG
instance which
published the message.done
(Function
) Function to call one you've handled the
message. Note that calling this function is only mandatory if
info.single === true
, in order to delete and unlock the message
row in the database table. done
takes two arguments:
err
(Object
) If an error occurred then pass details of the
error, otherwise pass null
or undefined
.finish
(Function
) Optional function to call once the message
has been deleted and unlocked, in the case of
info.single === true
, or straight away otherwise. It will be
passed the following argument:
err
(Object
) If an error occurred then details of the
error, otherwise null
.(Object?)
Optional settings for this subscription.
Name | Description |
---|---|
options.subscribe_to_existing Boolean
(default false )
|
If
true
then
handler
will be called with any existing, unexpired messages that
match
topic
, as well as new ones. If
false
(the default) then
handler
will be called with new messages only.
|
(Function?)
Optional function to call once the subscription
has been registered. This will be passed the following argument:
err
(Object
) If an error occurred then details of the error,
otherwise null
.Unsubscribe from messages in the PostgreSQL queue.
(String?)
Which messages you're no longer interested in
receiving via the
handler
function. This should be a topic you've
previously passed to
QlobberPG#subscribe
. If
topic
is
undefined
then all handlers for all topics are unsubscribed.
(Function?)
The function you no longer want to be
called with messages published to the topic
topic
. This should be a
function you've previously passed to
QlobberPG#subscribe
.
If you subscribed
handler
to a different topic then it will still
be called for messages which match that topic. If
handler
is
undefined
, all handlers for the topic
topic
are unsubscribed.
(Function?)
Optional function to call once
handler
has
been unsubscribed from
topic
. This will be passed the following
argument:
err
(Object
) If an error occurred then details of the error,
otherwise null
.Publish a message to the PostgreSQL queue.
(String)
Message topic. The topic should be a series of
words separated by
.
(or the
separator
character you passed to
the
constructor
). Valid characters in
topic
are:
A-Za-z0-9_.
(Object)
Optional settings for this publication.
Name | Description |
---|---|
options.single Boolean
(default false )
|
If
true
then the message
will be given to
at most
one interested subscriber, across all
QlobberPG
instances querying the PostgreSQL queue. Otherwise all
interested subscribers will receive the message (the default).
|
options.ttl Integer?
|
Time-to-live (in milliseconds) for this
message. If you don't specify anything then
single_ttl
or
multi_ttl
(provided to the
constructor
) will be
used, depending on the value of
single
. After the time-to-live
for the message has passed, the message is ignored and deleted when
convenient.
|
options.encoding String
(default "utf8" )
|
If
payload
is a string,
the encoding to use when writing to the database.
|
(Function?)
Optional function to call once the message has
been written to the database. It will be passed the following
arguments:
err
(Object
) If an error occurred then details of the error,
otherwise null
.info
(Object
) Metadata for the message. See
QlobberPG#subscribe for a description of info
's properties.(Stream | undefined)
:
A
Writable
if no
payload
was passed, otherwise
undefined
.
Error event. Emitted if an unrecoverable error occurs. QlobberPG may stop querying the database for messages.
Type: Object
Start event. Emitted when messages can be published and subscribed to.
Stop event. Emitted after QlobberPG#stop has been called and access to the database has stopped.