Title: | Lightweight Portable Message Queue Using 'SQLite' |
Version: | 1.1.0 |
Author: | Gábor Csárdi |
Maintainer: | Gábor Csárdi <csardi.gabor@gmail.com> |
Description: | Temporary and permanent message queues for R. Built on top of 'SQLite' databases. 'SQLite' provides locking, and makes it possible to detect crashed consumers. Crashed jobs can be automatically marked as "failed", or put in the queue again, potentially a limited number of times. |
License: | MIT + file LICENSE |
LazyData: | true |
URL: | https://github.com/r-lib/liteq#readme |
BugReports: | https://github.com/r-lib/liteq/issues |
RoxygenNote: | 6.1.1 |
Imports: | assertthat, DBI, rappdirs, RSQLite |
Suggests: | callr, covr, processx, testthat, withr |
Encoding: | UTF-8 |
NeedsCompilation: | no |
Packaged: | 2019-03-08 10:41:18 UTC; gaborcsardi |
Repository: | CRAN |
Date/Publication: | 2019-03-08 13:40:10 UTC |
Lightweight Portable Message Queue Using 'SQLite'
Description
Message queues for R. Built on top of 'SQLite' databases.
Concurrency
liteq works with multiple producer and/or consumer processes accessing
the same queue, via the locking mechanism of 'SQLite'. If a queue is
locked by 'SQLite', the process that tries to access it, must wait until
it is unlocked. The maximum amount of waiting time is by default 10
seconds, and it can be changed via the R_LITEQ_BUSY_TIMEOUT
environment variable, in milliseconds. If you have many concurrent
processes using the same liteq database, and see database locked
errors, then you can try to increase the timeout value.
Examples
# We don't run this, because it writes to the cache directory db <- tempfile() q <- ensure_queue("jobs", db = db) q list_queues(db) # Publish two messages publish(q, title = "First message", message = "Hello world!") publish(q, title = "Second message", message = "Hello again!") is_empty(q) message_count(q) list_messages(q) # Consume one msg <- try_consume(q) msg ack(msg) list_messages(q) msg2 <- try_consume(q) nack(msg2) list_messages(q) # No more messages is_empty(q) try_consume(q)
Examples
## See the manual page
Acknowledge that the work on a message has finished successfully
Description
Acknowledge that the work on a message has finished successfully
Usage
ack(message)
Arguments
message |
The message object. |
See Also
liteq for examples
Other liteq messages: consume
,
is_empty
,
list_failed_messages
,
list_messages
, message_count
,
publish
,
remove_failed_messages
,
requeue_failed_messages
,
try_consume
Consume a message from a queue
Description
Blocks and waits for a message if there isn't one to work on currently.
Usage
consume(queue, poll_interval = 500)
Arguments
queue |
The queue object. |
poll_interval |
Poll interval in milliseconds. How often to poll the queue for new jobs, if none are immediately available. |
Value
A message.
See Also
liteq for examples
Other liteq messages: ack
,
is_empty
,
list_failed_messages
,
list_messages
, message_count
,
publish
,
remove_failed_messages
,
requeue_failed_messages
,
try_consume
Create a queue in a database
Description
It also creates the database, if it does not exist.
Usage
create_queue(name = NULL, db = default_db(), crash_strategy = "fail")
Arguments
name |
Name of the queue. If not specified or |
db |
Path to the database file. |
crash_strategy |
What to do with crashed jobs. The default is that
they will |
See Also
liteq for examples
Other liteq queues: delete_queue
,
ensure_queue
, list_queues
Positive or negative ackowledgement
Description
If positive, then we need to remove the message from the queue.
If negative, we just set the status to FAILED
.
Usage
db_ack(db, queue, id, lock, success)
Arguments
db |
DB file. |
queue |
Queue name. |
id |
Message id. |
lock |
Name of the message lock file. |
success |
Whether this is a positive or negative ACK. |
Consume a message from a message queue
Description
This is the blocking version of try_consume()
. Currently it just
polls twice a second, and sleeps between the polls. Each poll will also
trigger a crash cleanup, if there are workers running.
Usage
db_consume(db, queue, poll_interval = 500)
Arguments
queue |
The queue object. |
Create a queue
Description
The database columns:
id Id of the message, it is generated automatically by the database.
title The title of the message, can be empty. In the future, it can be used to filter messages.
message The message, arbitrary text, can be empty.
status Can be:
-
READY
, ready to be consumed -
WORKING
, it is being consumed -
FAILED
, failed.
-
requeued How many times the message was requeued.
Usage
db_create_queue(name, db, crash_strategy)
Arguments
name |
Name of the queue. If not specified or |
db |
Path to the database file. |
crash_strategy |
What to do with crashed jobs. The default is that
they will |
Try to consume a message from the queue
Description
If there is a message that it READY
, it returns that. Otherwise it
checks for crashed workers.
Usage
db_try_consume(db, queue, crashed = TRUE, con = NULL)
Arguments
db |
DB file name. |
queue |
Name of the queue. |
Details of the implementation
The database must be locked for the whole operation, including checking on or creating the lock databases.
If there is a
READY
message, that one is taken.Otherwise if there are
WORKING
messages, then we check them one by one. This might take a lot of time, and the DB must be locked for the whole search, so it is not ideal. But I don't have a better solution right now.
Taking a message means
Updating its row.status to
WORKING
.Creating another database that serves as the lock for this message.
The name of the default database
Description
If the queue database is not specified explicitly,
then liteq
uses this file. Its location is determined via the
rappdirs
package, see rappdirs::user_data_dir()
.
Usage
default_db()
Value
A characater scalar, the name of the default database.
Delete a queue
Description
Delete a queue
Usage
delete_queue(queue, force = FALSE)
Arguments
queue |
The queue to delete. |
force |
Whether to delete the queue even if it contains messages. |
See Also
liteq for examples
Other liteq queues: create_queue
,
ensure_queue
, list_queues
Ensure that the DB exists and has the right columns
Description
We try a query, and if it fails then we try to create the DB.
Usage
ensure_db(db)
Arguments
db |
DB file. |
Make sure that a queue exists
Description
If it does not exist, then the queue will be created.
Usage
ensure_queue(name, db = default_db(), crash_strategy = "fail")
Arguments
name |
Name of the queue. If not specified or |
db |
Path to the database file. |
crash_strategy |
What to do with crashed jobs. The default is that
they will |
Value
The queue object.
See Also
liteq for examples
Other liteq queues: create_queue
,
delete_queue
, list_queues
Check if a queue is empty
Description
Check if a queue is empty
Usage
is_empty(queue)
Arguments
queue |
The queue object. |
Value
Logical, whether the queue is empty.
See Also
liteq for examples
Other liteq messages: ack
,
consume
,
list_failed_messages
,
list_messages
, message_count
,
publish
,
remove_failed_messages
,
requeue_failed_messages
,
try_consume
List failed messages in a queue
Description
List failed messages in a queue
Usage
list_failed_messages(queue)
Arguments
queue |
The queue object. |
Value
Data frame with columns: id
, title
, status
.
See Also
liteq for examples
Other liteq messages: ack
,
consume
, is_empty
,
list_messages
, message_count
,
publish
,
remove_failed_messages
,
requeue_failed_messages
,
try_consume
List all messages in a queue
Description
List all messages in a queue
Usage
list_messages(queue)
Arguments
queue |
The queue object. |
Value
Data frame with columns: id
, title
, status
.
See Also
liteq for examples
Other liteq messages: ack
,
consume
, is_empty
,
list_failed_messages
,
message_count
, publish
,
remove_failed_messages
,
requeue_failed_messages
,
try_consume
List all queues in a database
Description
List all queues in a database
Usage
list_queues(db = default_db())
Arguments
db |
The queue database to query. |
Value
A list of liteq_queue
objects.
See Also
liteq for examples
Other liteq queues: create_queue
,
delete_queue
, ensure_queue
Make a message object
Description
It creates the lock for the message as well.
Usage
make_message(id, title, message, db, queue, lockdir)
Arguments
id |
Message id, integer, auto-generated. |
title |
Title of message. |
message |
The message itself. |
db |
Main DB file. |
queue |
Name of the queue. |
lockdir |
Directory to create the message lock in. |
Details
The message object contains the connection to the message lock. If the worker crashes, then there will be no reference to the connection, and the lock will be released. This is how we detect crashed workers.
Value
message object
Get the number of messages in a queue.
Description
Get the number of messages in a queue.
Usage
message_count(queue)
Arguments
queue |
The queue object. |
Value
Number of messages in the queue.
See Also
liteq for examples
Other liteq messages: ack
,
consume
, is_empty
,
list_failed_messages
,
list_messages
, publish
,
remove_failed_messages
,
requeue_failed_messages
,
try_consume
Report that the work on a message has failed
Description
Report that the work on a message has failed
Usage
nack(message)
Arguments
message |
The message object. |
See Also
liteq for examples
Publish a message in a queue
Description
Publish a message in a queue
Usage
publish(queue, title = "", message = "")
Arguments
queue |
The queue object. |
title |
The title of the message. It can be the empty string. |
message |
The body of the message. It can be the empty string. |
See Also
liteq for examples
Other liteq messages: ack
,
consume
, is_empty
,
list_failed_messages
,
list_messages
, message_count
,
remove_failed_messages
,
requeue_failed_messages
,
try_consume
Remove failed messages from the queue
Description
Remove failed messages from the queue
Usage
remove_failed_messages(queue, id = NULL)
Arguments
queue |
The queue object. |
id |
Ids of the messages to requeue. If it is |
See Also
liteq for examples
Other liteq messages: ack
,
consume
, is_empty
,
list_failed_messages
,
list_messages
, message_count
,
publish
,
requeue_failed_messages
,
try_consume
Requeue failed messages
Description
Requeue failed messages
Usage
requeue_failed_messages(queue, id = NULL)
Arguments
queue |
The queue object. |
id |
Ids of the messages to requeue. If it is |
See Also
liteq for examples
Other liteq messages: ack
,
consume
, is_empty
,
list_failed_messages
,
list_messages
, message_count
,
publish
,
remove_failed_messages
,
try_consume
Consume a message if there is one available
Description
Consume a message if there is one available
Usage
try_consume(queue)
Arguments
queue |
The queue object. |
Value
A message, or NULL
if there is not message to work on.
See Also
liteq for examples
Other liteq messages: ack
,
consume
, is_empty
,
list_failed_messages
,
list_messages
, message_count
,
publish
,
remove_failed_messages
,
requeue_failed_messages