--- title: "rrq" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{rrq} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ## Getting started This document assumes that that you have a Redis server running. If not, see the bottom of the document for options you will have for installing this on your own system. You can test if your Redis server is behaving as expected by running ```r redux::hiredis()$PING() #> [Redis: PONG] ``` If you get an error like "Connection refused" then check your installation. The package is designed to be easy to get started with, and has features that you might like to use later. If you run the "Hello world" section you probably have 90% of what you need - the result of the document will show features that will help you bend that around your specific needs. ## Hello world Without any great explanation, here is the basic approach to using rrq to run a task on another R process, asynchronously. First, we create a "controller" object which you can use for queuing tasks. ```r library(rrq) id <- paste0("rrq:", ids::random_id(bytes = 4)) obj <- rrq_controller(id) ``` This controller uses an "identifier" (here, `id` is `rrq:babc4f67`) which can be anything you want but acts like a folder within the Redis server, distinguishing your queue from any others hosted on the same server. We will set this a default controller to use, which means we can avoid passing in `controller = obj` to all the calls below: ```r rrq_default_controller_set(obj) ``` Submit a task to the queue with `rrq_task_create_expr()`, returning a key for hat task ```r t <- rrq_task_create_expr(1 + 1) t #> [1] "5937a54f2c87e51b4e69e4a6024b2653" ``` We'll also need some worker processes to carry out our tasks. Here, we'll spawn two for now (see the section below on alternatives to this) ```r w <- rrq_worker_spawn(2) #> ℹ Spawning 2 workers with prefix 'pokeable_auklet' ``` +Wait for the task to complete: ```r rrq_task_wait(t) #> [1] TRUE ``` then retrieve the result ```r rrq_task_result(t) #> [1] 2 ``` Things to note here: * Every task is given a unique identifier, which can be used to query the task later (much more on this below) * The process with the controller is not blocked during this operation and could carry out any other calculation it wanted (it could even be closed and the controller recreated later) * We submitted the tasks before we had workers ready; we could have done this in the other order, and workers can be added and removed at will regardless of the state of the queue (see below) ## Design For years, the `parallel` package has provided users with the ability to run tasks in parallel with very little setup. Given a list of data `x` and some function `f`, one can change from the serial code ```r lapply(x, fun) ``` to run in parallel given a cluster object `cl` ```r parallel::parLapply(cl, x, fun) ``` (the even simpler `parallel::mclapply(x, fun)` can be used on platforms other than Windows with reasonable success). Nice as this is it suffers some drawbacks, most of which follow from the simple blocking interface: * The R session is blocked while the calculations run, even though the host session is doing (essentially) nothing * The number of workers (size of the cluster) is fixed at the point where `parLapply` has been called, and is restricted to a single node without considerable effort. One cannot add workers to the cluster while it runs, or remove unneeded ones as tasks finish. * It is not possible to retrieve partially completed results - we have to wait for all the tasks to complete before working with any * If tasks are of uneven size, the load balancing form (`parallel::parLapplyLB`) is quite slow As such it is hard to build interfaces like queues or work through dependency graphs (though see heroic work in [future](https://future.futureverse.org/) and [targets](https://docs.ropensci.org/targets/)). Attempts at doing this run into issues of where do you store the data and the queue in such a way that you can safely have multiple worker processes interacting with the queue without corrupting the database or hitting race conditions. Approaches like [liteq](https://cran.r-project.org/package=liteq) may not work on network file systems, and therefore become limited to a single node. At the other end of the scale, HPC systems with their schedulers can avoid all these issues, but with byzantine interfaces and slow per-task submission. Notable features of `rrq` which motivate its development within this landscape: * Uses [Redis](https://redis.io) as a task broker; this provides a database to hold tasks that allows very fast network based access to many processes * Uses a non-blocking design where control is returned to the controller process as immediately while tasks run in the background * Supports a scalable worker pool; no workers are needed for tasks to be queued, and workers can be added or removed at will while tasks are running * Allows for multiple non-interacting queues, multiple priority levels within a queue, and automatically resolved dependencies among completing tasks * Supports a fully load-balanced design with overheads of around 1ms per task * Supports optionally running each task in a separate process, allowing for strong isolation between tasks as well as per-task timeouts, logging, and allowing cancellation * Allows rich querying of task status and progress and worker history * Exposes both low-level primitives for working with individual tasks as well as higher level interfaces that mimic functions like `lapply` ## Running tasks Running tasks is a little different to many R-parallel backends, because we do not directly allocate tasks to our workers, but simply place it on first-in-first-out task queue. A pool of workers will then poll for work from this queue. See `vignette("design")` for more on this. Consider enqueing this expression ```r t <- rrq_task_create_expr({ Sys.sleep(2) runif(1) }) ``` This has created a task that will sleep for 2 seconds then return a random number Initially the task has status `RUNNING` (it will be `PENDING` *very* briefly): ```r rrq_task_status(t) #> [1] "RUNNING" ``` Then after a couple of seconds it will complete (we pad this out here so that it will complete even on slow systems) ```r Sys.sleep(3) rrq_task_result(t) #> [1] 0.2921383 ``` The basic task lifecycle is this: * when created/submitted a task becomes `PENDING` * when it is picked up by a worker it becomes `RUNNING` * when it completes it will become `COMPLETE` or `ERROR` In addition there are rarer ways a task can end (`CANCELLED`, `DIED`, `TIMEOUT`) or fail to start due to dependencies between tasks (`DEFERRED` or `IMPOSSIBLE`; see below). A task in any terminal state (except `IMPOSSIBLE`, so `COMPLETE`, `ERROR`, `CANCELLED`, `DIED` or `TIMEOUT`) can be retried, at which point the status is `MOVED` and the task will "point" somewhere else (that task will move through the usual `PENDING -> RUNNING -> (terminal state)` flow). See `vignette("fault-tolerance")` for details. Above, we slept for a few seconds in order for the task to become finished. However, this is an extremely common operation, so `rrq` provides a function `rrq_task_wait()` which will wait until a task finishes, then returns a logical indicating if the task succeeded or not, after which you can fetch the result with `rrq_task_result()`. ```r t <- rrq_task_create_expr({ Sys.sleep(2) runif(1) }) rrq_task_wait(t) #> [1] TRUE rrq_task_result(t) #> [1] 0.6660437 ``` The polling interval here is 1 second by default, but if the task completes within that period it will still be returned as soon as it is complete (the interval is just the time between progress bar updates and the period where an interrupt would be caught to cancel the wait). ## Configuring the worker environment It is rare that we want our workers to run in completely empty R environments (no extra loaded packages, no custom functions available). Quite often you will want to run something to configure the workers before they accept tasks. In order to do this, first define a function that will accept one argument which will be the environment that the worker will use, and then set that environment up. For the most common case, where you have script files that contain function definitions and you have a set of packages to load, rrq has a helper function `rrq_envir()`. So, for example, suppose we want to source a file "myfuns.R" which contains some code ```r slowdouble <- function(x) { Sys.sleep(x) x * 2 } ``` We might write: ```r create <- rrq_envir(sources = "myfuns.R") ``` The next step is to register this function for your queue: ```r rrq_worker_envir_set(create) ``` By default, this will notify all running workers to update their environment. Note that if your function errors in any way, your workers will exit! ```r rrq_worker_log_tail(n = 4) #> worker_id child time command message #> 1 pokeable_auklet_1 NA 1723025925 MESSAGE REFRESH #> 2 pokeable_auklet_2 NA 1723025925 MESSAGE REFRESH #> 3 pokeable_auklet_1 NA 1723025925 ENVIR new #> 4 pokeable_auklet_2 NA 1723025925 ENVIR new #> 5 pokeable_auklet_1 NA 1723025925 ENVIR create #> 6 pokeable_auklet_2 NA 1723025925 ENVIR create #> 7 pokeable_auklet_1 NA 1723025925 RESPONSE REFRESH #> 8 pokeable_auklet_2 NA 1723025925 RESPONSE REFRESH ``` Now our workers have picked up our functions we can start using them: ```r t <- rrq_task_create_expr(slowdouble(1)) rrq_task_wait(t) #> [1] TRUE ``` If you need more control you can write your own function. We could have written `create` as ```r create <- function(env) { sys.source("myfuns.R", env) } ``` This approach would also allow you do do something like read an rds or Rdata file containing a large object that you want every worker to have a copy of. ## Scheduling options The `rrq` package does not aspire to be a fully fledged scheduler, but sometimes a little more control than first-in-first-out is required. There are a few options available that allow the user to control how tasks are run when needed. These involve: * blocking tasks so that they run only after other tasks are completed (e.g., a set up task followed by a series of computational tasks, followed by an aggregation task) * multiple queues with different priorities, or that different workers subscribe to (e.g., fast/slow high-priority/low-priority tasks, or tasks that require a limited resource) * put tasks in the front of the queue, allowing a last-in-first-out queue ### Tasks that depend on other tasks We support a simple system for allowing tasks to depend on other tasks. An example of this might be where you need to download a file, then run a series of analyses on it. Or where you want to run an analysis over a set of parameters, and then aggregate once they're all done. How the output of one task feeds into the others is up to you, but practically this will require one of the following options: * write your results to disk and read them back * store results in a database (e.g., Redis!) at the end of one task, read them in the next * connect to the queue from your task itself When queueing a task, you can provide a vector of task identifiers as the `depends_on` argument. These identifiers must all be known to `rrq` and the task will not be started until all these prerequisites are completed. The task lifecycle will look different to the above; rather than starting as `PENDING` the task begins as `DEFERRED`. Once all prerequisites are complete, a task becomes possible and it moves from `DEFERRED` to `PENDING`. It will be placed at the *front* of the queue. If a prerequisite task fails for any reason (an error, is cancelled, or its worker dies) then the task will become `IMPOSSIBLE`. For example, suppose that we have code: ```r create <- function(n) { saveRDS(runif(n), "numbers.rds") } use <- function(i) { d <- readRDS("numbers.rds") d[[i]] } ``` Here we have some function `create` that we want to run first, doing some setup, then another function `use` that we want to run after which will read the result of running `create` and do some analysis on it. Create an `rrq_controller` object and tell workers to read the `deps.R` file which contains these function definitions ```r obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4))) rrq_default_controller_set(obj) rrq_worker_envir_set(rrq_envir(sources = "deps.R")) source("deps.R") ``` We can then enqueue our first task: ```r id <- rrq_task_create_expr(create(5)) ``` Then use this `id` ```r id_use <- rrq_task_create_expr(use(2), depends_on = id) ``` The status of the first task will be `PENDING`, per usual: ```r rrq_task_status(id) #> [1] "PENDING" ``` however, the second task will be `DEFERRED` because it is not yet in the queue: ```r rrq_task_status(id_use) #> [1] "DEFERRED" rrq_queue_list() #> [1] "e8f2f868119f74eb2fced249e886aed4" ``` Once the first task is processed by a worker, the status changes: ```r rrq_task_status(id) #> [1] "COMPLETE" rrq_task_status(id_use) #> [1] "PENDING" rrq_queue_list() #> [1] "4c44b980e6dd43b9cb0a3130ff781fd6" ``` At this point the second task will proceed through the queue as usual. Points to note here: * The deferred tasks cannot access the queued result of the blocking task (see above) * The deferred tasks will be added to the *front* of the queue after they become undeferred * The deferred tasks can't be queued until the blocking tasks have returned an id ### Multiple queues Sometimes it is useful to have different workers listen on different queues. For example, you may have workers on different machines with different capabilities (e.g., a machine with a GPU or high memory). You may have tasks that are expected to take quite a long time but want some workers to monitor a fast queue with short lived tasks. Every worker listens to the `default` queue, but when starting a worker, you can add additional queues and control the priority order of these queues for that worker. When submitting tasks you then specify the queue that the task sits in. The easiest way to configure this is to save a worker configuration: ```r id <- paste0("rrq:", ids::random_id(bytes = 4)) obj <- rrq_controller(id) rrq_default_controller_set(obj) rrq_worker_config_save( "short", rrq_worker_config(queue = "short")) rrq_worker_config_save( "all", rrq_worker_config(queue = c("short", "long"))) rrq_worker_config_list() #> [1] "short" "all" "localhost" ``` Above, we create two configurations: "short" which just listens on the queue `short`, and `all` which listens both on the short and long task queues (note that both these workers will also listen on the default queue). ```r w_short <- rrq_worker_spawn(name_config = "short") #> ℹ Spawning 1 worker with prefix 'astronomical_izuthrush' w_all <- rrq_worker_spawn(name_config = "all") #> ℹ Spawning 1 worker with prefix 'damaging_platypus' ``` We can then submit a long task to the worker: ```r id_long1 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long") id_long2 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long") ``` After the workers have had the ability to pick up work, our "short" worker is still available: ```r rrq_worker_status() #> astronomical_izuthrush_1 damaging_platypus_1 #> "IDLE" "BUSY" ``` So we can submit tasks to this short queue and have them processed ```r id <- rrq_task_create_expr(runif(1), queue = "short") rrq_task_wait(id, timeout = 10) #> [1] TRUE ``` Note that there is no validation to check that any worker is listening on any queue when you submit a task. Indeed there can't be as new workers can be added at any time (so at the point of submission perhaps there were no workers). ## Running tasks in separate processes Running a task in a separate process offers some additional features at a cost of a little more overhead per task. The cost is that we have to launch an additional process for every task run. We use [`callr`](https://callr.r-lib.org/) for this to smooth over a number of rough edges, but this does impose a minimum overhead of about 0.1s per task, plus the cost of loading any packages that your task might need (if you use packages that make heavy use of things like S4 classes this can easily extend to a few seconds). The additional features that it provides are: * Per-task isolation: because every task runs in a separate process it works in a fresh environment and is isolated from all other tasks * Cancellable tasks: you can stop a running task and the worker will gracefully pick up additional work * Per-task timeout: you can specify the maximum running time of any task and if the task exceeds it, it will be killed * Per-task logs, via `rrq_task_log()` The sorts of tasks that benefit from this sort of approach are typically long-running (expected running times in the 10s of seconds or more) so that the overhead is low, but also the features of cancellation and timeouts become more useful. We have also seen this used usefully where the task may leak memory, or cache results aggressively - over time this would cause the worker process to consume more memory until the worker process was killed by the operating system. To use a separate process, add `separate_process = TRUE` to calls to `rrq_task_create_expr()`. This will then enable the argument `timeout` to have an effect, as well as `rrq_task_cancel()`. ## Coping with memory use The data for each task, and the task result itself, is saved in Redis. This is alongside the typically much smaller metadata required to run rrq. Because Redis is an [in memory](https://redis.io/topics/faq) database, this means that some things will not be a great idea; for example sending off 1000 tasks that will each write back 100 MB of simulation output would try and write 100 GB of data into the Redis database which may cause issues for your server! To allow for this workflow, `rrq` supports configuring its object store (`rrq::object_store`) so that objects above a certain size are written out elsewhere. Currently, the only "elsewhere" supported is to disk with the assumption that the controller and all workers share a filesystem. The approach used is safe for multiple concurrent processes, including over network mounted filesystems. To configure this you must use `rrq_configure()` *before attaching either a controller or a worker to the queue*. The configuration interface here will change in future, but we will maintain backward compatibility with the current options. To configure storage so that every object that is greater than 1KB is saved to disk, you could write: ```r id <- paste0("rrq:", ids::random_id(bytes = 4)) path <- tempfile() rrq_configure(id, store_max_size = 1000, offload_path = path) ``` then we create the queue object as normal (and spawn a worker so that we can use it) ```r obj <- rrq_controller(id) rrq_default_controller_set(obj) w <- rrq_worker_spawn(1) #> ℹ Spawning 1 worker with prefix 'antipacifistic_germanshorthairedpointer' ``` It's not hard at all to get to 1KB of data, we can do that by simulating a big pile of random numbers: ```r t <- rrq_task_create_expr(runif(200000)) ``` Once the task has finished, data will be stored on disk below the path given above: ```r dir(path) #> [1] "45d4ac0f45efd78b5333a07e5aef0fa1" ``` This keeps the larger objects out of the database. ## Orchestrating workers This vignette uses the very basic `rrq_worker_spawn()` method to create workers on your local machine. This is intended primarily for development only, though it may be useful in some situations. There are other options available, depending on how you want to use `rrq`. ### Use (and limitations) of spawn The simplest way of getting started with `rrq` is to use `rrq_worker_spawn()`, as above. This approach has several nice features; it uses `callr`, so no extra work is required to make the worker R session behave like the controller session (it will find your environment variables, library, and working directory), and it behaves the same way on all platforms (compare below). However, the workers will disappear when the controlling session completes (this is either a good or a bad thing) and you will be limited to a single node. ### Start workers on another node, perhaps using a scheduler There are two issues here; one is the technical details of launching your rrq workers on the cluster, and the other is the details around whether your HPC admins would like you to (and the security implications of doing so). If you are using rrq with an HPC system, then you will want to schedule workers onto the system. *The details here will change* The basic approach is to write out a launcher script somewhere: ```r rrq_worker_script(dest) ``` This can be called from the command line: ``` $ ./rrq_worker --help Usage: rrq_worker [options] Options: --config=NAME Name of a worker configuration [default: localhost] --name=NAME Name of the worker (optional) ``` This is a bash script that can then be called from whatever cluster job scheduler you use. The important things to pass through are: * `id` the only positional argument, which is the queue id * `--config=NAME` allows controlling of the named worker config (set via `rrq_worker_config_save()`, and allowing changing of timeout, verbosity and queues) In addition, you may need to change the configuration type. If you need to control redis access you should set the `REDIS_URL` environment variable to point at your Redis server. ### Use docker We provide a docker image that you can use (`mrcide/rrq`), though typically you would want to extend this image to include your own packages. Alternatively create your own docker image (see [the main dockerfile](https://github.com/mrc-ide/rrq/blob/master/docker/Dockerfile) but replace `COPY . /src` with an installation of rrq) and the [image that sets the entrypoint to call `rrq_worker`](https://github.com/mrc-ide/rrq/blob/master/docker/Dockerfile.worker). We use rrq to orchestrate workers in web applications where a number of workers carry out long running calculations for a HTTP API written using [plumber](https://www.rplumber.io/) and [porcelain](https://reside-ic.github.io/porcelain/). ## Waiting for workers to appear If you have submitted workers via a task scheduler, you might want to block and wait for them to become available. You can do this using the `rrq_worker_wait()` function. We first create a vector of names for the new workers, and then tell `rrq` that we're going to produce these workers. ```r obj <- rrq_controller(id) rrq_default_controller_set(obj) worker_ids <- c("cluster_1", "cluster_2") ``` How you then start the workers is up to you; you might start them at the command line with ``` rrq_worker --worker-id cluster_1 rrq:46f706a3 rrq_worker --worker-id cluster_2 rrq:46f706a3 ``` (into separate terminals). Or you might queue these jobs with a cluster scheduler such as Slurm or PBS (with appropriate care over the working directory). But you can then immediately, in your R console write: ```r rrq_worker_wait(worker_ids, controller = obj) ``` and your session will block and wait for the workers to appear, erroring if they do not appear in time. ## Worker heartbeat If you use many workers, particularly on different machines, you may not notice if some disappear. Possible causes of this include: * Your task crashes a worker (e.g., you run some C++ code that segfaults) * Your worker is killed by the operating system (e.g., the system is running low on RAM and your process is targeted by the out-of-memory killer) * The machine reboots or shuts down * The redis server goes down temporarily, causing the worker to exit when writing back results By default, if this happens when your worker is running a task, that task status will forever be stuck in `RUNNING`. `rrq` provides a simple heartbeat process, if requested, to detect when a worker has disappeared. To do this, we run a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a [dead man's switch](https://en.wikipedia.org/wiki/Dead_man%27s_switch) - see `rrq::rrq_heartbeat` for details. To enable the heartbeat, save a worker configuration with the `heartbeat_period` set to some number of seconds. Below we use 2 seconds so that this example runs reasonably quickly, but in practice something like 60 might be slightly less load on your Redis server. ```r id <- paste0("rrq:", ids::random_id(bytes = 4)) obj <- rrq_controller(id) rrq_default_controller_set(obj) res <- rrq_worker_config_save( "localhost", rrq_worker_config(heartbeat_period = 2)) ``` Then, launch a worker ```r w <- rrq_worker_spawn(1) #> ℹ Spawning 1 worker with prefix 'taxidermic_americangoldfinch' ``` Our worker will print information indicating that the heartbeat is enabled (use `rrq_worker_process_log()`) ``` #> [2024-08-07 11:18:50.341996] HEARTBEAT rrq:b5c90261:worker:taxidermic_americangoldfinch_1:heartbeat #> [2024-08-07 11:18:50.581563] HEARTBEAT OK #> [2024-08-07 11:18:50.596252] ALIVE #> [2024-08-07 11:18:50.596533] ENVIR new #> [2024-08-07 11:18:50.59684] QUEUE default #> __ #> ______________ _/ / #> ______ / ___/ ___/ __ `/ /_____ #> /_____/ / / / / / /_/ /_/_____/ #> ______ /_/ /_/ \__, (_) ______ #> /_____/ /_/ /_____/ #> worker: taxidermic_americangoldfinch_1 #> config: localhost #> rrq_version: 0.7.17 #> platform: x86_64-pc-linux-gnu #> running: Ubuntu 20.04.6 LTS #> hostname: wpia-dide300 #> username: rfitzjoh #> queue: rrq:b5c90261:queue:default #> wd: /home/rfitzjoh/Documents/src/rrq/vignettes_src #> pid: 2720190 #> redis_host: 127.0.0.1 #> redis_port: 6379 #> heartbeat_key: rrq:b5c90261:worker:taxidermic_americangoldfinch_1:heartbeat ``` We also have a heartbeat key here that we can inspect: ```r info <- rrq_worker_info()[[1]] obj$con$EXISTS(info$heartbeat_key) #> [1] 1 obj$con$PTTL(info$heartbeat_key) # in milliseconds #> [1] 5829 ``` We queue some slow job onto the worker: ```r t <- rrq_task_create_expr({ Sys.sleep(10) runif(1) }) ``` Then we kill the worker: ```r tools::pskill(rrq_worker_info()[[1]]$pid) ``` Of course, immediately our key still exists: ```r obj$con$EXISTS(info$heartbeat_key) #> [1] 1 ``` but eventually it will expire: ```r Sys.sleep(6) obj$con$EXISTS(info$heartbeat_key) #> [1] 0 ``` So far as `rrq` is concerned, at this point your task is still running ```r rrq_task_status(t) #> [1] "RUNNING" ``` Handling this situation is still completely manual. You can detect lost workers jobs with: ```r rrq_worker_detect_exited() #> Lost 1 worker: #> - taxidermic_americangoldfinch_1 #> Orphaning 1 task: #> - f88a6a235f8dbd9627e3e6862daae49b ``` this will also "orphan" the task ```r rrq_task_status(t) #> [1] "DIED" ``` Any tasks that were dependent on this task will now be marked as `IMPOSSIBLE`. In a future version we will support automatic re-queuing of jobs assigned to disappeared workers. ## Getting a Redis server There are several options to get started with Redis, the best one will likely depend on your platform and needs. ### Use docker (Linux, macOS with [docker desktop](https://docs.docker.com/docker-for-mac/install/), Windows with [docker desktop](https://docs.docker.com/docker-for-windows/install/)) This is how we develop rrq because it's easy to destroy and recreate the redis instance. Start the docker redis container like: ``` docker run --name redis --rm -d -p 127.0.0.1:6379:6379 redis ``` This will listen on port 6379 which is the Redis default. You can stop the container (deleting all data) with `docker stop redis` ### Install Redis On Linux this is fairly straightforward, either by [downloading and building the source code](https://redis.io/download) or by installing via `apt` or `snap` On macOS the source will compile, or you can install a redis server via homebrew On Windows you can [install redis via WSL](https://redis.com/blog/redis-on-windows-10/). There have also been various ports. ### Use Redis on a different machine If you have redis running on a different machine (this will be the case if you're using redis to distribute tasks over a number of different machines) you will need to tell `rrq` and `redux` where to find it. The simplest way is to set the environment variable `REDIS_HOST` to the name of the machine if it is running with default ports, or set `REDUX_URL` if you need more control. Alternatively, when connecting to the server above, you can manually construct your `redux::hiredis` object and pass in any configuration option you need; see the documentation for `redux::redis_config` for details.