Using Resque with Rust
======================


Disclaimer: I'm not a Rust expert by any means, so please tell me if
there is something I can improve in the examples below. I'm not going
to explain how Rust works. If you're not familiar with it, I highly
encourage you to read the Rust Book[0].

Resque[1] is a popular solution in the Ruby world for processing
background jobs. Its great advantage is that it uses Redis as a
backend, which makes it easy to share jobs with workers written in
other languages.

In this article we'll see how we can write a working Resque worker in
Rust. This will allow us to use Resque entirely from Rust, to enqueue
a job in Ruby and perform it in Rust, or vice versa.

The full code for the following examples can be found here[2].


How does Resque work?
---------------------

Resque jobs are enqueued in a Redis list using the RPUSH command.  A
Resque job is represented internally with a JSON string containing two
keys, one for the job class name and one for its arguments (or payload
in Resque terms).

    {
        "class": "JobClass",
        "args": [ arg1, arg2 ]
    }
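
To make the payload format concrete, here is a minimal sketch that
builds such a string using only the standard library. The helper
names json_escape and job_payload are made up for this illustration,
and the escaping is deliberately simple; the rest of the article uses
a JSON crate instead.

```rust
/// Escape a string for use inside a JSON string literal.
/// Hand-rolled for illustration only (hypothetical helper).
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build a payload with the two keys described above: class and args.
/// Arguments are restricted to strings to keep the sketch short.
fn job_payload(class: &str, args: &[&str]) -> String {
    let args: Vec<String> = args
        .iter()
        .map(|a| format!("\"{}\"", json_escape(a)))
        .collect();
    format!(
        "{{\"class\":\"{}\",\"args\":[{}]}}",
        json_escape(class),
        args.join(",")
    )
}
```

Calling job_payload("SignupEmail", &["user@example.com"]) produces a
string with the same two keys as the example above.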

Available queues are listed in the resque:queues Redis SET, whose
entries are queue names such as some_queue_name. The jobs of a queue
live in the Redis list resque:queue:some_queue_name.

Enqueueing a job is done by issuing an RPUSH command with a payload to
some queue.
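
As a rough sketch of that naming scheme, the following builds the
Redis commands involved as plain argument lists, without talking to
Redis at all. The names QUEUES_SET, queue_key and enqueue_commands are
invented for this example; only the key layout comes from Resque.

```rust
/// Name of the Redis SET that lists every known queue.
const QUEUES_SET: &str = "resque:queues";

/// Redis key of the LIST that holds the jobs of one queue.
fn queue_key(queue: &str) -> String {
    format!("resque:queue:{}", queue)
}

/// The two commands issued when enqueueing, as argument lists:
/// register the queue name, then push the payload onto its list.
fn enqueue_commands(queue: &str, payload: &str) -> Vec<Vec<String>> {
    vec![
        vec!["SADD".to_owned(), QUEUES_SET.to_owned(), queue.to_owned()],
        vec!["RPUSH".to_owned(), queue_key(queue), payload.to_owned()],
    ]
}
```

Keeping the key construction in one small function avoids typing the
resque:queue: prefix by hand in several places.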

Knowing that, let's enqueue our first job from Rust.


Enqueueing a Resque job
-----------------------

Let's pretend we need to enqueue a Resque job in order to send a
confirmation email.

First we need to start a new project with:

    $ cargo new rust-resque-example --bin

Next we need to add some crates.
In your Cargo.toml, add:

    [dependencies]
    rustc-serialize = "0.3.16"

    [dependencies.redis]
    git = "https://github.com/mitsuhiko/redis-rs.git"
    tag = "0.5.1"

Next let's use those crates. In our main.rs:

    extern crate redis;
    extern crate rustc_serialize;

    use redis::Commands;
    use rustc_serialize::Encodable;
    use rustc_serialize::json;

For our example we need to define a Job struct with two fields: a
class that will be a String and args that will be a vector of Strings.

    #[derive(RustcEncodable, Debug)]
    pub struct Job {
        class: String,
        args: Vec<String>
    }

This struct needs the RustcEncodable trait so that we can encode it as
JSON, and the Debug trait so that we can print it, so we derive both.

Next, we will define an enqueue function that will take a Job as input
and return a Redis Result:

    fn enqueue(job: Job) -> redis::RedisResult<Job> {
        // Connect to a local Redis
        let client = try!(redis::Client::open("redis://127.0.0.1/"));
        let conn = try!(client.get_connection());

        // Encode our Job in JSON
        let json_job = json::encode(&job).unwrap();

        // Add our queue in resque:queues Set
        try!(conn.sadd("resque:queues", "rust_test_queue"));
        // Push our job in the resque:queue:rust_test_queue list
        try!(conn.rpush("resque:queue:rust_test_queue", json_job));

        println!("Enqueued job: {:?}", job);

        Ok(job)
    }

Finally we create and enqueue a job in our main function:

    fn main() {
        // Create a Job
        let job: Job = Job { class: "SignupEmail".to_owned(),
                             args: vec!["user@example.com".to_owned()] };

        // Enqueue our job
        match enqueue(job) {
            Ok(job) => println!("Enqueued job: {:?}", job),
            Err(_) => { /* handle failure here */ }
        }
    }

Resque screenshot[3]

Great! We successfully enqueued a job that can be performed through
Resque.

Now onto the perform part.


Performing a Resque job
-----------------------

A Resque worker will try to reserve a job by polling a queue with the
LPOP command until it gets something to perform.

Let's implement that.

We need a reserve function that will check if a job is present in a
queue:

    fn reserve() -> redis::RedisResult<()> {
        println!("--: Checking rust_test_queue");

        // Connect to a local Redis
        let client = try!(redis::Client::open("redis://127.0.0.1/"));
        let conn = try!(client.get_connection());

        // Check if a job is present in the queue
        let res: Option<String> = try!(conn.lpop("resque:queue:rust_test_queue"));

        // Perform the job or return
        match res {
            Some(job) => perform(job),
            None => return Ok(()),
        }
    }
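
The queue semantics that reserve relies on can be tried without a
Redis server. The sketch below is an in-memory stand-in, assuming only
that RPUSH appends to the tail of a list and LPOP removes from its
head; FakeQueue is a made-up type and is not part of the redis crate.

```rust
use std::collections::VecDeque;

/// In-memory stand-in for a single Redis list (hypothetical type).
struct FakeQueue {
    items: VecDeque<String>,
}

impl FakeQueue {
    fn new() -> FakeQueue {
        FakeQueue { items: VecDeque::new() }
    }

    /// Like RPUSH: append to the tail and return the new length.
    fn rpush(&mut self, payload: &str) -> usize {
        self.items.push_back(payload.to_owned());
        self.items.len()
    }

    /// Like LPOP: remove and return the head, or None when empty.
    fn lpop(&mut self) -> Option<String> {
        self.items.pop_front()
    }
}
```

Pushing on one end and popping from the other means jobs are performed
in the order they were enqueued, and an empty queue yields None, which
is the case where our worker simply returns.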

We also need a function that waits a few seconds between polls to
mimic Resque's behaviour:

    fn wait_a_bit() {
        println!("--: Sleeping for 5.0 seconds");
        std::thread::sleep_ms(5000);
    }

In order to perform our job, we need to decode the JSON String
retrieved from the Resque queue and then do something useful like
sending an email in our case.

    fn perform(json_job: String) -> redis::RedisResult<()> {
        // Decode JSON
        let job: Job = json::decode(&*json_job).unwrap();

        println!("Found job: {:?}", job);

        // Send our email with something like:
        // send_email(job.args.first());
        // not implemented here.

        Ok(())
    }

Our Job struct must also derive the RustcDecodable trait to be
decodable, so its attribute becomes
#[derive(RustcEncodable, RustcDecodable, Debug)]. I'm decoding
&*json_job since json::decode expects a &str and not a String.
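
For readers unsure about the String to &str conversion, here is a
small standalone sketch. The function takes_str is a made-up stand-in
for any function that, like json::decode, accepts a &str; it does not
decode anything.

```rust
/// Stand-in for a function that takes a &str (hypothetical helper).
fn takes_str(s: &str) -> usize {
    s.len()
}

/// Three equivalent ways to lend a String to such a function.
fn borrow_examples(json_job: String) -> (usize, usize, usize) {
    (
        takes_str(&*json_job),        // dereference to str, then borrow
        takes_str(json_job.as_str()), // explicit conversion method
        takes_str(&json_job),         // &String coerces to &str
    )
}
```

All three calls see the same string slice, so which one to use is
mostly a matter of taste.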

Now let's update our main() function to enqueue a job, perform it and
wait for other jobs to come.

    fn main() {
        let job: Job = Job { class: "SignupEmail".to_owned(),
                             args: vec!["user@example.com".to_owned()] };

        enqueue(job).unwrap();

        loop {
            reserve().unwrap();
            wait_a_bit();
        }
    }
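
The loop above runs forever and panics on the first error. As a
sketch of one alternative, the polling cycle can be written as a
function that stops after a fixed number of polls and hands errors
back to the caller. The name poll and its parameters are assumptions
made for this example, and the reserve closure is expected to report
whether it found a job.

```rust
use std::thread;
use std::time::Duration;

/// Call `reserve` up to `max_polls` times, sleeping `interval` after
/// each call. Returns how many polls found a job, or the first error.
fn poll<F, E>(mut reserve: F, max_polls: usize, interval: Duration) -> Result<usize, E>
where
    F: FnMut() -> Result<bool, E>,
{
    let mut performed = 0;
    for _ in 0..max_polls {
        // `?` returns early with the error instead of panicking.
        if reserve()? {
            performed += 1;
        }
        thread::sleep(interval);
    }
    Ok(performed)
}
```

A bounded loop like this is mostly useful in tests, where the closure
can pop from an in-memory list and the interval can be zero.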

Time to try our worker.

    $ cargo build
       Compiling rust-resque-example v0.1.0 (file:///Users/julien/Code/rust-resque-example)

    $ cargo run
         Running `target/debug/rust-resque-example`
    Enqueued job: Job { class: "SignupEmail", args: ["user@example.com"] }
    --: Checking rust_test_queue
    Found job: Job { class: "SignupEmail", args: ["user@example.com"] }
    --: Checking rust_test_queue
    --: Sleeping for 5.0 seconds
    --: Checking rust_test_queue
    --: Sleeping for 5.0 seconds

And it works! We successfully enqueued and performed a job in Resque
from Rust.


What's missing?
---------------

Our implementation still needs to be compatible with the Resque web
interface (displaying workers, failed jobs...). It also needs to be
deployed alongside our Ruby workers, but that's for another article :)

Thanks a lot to the reviewers: Flavien, Marc, Steve & Yohan.


[0] https://doc.rust-lang.org/book/
[1] https://github.com/blog/542-introducing-resque
[2] https://github.com/julienXX/rust-resque-example
[3] http://i.imgbox.com/GmeAjSnN.png

-------

Last update: 29 October, 2015