rrq

Getting started

This document assumes that you have a Redis server running. If not, see the bottom of the document for options for installing one on your own system. You can test whether your Redis server is behaving as expected by running:

redux::hiredis()$PING()
#> [Redis: PONG]

If you get an error like “Connection refused” then check your installation.

The package is designed to be easy to get started with, and has features that you might like to use later. If you run the “Hello world” section you probably have 90% of what you need; the rest of the document shows features that will help you bend that around your specific needs.

Hello world

Without any great explanation, here is the basic approach to using rrq to run a task on another R process, asynchronously. First, we create a “controller” object which you can use for queuing tasks.

library(rrq)
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)

This controller uses an “identifier” (here, id is rrq:b09033b1) which can be anything you want but acts like a folder within the Redis server, distinguishing your queue from any others hosted on the same server.

We will set this as the default controller, which means we can avoid passing controller = obj to all the calls below:

rrq_default_controller_set(obj)

Submit a task to the queue with rrq_task_create_expr(), which returns a key for that task:

t <- rrq_task_create_expr(1 + 1)
t
#> [1] "eb9fe0d8ba5765795cb1b3b07991960d"

We’ll also need some worker processes to carry out our tasks. Here, we’ll spawn two for now (see the section below on alternatives to this)

w <- rrq_worker_spawn(2)
#> ℹ Spawning 2 workers with prefix 'wooded_grouse'

Wait for the task to complete:

rrq_task_wait(t)
#> [1] TRUE

then retrieve the result

rrq_task_result(t)
#> [1] 2

Things to note here:

  • Every task is given a unique identifier, which can be used to query the task later (much more on this below)
  • The process with the controller is not blocked during this operation and could carry out any other calculation it wanted (it could even be closed and the controller recreated later)
  • We submitted the tasks before we had workers ready; we could have done this in the other order, and workers can be added and removed at will regardless of the state of the queue (see below)

Design

For years, the parallel package has provided users with the ability to run tasks in parallel with very little setup. Given a list of data x and some function fun, one can change from the serial code

lapply(x, fun)

to run in parallel given a cluster object cl

parallel::parLapply(cl, x, fun)

(the even simpler parallel::mclapply(x, fun) can be used on platforms other than Windows with reasonable success). Nice as this is, it suffers some drawbacks, most of which follow from the simple blocking interface:

  • The R session is blocked while the calculations run, even though the host session is doing (essentially) nothing
  • The number of workers (size of the cluster) is fixed at the point where parLapply has been called, and is restricted to a single node without considerable effort. One cannot add workers to the cluster while it runs, or remove unneeded ones as tasks finish.
  • It is not possible to retrieve partially completed results - we have to wait for all the tasks to complete before working with any
  • If tasks are of uneven size, the load balancing form (parallel::parLapplyLB) is quite slow
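To make the comparison concrete, here is a minimal sketch of the serial and cluster-based forms side by side. The input x, the function fun and the two-process local cluster are illustrative choices only and have nothing to do with rrq itself.

```r
library(parallel)

x <- 1:8
fun <- function(i) i^2

# Serial version
res_serial <- lapply(x, fun)

# Parallel version: the cluster size is chosen up front, and the call
# blocks this session until every element of x has been processed
cl <- makeCluster(2)
res_parallel <- parLapply(cl, x, fun)
stopCluster(cl)

# Both forms return the same list
identical(res_serial, res_parallel)
```

Nothing can be inspected until parLapply() returns, which is the blocking behaviour described in the list above.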

As such it is hard to build interfaces like queues or work through dependency graphs (though see heroic work in future and targets). Attempts at doing this run into the question of where to store the data and the queue so that multiple worker processes can safely interact with the queue without corrupting the database or hitting race conditions. Approaches like liteq may not work on network file systems, and therefore become limited to a single node.

At the other end of the scale, HPC systems with their schedulers can avoid all these issues, but with byzantine interfaces and slow per-task submission.

Notable features of rrq which motivate its development within this landscape:

  • Uses Redis as a task broker; this provides a database to hold tasks that allows very fast network based access to many processes
  • Uses a non-blocking design where control is returned to the controller process immediately while tasks run in the background
  • Supports a scalable worker pool; no workers are needed for tasks to be queued, and workers can be added or removed at will while tasks are running
  • Allows for multiple non-interacting queues, multiple priority levels within a queue, and automatically resolved dependencies among completing tasks
  • Supports a fully load-balanced design with overheads of around 1ms per task
  • Supports optionally running each task in a separate process, allowing for strong isolation between tasks as well as per-task timeouts, logging, and allowing cancellation
  • Allows rich querying of task status and progress and worker history
  • Exposes both low-level primitives for working with individual tasks as well as higher level interfaces that mimic functions like lapply

Running tasks

Running tasks is a little different to many R parallel backends, because we do not directly allocate tasks to our workers, but simply place them on a first-in-first-out task queue. A pool of workers will then poll for work from this queue. See vignette("design") for more on this.

Consider enqueuing this expression:

t <- rrq_task_create_expr({
  Sys.sleep(2)
  runif(1)
})

This has created a task that will sleep for 2 seconds then return a random number.

Initially the task has status RUNNING (it will be PENDING very briefly):

rrq_task_status(t)
#> [1] "RUNNING"

Then after a couple of seconds it will complete (we pad this out here so that it will complete even on slow systems)

Sys.sleep(3)
rrq_task_result(t)
#> [1] 0.7463973

The basic task lifecycle is this:

  • when created/submitted a task becomes PENDING
  • when it is picked up by a worker it becomes RUNNING
  • when it completes it will become COMPLETE or ERROR

In addition there are rarer ways a task can end (CANCELLED, DIED, TIMEOUT) or fail to start due to dependencies between tasks (DEFERRED or IMPOSSIBLE; see below).

A task in any terminal state (except IMPOSSIBLE, so COMPLETE, ERROR, CANCELLED, DIED or TIMEOUT) can be retried, at which point the status is MOVED and the task will “point” somewhere else (that task will move through the usual PENDING -> RUNNING -> (terminal state) flow). See vignette("fault-tolerance") for details.
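The status rules above can be written down as a pair of small helpers. This is only a sketch of the rules as stated in this section, not code from rrq; the names status_terminal, is_terminal and can_retry are hypothetical. MOVED is left out because it acts as a pointer to another task rather than as an end state.

```r
# Terminal statuses, as listed in the lifecycle description above
status_terminal <- c("COMPLETE", "ERROR", "CANCELLED", "DIED", "TIMEOUT",
                     "IMPOSSIBLE")

# TRUE for tasks that will not change status on their own
is_terminal <- function(status) status %in% status_terminal

# Any terminal status except IMPOSSIBLE can be retried
can_retry <- function(status) is_terminal(status) & status != "IMPOSSIBLE"

status <- c("PENDING", "RUNNING", "COMPLETE", "DIED", "IMPOSSIBLE", "DEFERRED")
data.frame(status,
           terminal = is_terminal(status),
           retryable = can_retry(status))
```

In practice the status vector would come from rrq_task_status() rather than being typed in by hand.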

Above, we slept for a few seconds in order for the task to become finished. However, this is an extremely common operation, so rrq provides a function rrq_task_wait() which will wait until a task finishes, then returns a logical indicating if the task succeeded or not, after which you can fetch the result with rrq_task_result().

t <- rrq_task_create_expr({
  Sys.sleep(2)
  runif(1)
})
rrq_task_wait(t)
#> [1] TRUE
rrq_task_result(t)
#> [1] 0.1510442

The polling interval here is 1 second by default, but if the task completes within that period it will still be returned as soon as it is complete (the interval is just the time between progress bar updates and the period where an interrupt would be caught to cancel the wait).

Configuring the worker environment

It is rare that we want our workers to run in completely empty R environments (no extra loaded packages, no custom functions available). Quite often you will want to run something to configure the workers before they accept tasks.

In order to do this, first define a function that will accept one argument which will be the environment that the worker will use, and then set that environment up.

For the most common case, where you have script files that contain function definitions and you have a set of packages to load, rrq has a helper function rrq_envir().

So, for example, suppose we want to source a file “myfuns.R” which contains some code

slowdouble <- function(x) {
  Sys.sleep(x)
  x * 2
}

We might write:

create <- rrq_envir(sources = "myfuns.R")

The next step is to register this function for your queue:

rrq_worker_envir_set(create)

By default, this will notify all running workers to update their environment. Note that if your function errors in any way, your workers will exit!

rrq_worker_log_tail(n = 4)
#>         worker_id child       time  command message
#> 1 wooded_grouse_1    NA 1731321146  MESSAGE REFRESH
#> 2 wooded_grouse_2    NA 1731321146  MESSAGE REFRESH
#> 3 wooded_grouse_1    NA 1731321146    ENVIR     new
#> 4 wooded_grouse_2    NA 1731321146    ENVIR     new
#> 5 wooded_grouse_2    NA 1731321146    ENVIR  create
#> 6 wooded_grouse_1    NA 1731321146    ENVIR  create
#> 7 wooded_grouse_2    NA 1731321146 RESPONSE REFRESH
#> 8 wooded_grouse_1    NA 1731321146 RESPONSE REFRESH

Now our workers have picked up our functions we can start using them:

t <- rrq_task_create_expr(slowdouble(1))
rrq_task_wait(t)
#> [1] TRUE

If you need more control you can write your own function. We could have written create as

create <- function(env) {
  sys.source("myfuns.R", env)
}

This approach would also allow you to do something like read an rds or RData file containing a large object that you want every worker to have a copy of.
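As a sketch of that idea, the function below sources a script and also loads a saved object into the worker environment. Everything here is illustrative: make_create, lookup and the temporary files are hypothetical names, and the function is exercised on a plain local environment rather than on a worker. For real workers, the paths would need to be ones that every worker can read.

```r
# Stand-ins for "myfuns.R" and a large saved object
path_funs <- tempfile(fileext = ".R")
writeLines(c("slowdouble <- function(x) {",
             "  Sys.sleep(x)",
             "  x * 2",
             "}"),
           path_funs)
path_data <- tempfile(fileext = ".rds")
saveRDS(runif(10), path_data)

# Build the one-argument setup function; the paths are captured in the
# closure so they do not need to exist as global variables later
make_create <- function(path_funs, path_data) {
  force(path_funs)
  force(path_data)
  function(env) {
    sys.source(path_funs, envir = env)  # function definitions
    env$lookup <- readRDS(path_data)    # shared data object
    invisible(env)
  }
}

create <- make_create(path_funs, path_data)

# Try it on a local environment before registering it with a queue
env <- new.env()
create(env)
ls(env)
```

Running the function locally first is a cheap way to catch errors, given that a failing setup function will cause workers to exit.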

Scheduling options

The rrq package does not aspire to be a fully fledged scheduler, but sometimes a little more control than first-in-first-out is required. There are a few options available that allow the user to control how tasks are run when needed. These involve:

  • blocking tasks so that they run only after other tasks are completed (e.g., a set up task followed by a series of computational tasks, followed by an aggregation task)
  • multiple queues with different priorities, or that different workers subscribe to (e.g., fast/slow high-priority/low-priority tasks, or tasks that require a limited resource)
  • putting tasks at the front of the queue, allowing a last-in-first-out queue

Tasks that depend on other tasks

We support a simple system for allowing tasks to depend on other tasks. An example of this might be where you need to download a file, then run a series of analyses on it. Or where you want to run an analysis over a set of parameters, and then aggregate once they’re all done. How the output of one task feeds into the others is up to you, but practically this will require one of the following options:

  • write your results to disk and read them back
  • store results in a database (e.g., Redis!) at the end of one task, read them in the next
  • connect to the queue from your task itself

When queueing a task, you can provide a vector of task identifiers as the depends_on argument. These identifiers must all be known to rrq and the task will not be started until all these prerequisites are completed. The task lifecycle will look different to the above; rather than starting as PENDING the task begins as DEFERRED.

Once all prerequisites are complete, a task becomes possible and it moves from DEFERRED to PENDING. It will be placed at the front of the queue.

If a prerequisite task fails for any reason (an error, is cancelled, or its worker dies) then the task will become IMPOSSIBLE.

For example, suppose that we have code:

create <- function(n) {
  saveRDS(runif(n), "numbers.rds")
}

use <- function(i) {
  d <- readRDS("numbers.rds")
  d[[i]]
}

Here we have some function create that we want to run first, doing some setup, then another function use that we want to run after which will read the result of running create and do some analysis on it.

Create an rrq_controller object and tell workers to read the deps.R file which contains these function definitions

obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
rrq_worker_envir_set(rrq_envir(sources = "deps.R"))
source("deps.R")

We can then enqueue our first task:

id <- rrq_task_create_expr(create(5))

Then use this id

id_use <- rrq_task_create_expr(use(2), depends_on = id)

The status of the first task will be PENDING, per usual:

rrq_task_status(id)
#> [1] "PENDING"

however, the second task will be DEFERRED because it is not yet in the queue:

rrq_task_status(id_use)
#> [1] "DEFERRED"
rrq_queue_list()
#> [1] "881ff4c71aa89111f1d293939ef815fc"

Once the first task is processed by a worker, the status changes:

rrq_task_status(id)
#> [1] "COMPLETE"
rrq_task_status(id_use)
#> [1] "PENDING"
rrq_queue_list()
#> [1] "5a34c6970257dbcb86299b6517f70222"

At this point the second task will proceed through the queue as usual.

Points to note here:

  • The deferred tasks cannot access the queued result of the blocking task (see above)
  • The deferred tasks will be added to the front of the queue after they become undeferred
  • The deferred tasks can’t be queued until the blocking tasks have returned an id
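The way a dependent task's status follows from its prerequisites can be summarised in a few lines. This is a model of the rules described in this section, not rrq's implementation, and dependent_status is a hypothetical name. Treating TIMEOUT and IMPOSSIBLE prerequisites as failures is an assumption, since the text above names only errors, cancellation and worker death.

```r
# Model: status of a task given the statuses of its prerequisites
dependent_status <- function(prerequisites) {
  failed <- c("ERROR", "CANCELLED", "DIED",  # named in the text above
              "TIMEOUT", "IMPOSSIBLE")       # assumed to behave the same way
  if (any(prerequisites %in% failed)) {
    "IMPOSSIBLE"   # can never run
  } else if (all(prerequisites == "COMPLETE")) {
    "PENDING"      # released onto the queue
  } else {
    "DEFERRED"     # still waiting
  }
}

dependent_status(c("COMPLETE", "RUNNING"))   # "DEFERRED"
dependent_status(c("COMPLETE", "COMPLETE"))  # "PENDING"
dependent_status(c("COMPLETE", "ERROR"))     # "IMPOSSIBLE"
```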

Multiple queues

Sometimes it is useful to have different workers listen on different queues. For example, you may have workers on different machines with different capabilities (e.g., a machine with a GPU or high memory). You may have tasks that are expected to take quite a long time but want some workers to monitor a fast queue with short lived tasks.

Every worker listens to the default queue, but when starting a worker, you can add additional queues and control the priority order of these queues for that worker. When submitting tasks you then specify the queue that the task sits in.

The easiest way to configure this is to save a worker configuration:

id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
rrq_worker_config_save(
  "short",
  rrq_worker_config(queue = "short"))
rrq_worker_config_save(
  "all",
  rrq_worker_config(queue = c("short", "long")))
rrq_worker_config_list()
#> [1] "all"       "short"     "localhost"

Above, we create two configurations: “short”, which just listens on the short queue, and “all”, which listens on both the short and long queues (note that both these workers will also listen on the default queue).

w_short <- rrq_worker_spawn(name_config = "short")
#> ℹ Spawning 1 worker with prefix 'neurotic_pharaohhound'
w_all <- rrq_worker_spawn(name_config = "all")
#> ℹ Spawning 1 worker with prefix 'hypodermal_bedlingtonterrier'

We can then submit two long tasks to the “long” queue:

id_long1 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long")
id_long2 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long")

After the workers have had a chance to pick up work, our “short” worker is still available:

rrq_worker_status()
#>        neurotic_pharaohhound_1 hypodermal_bedlingtonterrier_1
#>                         "IDLE"                         "BUSY"

So we can submit tasks to this short queue and have them processed

id <- rrq_task_create_expr(runif(1), queue = "short")
rrq_task_wait(id, timeout = 10)
#> [1] TRUE

Note that there is no validation to check that any worker is listening on any queue when you submit a task. Indeed there can’t be as new workers can be added at any time (so at the point of submission perhaps there were no workers).

Running tasks in separate processes

Running a task in a separate process offers some additional features at a cost of a little more overhead per task.

The cost is that we have to launch an additional process for every task run. We use callr for this to smooth over a number of rough edges, but this does impose a minimum overhead of about 0.1s per task, plus the cost of loading any packages that your task might need (if you use packages that make heavy use of things like S4 classes this can easily extend to a few seconds).

The additional features that it provides are:

  • Per-task isolation: because every task runs in a separate process it works in a fresh environment and is isolated from all other tasks
  • Cancellable tasks: you can stop a running task and the worker will gracefully pick up additional work
  • Per-task timeout: you can specify the maximum running time of any task and if the task exceeds it, it will be killed
  • Per-task logs, via rrq_task_log()

The sorts of tasks that benefit from this sort of approach are typically long-running (expected running times in the 10s of seconds or more) so that the overhead is low, but also the features of cancellation and timeouts become more useful. We have also seen this used to good effect where the task may leak memory, or cache results aggressively - over time this would cause the worker process to consume more memory until the worker process was killed by the operating system.

To use a separate process, add separate_process = TRUE to calls to rrq_task_create_expr(). This will then enable the argument timeout to have an effect, as well as rrq_task_cancel().

Coping with memory use

The data for each task, and the task result itself, is saved in Redis. This is alongside the typically much smaller metadata required to run rrq. Because Redis is an in-memory database, this means that some things will not be a great idea; for example, sending off 1000 tasks that will each write back 100 MB of simulation output would try to write 100 GB of data into the Redis database, which may cause issues for your server!

To allow for this workflow, rrq supports configuring its object store (rrq::object_store) so that objects above a certain size are written out elsewhere. Currently, the only “elsewhere” supported is to disk with the assumption that the controller and all workers share a filesystem. The approach used is safe for multiple concurrent processes, including over network mounted filesystems.

This feature can be enabled by passing an offload_path to both the controller and the worker. Additionally, the offload_threshold_size should be set in the worker configuration. In the setup below, both the controller and the worker use the same offload path, but if they were running on different machines we may need to use different paths pointing at the same underlying network filesystem.

id <- paste0("rrq:", ids::random_id(bytes = 4))
path <- tempfile()
obj <- rrq_controller(id, offload_path = path)

rrq_default_controller_set(obj)
rrq_worker_config_save(
    "localhost",
    rrq_worker_config(offload_threshold_size = 1000))

w <- rrq_worker_spawn(1, offload_path = path)
#> ℹ Spawning 1 worker with prefix 'resurrective_xuanhuasaurus'

It’s not hard at all to get to 1KB of data; we can do that by simulating a big pile of random numbers:

t <- rrq_task_create_expr(runif(200000))

Once the task has finished, data will be stored on disk below the path given above:

dir(path)
#> [1] "ce165f8c71a0a26e7a79cd423677a9e9"

This keeps the larger objects out of the database.
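To get a feel for which results would cross the threshold, you can look at the size of an object once serialised. The sketch below assumes that the threshold is compared against the serialised size in bytes; the variable names are illustrative.

```r
threshold <- 1000  # bytes, matching offload_threshold_size above

# Serialised size of a tiny result and of the result from the task above
size_small <- length(serialize(1 + 1, NULL))
size_large <- length(serialize(runif(200000), NULL))

# Only the large result exceeds the threshold
c(small = size_small, large = size_large) > threshold

# The arithmetic from the start of this section:
# 1000 results of 100 MB each, expressed in GB
1000 * 100 / 1000
```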

Orchestrating workers

This vignette uses the very basic rrq_worker_spawn() method to create workers on your local machine. This is intended primarily for development only, though it may be useful in some situations. There are other options available, depending on how you want to use rrq.

Use (and limitations) of spawn

The simplest way of getting started with rrq is to use rrq_worker_spawn(), as above. This approach has several nice features; it uses callr, so no extra work is required to make the worker R session behave like the controller session (it will find your environment variables, library, and working directory), and it behaves the same way on all platforms (compare below). However, the workers will disappear when the controlling session completes (this is either a good or a bad thing) and you will be limited to a single node.

Start workers on another node, perhaps using a scheduler

There are two issues here; one is the technical details of launching your rrq workers on the cluster, and the other is the details around whether your HPC admins would like you to (and the security implications of doing so).

If you are using rrq with an HPC system, then you will want to schedule workers onto the system. The details here will change

The basic approach is to write out a launcher script somewhere:

rrq_worker_script(dest)

This can be called from the command line:

$ ./rrq_worker --help
Usage:
  rrq_worker [options] <id>

Options:
--config=NAME    Name of a worker configuration [default: localhost]
--name=NAME      Name of the worker (optional)

This is a bash script that can then be called from whatever cluster job scheduler you use. The important things to pass through are:

  • id the only positional argument, which is the queue id
  • --config=NAME allows controlling of the named worker config (set via rrq_worker_config_save(), and allowing changing of timeout, verbosity and queues)

In addition, you may need to change the configuration type. If you need to control redis access you should set the REDIS_URL environment variable to point at your Redis server.

Use docker

We provide a docker image that you can use (mrcide/rrq), though typically you would want to extend this image to include your own packages. Alternatively, create your own docker image; see the main dockerfile (but replace COPY . /src with an installation of rrq) and the image that sets the entrypoint to call rrq_worker.

We use rrq to orchestrate workers in web applications where a number of workers carry out long running calculations for an HTTP API written using plumber and porcelain.

Waiting for workers to appear

If you have submitted workers via a task scheduler, you might want to block and wait for them to become available. You can do this using the rrq_worker_wait() function.

We first create a vector of names for the new workers, and then tell rrq that we’re going to produce these workers.

obj <- rrq_controller(id)
rrq_default_controller_set(obj)
worker_ids <- c("cluster_1", "cluster_2")

How you then start the workers is up to you; you might start them at the command line with

rrq_worker --worker-id cluster_1 rrq:ebbd6b4e
rrq_worker --worker-id cluster_2 rrq:ebbd6b4e

(in separate terminals). Or you might queue these jobs with a cluster scheduler such as Slurm or PBS (with appropriate care over the working directory). Either way, you can then immediately write in your R console:

rrq_worker_wait(worker_ids, controller = obj)

and your session will block and wait for the workers to appear, erroring if they do not appear in time.

Worker heartbeat

If you use many workers, particularly on different machines, you may not notice if some disappear. Possible causes of this include:

  • Your task crashes a worker (e.g., you run some C++ code that segfaults)
  • Your worker is killed by the operating system (e.g., the system is running low on RAM and your process is targeted by the out-of-memory killer)
  • The machine reboots or shuts down
  • The redis server goes down temporarily, causing the worker to exit when writing back results

By default, if this happens when your worker is running a task, that task status will forever be stuck in RUNNING.

rrq provides a simple heartbeat process, if requested, to detect when a worker has disappeared. To do this, we run a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a dead man’s switch - see rrq::rrq_heartbeat for details.
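The dead man's switch idea can be modelled without Redis at all. The sketch below is not how rrq implements its heartbeat; it uses a simulated clock and a closure in place of an expiring key. The name make_switch is hypothetical, and the expiry of three times the period is an assumption chosen for illustration.

```r
# A switch that counts as alive only if it was refreshed recently enough
make_switch <- function(expire) {
  last_beat <- NA_real_
  list(
    beat = function(now) last_beat <<- now,
    alive = function(now) !is.na(last_beat) && (now - last_beat) < expire
  )
}

period <- 2
heartbeat <- make_switch(expire = 3 * period)

# The worker refreshes the switch every `period` seconds, and is then
# killed just after its last refresh at time 10
for (now in seq(0, 10, by = period)) {
  heartbeat$beat(now)
}

heartbeat$alive(11)  # TRUE: 1 second since the last beat
heartbeat$alive(17)  # FALSE: 7 seconds since the last beat, beyond expiry
```

The expiry has to be longer than the period, otherwise a healthy worker would appear dead between refreshes.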

To enable the heartbeat, save a worker configuration with the heartbeat_period set to some number of seconds. Below we use 2 seconds so that this example runs reasonably quickly, but in practice something like 60 would put less load on your Redis server.

id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
rrq_worker_config_save(
  "localhost",
  rrq_worker_config(heartbeat_period = 2))

Then, launch a worker

w <- rrq_worker_spawn(1)
#> ℹ Spawning 1 worker with prefix 'metallographic_queenslandheeler'

Our worker will print information indicating that the heartbeat is enabled (use rrq_worker_process_log())

#> [2024-11-11 10:32:31.32047] HEARTBEAT rrq:3e0e4382:worker:metallographic_queenslandheeler_1:heartbeat
#> [2024-11-11 10:32:31.573283] HEARTBEAT OK
#> [2024-11-11 10:32:31.591422] ALIVE
#> [2024-11-11 10:32:31.591815] ENVIR new
#> [2024-11-11 10:32:31.592267] QUEUE default
#>                                  __
#>                 ______________ _/ /
#>       ______   / ___/ ___/ __ `/ /_____
#>      /_____/  / /  / /  / /_/ /_/_____/
#>  ______      /_/  /_/   \__, (_)   ______
#> /_____/                   /_/     /_____/
#>     worker:        metallographic_queenslandheeler_1
#>     config:        localhost
#>     rrq_version:   0.7.20
#>     platform:      x86_64-pc-linux-gnu (64-bit)
#>     running:       Ubuntu 24.04 LTS
#>     hostname:      wpia-dide242
#>     username:      pl2113
#>     queue:         rrq:3e0e4382:queue:default
#>     wd:            /home/pl2113/Work/rrq/vignettes_src
#>     pid:           3166948
#>     redis_host:    127.0.0.1
#>     redis_port:    6379
#>     heartbeat_key: rrq:3e0e4382:worker:metallographic_queenslandheeler_1:heartbeat
#>     offload_path:  <not set>

We also have a heartbeat key here that we can inspect:

info <- rrq_worker_info()[[1]]
obj$con$EXISTS(info$heartbeat_key)
#> [1] 1
obj$con$PTTL(info$heartbeat_key) # in milliseconds
#> [1] 5882

We queue some slow job onto the worker:

t <- rrq_task_create_expr({
  Sys.sleep(10)
  runif(1)
})

Then we kill the worker:

tools::pskill(rrq_worker_info()[[1]]$pid)

Of course, immediately our key still exists:

obj$con$EXISTS(info$heartbeat_key)
#> [1] 1

but eventually it will expire:

Sys.sleep(6)
obj$con$EXISTS(info$heartbeat_key)
#> [1] 0

So far as rrq is concerned, at this point your task is still running:

rrq_task_status(t)
#> [1] "RUNNING"

Handling this situation is still completely manual. You can detect lost workers, and the tasks they were running, with:

rrq_worker_detect_exited()
#> Lost 1 worker:
#>   - metallographic_queenslandheeler_1
#> Orphaning 1 task:
#>   - 8702ce8905ea72422bdc28024b1816cd

this will also “orphan” the task

rrq_task_status(t)
#> [1] "DIED"

Any tasks that were dependent on this task will now be marked as IMPOSSIBLE.

In a future version we will support automatic re-queuing of jobs assigned to disappeared workers.

Getting a Redis server

There are several options to get started with Redis; the best one will likely depend on your platform and needs.

Use docker

(Linux, macOS with docker desktop, Windows with docker desktop)

This is how we develop rrq because it’s easy to destroy and recreate the redis instance. Start the docker redis container like:

docker run --name redis --rm -d -p 127.0.0.1:6379:6379 redis

This will listen on port 6379, which is the Redis default. You can stop the container (deleting all data) with docker stop redis.

Install Redis

On Linux this is fairly straightforward, either by downloading and building the source code or by installing via apt or snap.

On macOS the source will compile, or you can install a redis server via homebrew.

On Windows you can install redis via WSL. There have also been various ports.

Use Redis on a different machine

If you have redis running on a different machine (this will be the case if you’re using redis to distribute tasks over a number of different machines) you will need to tell rrq and redux where to find it. The simplest way is to set the environment variable REDIS_HOST to the name of the machine if it is running with default ports, or set REDIS_URL if you need more control. Alternatively, when connecting to the server above, you can manually construct your redux::hiredis object and pass in any configuration option you need; see the documentation for redux::redis_config for details.
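A minimal sketch of building such a configuration by hand is shown below. The host name redis.example.org and port 6380 are placeholders, and the block only constructs the configuration; it does not try to connect. The commented lines show how the configuration might then be used, under the assumption that rrq_controller() accepts a connection through a con argument.

```r
if (requireNamespace("redux", quietly = TRUE)) {
  # Describe the remote server; nothing is contacted at this point
  cfg <- redux::redis_config(url = "redis://redis.example.org:6380")
  print(cfg$host)
  print(cfg$port)

  # With a reachable server you could then connect and build a controller
  # (assumption: rrq_controller() takes the connection as `con`):
  # con <- redux::hiredis(cfg)
  # obj <- rrq::rrq_controller("rrq:myqueue", con = con)
}
```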