1 vote

I want to implement behavior where, within a single HTTP connection, the server receives multiple requests, processes them concurrently, and passes responses back in order of completion.

Here is code for actix:

use std::{thread, time};

use actix_web::{post, web, HttpResponse, Responder};
use chrono::Local;
use log::trace;
use rand::Rng;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Debug)]
struct NumberToProcess {
    x: f64,
    job_id: u64,
}

#[derive(Serialize, Debug)]
struct ProcessResult {
    x: f64,
    job_id: u64,
    started: String,
    elapsed_ms: u64,
}

#[post("/sqrt")]
async fn my_sqrt(req: web::Json<NumberToProcess>) -> impl Responder {

    let date = Local::now();
    let started = time::Instant::now();
    trace!("New task: {:?}", req);

    let mut rng = rand::thread_rng();

    /* some heavy computations */
    let f = req.x;
    thread::sleep(time::Duration::from_millis(rng.gen_range(2000u64, 4000u64)));
    let r = f.sqrt();
    /* heavy computations end */

    let res = ProcessResult {
        x: r,
        job_id: req.job_id,
        started: date.format("%Y.%m.%d %H:%M:%S").to_string(),
        elapsed_ms: started.elapsed().as_millis() as u64,
    };

    HttpResponse::Ok().json(res)
}

I test it with netcat:

nc 127.0.0.1 8080 <<EOF
POST /sqrt HTTP/1.1
Host: localhost:8080
Accept: */*
Content-Type: application/json
Content-Length: 26

{"x":1.5, "job_id": 1000}
POST /sqrt HTTP/1.1
Host: localhost:8080
Accept: */*
Content-Type: application/json
Content-Length: 26

{"x":2.5, "job_id": 1001}
POST /sqrt HTTP/1.1
Host: localhost:8080
Accept: */*
Content-Type: application/json
Content-Length: 26

{"x":3.5, "job_id": 1002}
POST /sqrt HTTP/1.1
Host: localhost:8080
Accept: */*
Content-Type: application/json
Content-Length: 26

{"x":4.5, "job_id": 1003}
EOF

The problem is that requests are processed in order, synchronously.

The result is:

HTTP/1.1 200 OK
content-length: 87
content-type: application/json
date: Mon, 31 May 2021 13:55:49 GMT

{"x":1.224744871391589,"job_id":1000,"started":"2021.05.31 16:55:49","elapsed_ms":3475}HTTP/1.1 200 OK
content-length: 88
content-type: application/json
date: Mon, 31 May 2021 13:55:49 GMT

{"x":1.5811388300841898,"job_id":1001,"started":"2021.05.31 16:55:52","elapsed_ms":3500}HTTP/1.1 200 OK
content-length: 88
content-type: application/json
date: Mon, 31 May 2021 13:55:49 GMT

{"x":1.8708286933869707,"job_id":1002,"started":"2021.05.31 16:55:56","elapsed_ms":2040}HTTP/1.1 200 OK
content-length: 88
content-type: application/json
date: Mon, 31 May 2021 13:55:49 GMT

{"x":2.1213203435596424,"job_id":1003,"started":"2021.05.31 16:55:58","elapsed_ms":2201}

Is it possible to make actix (or some other web framework) process requests without waiting for completion of the previous request within the same connection?


1 Answer

1 vote

An issue with your implementation is that std::thread::sleep blocks execution of the async context; instead you should use tokio::time::delay_for(...).await (from the tokio 0.2 runtime that actix-web 3 is built on) to yield execution. If you actually intended std::thread::sleep to simulate a blocking computation, then you should use something like tokio::task::spawn_blocking (or actix-web's web::block) so that you don't block the processing of other requests.
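As a rough sketch, the handler could be reworked like this (assuming actix-web 3 with its tokio 0.2 runtime; NumberToProcess and ProcessResult are the types from your question):

```rust
use std::time;

use actix_web::{post, web, HttpResponse, Responder};
use chrono::Local;

#[post("/sqrt")]
async fn my_sqrt(req: web::Json<NumberToProcess>) -> impl Responder {
    let date = Local::now();
    let started = time::Instant::now();

    // Pick the delay before the await point: ThreadRng is not Send,
    // so it must not be held across an .await.
    let delay = {
        use rand::Rng;
        rand::thread_rng().gen_range(2000u64, 4000u64)
    };

    // Yields to the executor instead of blocking the worker thread,
    // so other requests on this worker keep making progress.
    tokio::time::delay_for(time::Duration::from_millis(delay)).await;

    let res = ProcessResult {
        x: req.x.sqrt(),
        job_id: req.job_id,
        started: date.format("%Y.%m.%d %H:%M:%S").to_string(),
        elapsed_ms: started.elapsed().as_millis() as u64,
    };
    HttpResponse::Ok().json(res)
}
```

If the sleep stands in for real CPU-bound work, wrap that work in web::block(move || ...) or tokio::task::spawn_blocking instead, so it runs on a dedicated blocking thread pool while the worker stays free.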

... and responses are passed back in order of completion.

What you have done with netcat is an example of HTTP pipelining. Even if the requests are processed concurrently, you'll always get responses back in the order that you requested them.

If you want them to come back in the order of completion, you'll either have to make separate requests or use a client that provides HTTP/2 multiplexing. This can be done with an up-to-date curl command:

$ curl -Z \
-d '{"x":1.5,"job_id":1000}' -H 'Content-Type: application/json' -X POST http://localhost:8080/sqrt --next \
-d '{"x":2.5,"job_id":1001}' -H 'Content-Type: application/json' -X POST http://localhost:8080/sqrt --next \
-d '{"x":3.5,"job_id":1002}' -H 'Content-Type: application/json' -X POST http://localhost:8080/sqrt --next \
-d '{"x":4.5,"job_id":1003}' -H 'Content-Type: application/json' -X POST http://localhost:8080/sqrt
{"x":1.224744871391589,"job_id":1000,"started":"2021.05.31 16:32:41","elapsed_ms":2406}
{"x":1.8708286933869707,"job_id":1002,"started":"2021.05.31 16:32:44","elapsed_ms":2631}
{"x":2.1213203435596424,"job_id":1003,"started":"2021.05.31 16:32:44","elapsed_ms":2717}
{"x":1.5811388300841898,"job_id":1001,"started":"2021.05.31 16:32:44","elapsed_ms":3065}

The connection is upgraded as part of the first request, so the first one is synchronous while the following requests are all made in parallel.