From ee99d5a3d1c8ceee5b84e746beb4a3cab50675e0 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 6 Jan 2012 17:26:13 -0800 Subject: [PATCH] Request queue The request queue abstraction intends to provide an interface usable by asynchronous I/O libraries, without making assumptions about their interfaces. --- Makefile | 2 +- ngx-queue.h | 106 ++++++++++++ request.c | 214 ++++++++++++++++++++++++ request.h | 130 +++++++++++++++ test/Makefile | 2 +- test/test-request.c | 394 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 846 insertions(+), 2 deletions(-) create mode 100644 ngx-queue.h create mode 100644 request.c create mode 100644 request.h create mode 100644 test/test-request.c diff --git a/Makefile b/Makefile index 2838e0a..33ec7cd 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ include ./Makefile.common -OBJ=net.o hiredis.o sds.o async.o parser.o object.o handle.o format.o context.o address.o +OBJ=net.o hiredis.o sds.o async.o parser.o object.o handle.o format.o context.o address.o request.o BINS=hiredis-example hiredis-test all: $(DYLIBNAME) $(BINS) diff --git a/ngx-queue.h b/ngx-queue.h new file mode 100644 index 0000000..8c5e461 --- /dev/null +++ b/ngx-queue.h @@ -0,0 +1,106 @@ + +/* + * Copyright (C) Igor Sysoev + */ + + +#ifndef _NGX_QUEUE_H_INCLUDED_ +#define _NGX_QUEUE_H_INCLUDED_ + + +typedef struct ngx_queue_s ngx_queue_t; + +struct ngx_queue_s { + ngx_queue_t *prev; + ngx_queue_t *next; +}; + + +#define ngx_queue_init(q) \ + (q)->prev = q; \ + (q)->next = q + + +#define ngx_queue_empty(h) \ + (h == (h)->prev) + + +#define ngx_queue_insert_head(h, x) \ + (x)->next = (h)->next; \ + (x)->next->prev = x; \ + (x)->prev = h; \ + (h)->next = x + + +#define ngx_queue_insert_after ngx_queue_insert_head + + +#define ngx_queue_insert_tail(h, x) \ + (x)->prev = (h)->prev; \ + (x)->prev->next = x; \ + (x)->next = h; \ + (h)->prev = x + + +#define ngx_queue_head(h) \ + (h)->next + + +#define ngx_queue_last(h) \ + (h)->prev + + +#define ngx_queue_sentinel(h) \ + (h) + + +#define ngx_queue_next(q) \ + (q)->next + + +#define ngx_queue_prev(q) \ + (q)->prev + + +#if (NGX_DEBUG) + +#define ngx_queue_remove(x) \ + (x)->next->prev = (x)->prev; \ + (x)->prev->next = (x)->next; \ + (x)->prev = NULL; \ + (x)->next = NULL + +#else + +#define ngx_queue_remove(x) \ + (x)->next->prev = (x)->prev; \ + (x)->prev->next = (x)->next + +#endif + + +#define ngx_queue_split(h, q, n) \ + (n)->prev = (h)->prev; \ + (n)->prev->next = n; \ + (n)->next = q; \ + (h)->prev = (q)->prev; \ + (h)->prev->next = h; \ + (q)->prev = n; + + +#define ngx_queue_add(h, n) \ + (h)->prev->next = (n)->next; \ + (n)->next->prev = (h)->prev; \ + (h)->prev = (n)->prev; \ + (h)->prev->next = h; + + +#define ngx_queue_data(q, type, link) \ + (type *) ((unsigned char *) q - offsetof(type, link)) + + +#define ngx_queue_foreach(q, h) \ + for ((q) = ngx_queue_head(h); (q) != (h); (q) = ngx_queue_next(q)) + + +#endif /* _NGX_QUEUE_H_INCLUDED_ */ diff --git a/request.c b/request.c new file mode 100644 index 0000000..2333636 --- /dev/null +++ b/request.c @@ -0,0 +1,214 @@ +#include +#include +#include + +#include "handle.h" /* return values */ +#include "request.h" + +int redis_request_init(redis_request *self) { + memset(self, 0, sizeof(*self)); + return REDIS_OK; +} + +int redis_request_destroy(redis_request *self) { + ((void) self); + return REDIS_OK; +} + +int redis_request_queue_init(redis_request_queue *self) { + memset(self, 0, sizeof(*self)); + ngx_queue_init(&self->request_to_write); + ngx_queue_init(&self->request_wait_write); + ngx_queue_init(&self->request_wait_read); + redis_parser_init(&self->parser, NULL); + return REDIS_OK; +} + +void redis__free_queue(ngx_queue_t *h) { + ngx_queue_t *q; + redis_request *req; + + ngx_queue_foreach(q, h) { + ngx_queue_remove(q); + req = ngx_queue_data(q, redis_request, queue); + req->free(req); + } +} + +int redis_request_queue_destroy(redis_request_queue *self) { + redis__free_queue(&self->request_to_write); + redis__free_queue(&self->request_wait_write); + redis__free_queue(&self->request_wait_read); + redis_parser_destroy(&self->parser); + return REDIS_OK; +} + +int redis_request_queue_insert(redis_request_queue *self, redis_request *request) { + ngx_queue_insert_head(&self->request_to_write, &request->queue); + + if (self->request_to_write_cb) { + self->request_to_write_cb(self, request); + } + + return REDIS_OK; +} + +redis_request *redis__request_queue_move(ngx_queue_t *a, ngx_queue_t *b) { + ngx_queue_t *q; + + /* Unable to move requests when there are none... */ + if (ngx_queue_empty(a)) { + return NULL; + } + + q = ngx_queue_last(a); + ngx_queue_remove(q); + ngx_queue_insert_head(b, q); + return ngx_queue_data(q, redis_request, queue); +} + +redis_request *redis__request_queue_pop_to_write(redis_request_queue *self) { + redis_request *req; + + req = redis__request_queue_move(&self->request_to_write, + &self->request_wait_write); + + if (req && self->request_wait_write_cb) { + self->request_wait_write_cb(self, req); + } + + return req; +} + +int redis_request_queue_write_ptr(redis_request_queue *self, const char **buf, size_t *len) { + ngx_queue_t *q; + redis_request *req; + + /* We need at least one element in the wait_write queue */ + if (ngx_queue_empty(&self->request_wait_write)) { + if (redis__request_queue_pop_to_write(self) == NULL) { + return -1; + } + } + + while (1) { + assert(!ngx_queue_empty(&self->request_wait_write)); + q = ngx_queue_head(&self->request_wait_write); + req = ngx_queue_data(q, redis_request, queue); + + const char *auxbuf = NULL; + size_t auxlen = 0; + + assert(req->write_ptr); + req->write_ptr(req, &auxbuf, &auxlen); + if (auxbuf == NULL) { + if (redis__request_queue_pop_to_write(self) == NULL) { + return -1; + } + + continue; + } + + *buf = auxbuf; + *len = auxlen; + break; + } + + return 0; +} + +redis_request *redis__request_queue_pop_wait_write(redis_request_queue *self) { + redis_request *req; + + req = redis__request_queue_move(&self->request_wait_write, + &self->request_wait_read); + + if (req && self->request_wait_read_cb) { + self->request_wait_read_cb(self, req); + } + + return req; +} + +int redis_request_queue_write_cb(redis_request_queue *self, int n) { + ngx_queue_t *q; + redis_request *req; + int wrote; + + /* We need at least one element in the wait_read queue */ + if (ngx_queue_empty(&self->request_wait_read)) { + if (redis__request_queue_pop_wait_write(self) == NULL) { + /* The request cannot be NULL: it emitted bytes to write and should + * be present in the wait_write queue, waiting for a callback. */ + return -1; + } + } + + while (n) { + assert(!ngx_queue_empty(&self->request_wait_read)); + q = ngx_queue_head(&self->request_wait_read); + req = ngx_queue_data(q, redis_request, queue); + + assert(req->write_cb); + wrote = req->write_cb(req, n); + if (wrote == 0) { + if (redis__request_queue_pop_wait_write(self) == NULL) { + return -1; + } + + continue; + } + + assert(wrote <= n); + n -= wrote; + } + + assert(n == 0); + return 0; +} + +int redis_request_queue_read_cb(redis_request_queue *self, const char *buf, size_t len) { + ngx_queue_t *q; + redis_request *req; + redis_protocol *p; + size_t nparsed; + + while (len) { + p = NULL; + nparsed = redis_parser_execute(&self->parser, &p, buf, len); + + /* Test for parse error */ + if (nparsed < len && p == NULL) { + errno = redis_parser_err(&self->parser); + return REDIS_EPARSER; + } + + if (!ngx_queue_empty(&self->request_wait_read)) { + q = ngx_queue_last(&self->request_wait_read); + req = ngx_queue_data(q, redis_request, queue); + + /* Fire raw read callback when defined */ + if (req->read_raw_cb) { + req->read_raw_cb(req, buf, nparsed); + } + + /* Fire read callback when the parser produced something */ + if (p != NULL) { + int done = 0; + + req->read_cb(req, p, &done); + if (done) { + ngx_queue_remove(q); + req->free(req); + } + } + } else { + assert(NULL && "todo"); + } + + buf += nparsed; + len -= nparsed; + } + + return REDIS_OK; +} diff --git a/request.h b/request.h new file mode 100644 index 0000000..dcb989f --- /dev/null +++ b/request.h @@ -0,0 +1,130 @@ +#ifndef HIREDIS_REQUEST_H +#define HIREDIS_REQUEST_H 1 + +#include /* offsetof */ +#include "ngx-queue.h" +#include "parser.h" + +typedef struct redis_request_s redis_request; + +/* + * Obtain a char* to a buffer representing (some part of) the request. + * + * Arguments: + * self the request as previously inserted in the queue + * buf buffer to write, or NULL when there are no more + * len length of the buffer to write + */ +typedef void (redis_request_write_ptr)(redis_request *self, + const char **buf, + size_t *len); + +/* + * Let the request know that (some part of) it has been written. + * + * Arguments: + * self the request as previously inserted in the queue + * n the number of bytes written + * + * Return: + * the number of bytes (< n) that could be accounted for + */ +typedef int (redis_request_write_cb)(redis_request *self, + int n); + +/* + * Let the request know the wire-level data that was fed to the parser on its + * behalf. This is merely a convenience function that can be used to buffer + * the wire-level representation of the response, for example. + * + * Arguments: + * self the request as previously inserted in the queue + * buf buffer with reply data + * len length of buffer with reply data + */ +typedef void (redis_request_read_raw_cb)(redis_request *self, + const char *buf, + size_t len); + +/* + * Let the request know that a full reply was read on its behalf. This + * function is called when the protocol parser parsed a full reply and this + * request is on the head of the list of requests waiting for a reply. + * + * Arguments: + * self the request as previously inserted in the queue + * reply reply as read by the parser + * done set to non-zero by the request when this is the last reply + */ +typedef void (redis_request_read_cb)(redis_request *self, + redis_protocol *reply, + int *done); + +/* + * Free the request. This function is called after the last reply has been + * read, and passed to the request via `read_cb`. + + * + * Arguments: + * self the request as previously inserted in the queue + */ +typedef void (redis_request_free)(redis_request *self); + +#define REDIS_REQUEST_COMMON \ + ngx_queue_t queue; \ + redis_request_write_ptr *write_ptr; \ + redis_request_write_cb *write_cb; \ + redis_request_read_raw_cb *read_raw_cb; \ + redis_request_read_cb *read_cb; \ + redis_request_free *free; + +struct redis_request_s { + REDIS_REQUEST_COMMON +}; + +typedef struct redis_request_queue_s redis_request_queue; + +/* + * These functions are called whenever a request is inserted in a new queue. + * When a request is initially inserted via the `redis_request_queue_insert` + * function, it ends up in the `to_write` queue and the `to_write_cb` callback + * is called. Before the request emits one or more pointers to its buffers, it + * is placed in the `wait_write` queue and the `wait_write_cb` callback is + * called. Finally, when the request is fully put on the wire and its + * `write_cb` function has indicated that it is done, the request is placed in + * the `wait_read` queue and the `wait_read_cb` callback is called. + * + * Arguments: + * self request queue + * request request that was moved to a new queue + */ +typedef void (request_queue_to_write_cb)(redis_request_queue *self, + redis_request *request); +typedef void (request_queue_wait_write_cb)(redis_request_queue *self, + redis_request *request); +typedef void (request_queue_wait_read_cb)(redis_request_queue *self, + redis_request *request); + +struct redis_request_queue_s { + ngx_queue_t request_to_write; + ngx_queue_t request_wait_write; + ngx_queue_t request_wait_read; + redis_parser parser; + + request_queue_to_write_cb *request_to_write_cb; + request_queue_wait_write_cb *request_wait_write_cb; + request_queue_wait_read_cb *request_wait_read_cb; +}; + +int redis_request_init(redis_request *self); +int redis_request_destroy(redis_request *self); + +int redis_request_queue_init(redis_request_queue *self); +int redis_request_queue_destroy(redis_request_queue *self); + +int redis_request_queue_insert(redis_request_queue *self, redis_request *request); +int redis_request_queue_write_ptr(redis_request_queue *self, const char **buf, size_t *len); +int redis_request_queue_write_cb(redis_request_queue *self, int n); +int redis_request_queue_read_cb(redis_request_queue *self, const char *buf, size_t len); + +#endif diff --git a/test/Makefile b/test/Makefile index 45f9806..5137dc3 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,6 +1,6 @@ include ../Makefile.common -TESTS=test-format test-parser test-object test-handle test-context +TESTS=test-format test-parser test-object test-handle test-context test-request OBJ=spawn.o net-helper.o all: $(TESTS) diff --git a/test/test-request.c b/test/test-request.c new file mode 100644 index 0000000..4b06777 --- /dev/null +++ b/test/test-request.c @@ -0,0 +1,394 @@ +#include "../fmacros.h" + +/* misc */ +#include +#include +#include +#include +#include + +/* local */ +#include "../request.h" +#include "test-helper.h" + +static struct request_to_write_cb_s { + int callc; + struct { + redis_request_queue *q; + redis_request *r; + } *callv; +} request_to_write_cb_calls = { .callc = 0, .callv = NULL }; + +void request_to_write_cb_reset(void) { + struct request_to_write_cb_s *s = &request_to_write_cb_calls; + + if (s->callc) { + free(s->callv); + } + + s->callc = 0; + s->callv = NULL; +} + +void request_to_write_cb(redis_request_queue *q, redis_request *r) { + struct request_to_write_cb_s *s = &request_to_write_cb_calls; + + s->callv = realloc(s->callv, sizeof(*s->callv) * (s->callc + 1)); + assert(s->callv != NULL); + + s->callv[s->callc].q = q; + s->callv[s->callc].r = r; + s->callc++; +} + +static struct request_wait_write_cb_s { + int callc; + struct { + redis_request_queue *q; + redis_request *r; + } *callv; +} request_wait_write_cb_calls = { .callc = 0, .callv = NULL }; + +void request_wait_write_cb_reset(void) { + struct request_wait_write_cb_s *s = &request_wait_write_cb_calls; + + if (s->callc) { + free(s->callv); + } + + s->callc = 0; + s->callv = NULL; +} + +void request_wait_write_cb(redis_request_queue *q, redis_request *r) { + struct request_wait_write_cb_s *s = &request_wait_write_cb_calls; + + s->callv = realloc(s->callv, sizeof(*s->callv) * (s->callc + 1)); + assert(s->callv != NULL); + + s->callv[s->callc].q = q; + s->callv[s->callc].r = r; + s->callc++; +} + +static struct request_wait_read_cb_s { + int callc; + struct { + redis_request_queue *q; + redis_request *r; + } *callv; +} request_wait_read_cb_calls = { .callc = 0, .callv = NULL }; + +void request_wait_read_cb_reset(void) { + struct request_wait_read_cb_s *s = &request_wait_read_cb_calls; + + if (s->callc) { + free(s->callv); + } + + s->callc = 0; + s->callv = NULL; +} + +void request_wait_read_cb(redis_request_queue *q, redis_request *r) { + struct request_wait_read_cb_s *s = &request_wait_read_cb_calls; + + s->callv = realloc(s->callv, sizeof(*s->callv) * (s->callc + 1)); + assert(s->callv != NULL); + + s->callv[s->callc].q = q; + s->callv[s->callc].r = r; + s->callc++; +} + +#define SETUP() \ + redis_request_queue q; \ + int rv; \ + \ + request_to_write_cb_reset(); \ + request_wait_write_cb_reset(); \ + request_wait_read_cb_reset(); \ + \ + rv = redis_request_queue_init(&q); \ + assert_equal_return(rv, REDIS_OK); \ + \ + q.request_to_write_cb = request_to_write_cb; \ + q.request_wait_write_cb = request_wait_write_cb; \ + q.request_wait_read_cb = request_wait_read_cb; + +#define TEARDOWN() \ + redis_request_queue_destroy(&q); + +typedef struct t1_redis_request_s t1_redis_request; + +struct t1_redis_request_s { + REDIS_REQUEST_COMMON + + const char *buf; + size_t len; + size_t emit; + size_t nemitted; + size_t nwritten; + + const char *read_raw_buf; + size_t read_raw_len; + + redis_protocol *reply; + + int free_calls; +}; + +void t1_write_ptr(redis_request *_self, const char **buf, size_t *len) { + t1_redis_request *self = (t1_redis_request*)_self; + size_t to_emit; + + if (self->nemitted == self->len) { + *buf = NULL; + *len = 0; + return; + } + + to_emit = self->len - self->nemitted; + if (to_emit > self->emit) { + to_emit = self->emit; + } + + *buf = self->buf + self->nemitted; + *len = to_emit; + self->nemitted += to_emit; +} + +int t1_write_cb(redis_request *_self, int n) { + t1_redis_request *self = (t1_redis_request*)_self; + int to_write; + + to_write = self->len - self->nwritten; + if (to_write > n) { + to_write = n; + } + + self->nwritten += to_write; + return to_write; +} + +void t1_read_raw_cb(redis_request *_self, const char *buf, size_t len) { + t1_redis_request *self = (t1_redis_request*)_self; + self->read_raw_buf = buf; + self->read_raw_len = len; +} + +void t1_read_cb(redis_request *_self, redis_protocol *reply, int *done) { + t1_redis_request *self = (t1_redis_request*)_self; + self->reply = reply; + *done = 1; +} + +void t1_free(redis_request *_self) { + t1_redis_request *self = (t1_redis_request*)_self; + self->free_calls++; +} + +void t1_init(t1_redis_request *self) { + memset(self, 0, sizeof(*self)); + redis_request_init((redis_request*)self); + + self->write_ptr = t1_write_ptr; + self->write_cb = t1_write_cb; + self->read_raw_cb = t1_read_raw_cb; + self->read_cb = t1_read_cb; + self->free = t1_free; +} + +TEST(insert_request) { + SETUP(); + + t1_redis_request req; + t1_init(&req); + + req.buf = "hello"; + req.len = strlen(req.buf); + req.emit = 2; + + rv = redis_request_queue_insert(&q, (redis_request*)&req); + assert_equal_return(rv, REDIS_OK); + + /* Test that callback was correctly triggered */ + assert_equal_int(request_to_write_cb_calls.callc, 1); + assert(request_to_write_cb_calls.callv[0].q == &q); + assert(request_to_write_cb_calls.callv[0].r == (redis_request*)&req); + + TEARDOWN(); +} + +#define SETUP_INSERTED() \ + SETUP(); \ + t1_redis_request req1, req2; \ + t1_init(&req1); \ + t1_init(&req2); \ + req1.buf = "hello"; \ + req1.len = strlen(req1.buf); \ + req1.emit = req1.len; \ + req2.buf = "world"; \ + req2.len = strlen(req2.buf); \ + req2.emit = req2.len; \ + rv = redis_request_queue_insert(&q, (redis_request*)&req1); \ + assert_equal_return(rv, REDIS_OK); \ + rv = redis_request_queue_insert(&q, (redis_request*)&req2); \ + assert_equal_return(rv, REDIS_OK); + +TEST(write_ptr) { + SETUP_INSERTED(); + + const char *buf; + size_t len; + + req1.emit = 3; + req2.emit = 3; + + rv = redis_request_queue_write_ptr(&q, &buf, &len); + assert_equal_size_t(rv, 0); + assert_equal_size_t(len, 3); + assert(strncmp(buf, "hel", len) == 0); + + /* The first request should have moved to the wait_write queue now */ + assert_equal_int(request_wait_write_cb_calls.callc, 1); + assert(request_wait_write_cb_calls.callv[0].q == &q); + assert(request_wait_write_cb_calls.callv[0].r == (redis_request*)&req1); + + rv = redis_request_queue_write_ptr(&q, &buf, &len); + assert_equal_size_t(rv, 0); + assert_equal_size_t(len, 2); + assert(strncmp(buf, "lo", len) == 0); + + rv = redis_request_queue_write_ptr(&q, &buf, &len); + assert_equal_size_t(rv, 0); + assert_equal_size_t(len, 3); + assert(strncmp(buf, "wor", len) == 0); + + /* The second request should have moved to the wait_write queue now */ + assert_equal_int(request_wait_write_cb_calls.callc, 2); + assert(request_wait_write_cb_calls.callv[1].q == &q); + assert(request_wait_write_cb_calls.callv[1].r == (redis_request*)&req2); + + rv = redis_request_queue_write_ptr(&q, &buf, &len); + assert_equal_size_t(rv, 0); + assert_equal_size_t(len, 2); + assert(strncmp(buf, "ld", len) == 0); + + rv = redis_request_queue_write_ptr(&q, &buf, &len); + assert_equal_size_t(rv, -1); + + TEARDOWN(); +} + +#define SETUP_WRITTEN_UNCONFIRMED() \ + SETUP_INSERTED(); \ + do { \ + const char *buf; \ + size_t len; \ + rv = redis_request_queue_write_ptr(&q, &buf, &len); \ + } while (rv == 0); + +TEST(write_cb) { + SETUP_WRITTEN_UNCONFIRMED(); + + /* Assume 3 bytes were written ("hel" in req1) */ + rv = redis_request_queue_write_cb(&q, 3); + assert_equal_size_t(rv, 0); + assert_equal_size_t(req1.nwritten, 3); + + /* The first request should have moved to the wait_read queue now */ + assert_equal_int(request_wait_read_cb_calls.callc, 1); + assert(request_wait_read_cb_calls.callv[0].q == &q); + assert(request_wait_read_cb_calls.callv[0].r == (redis_request*)&req1); + + /* Assume another 3 bytes were written ("lo" in req1, "w" in req2) */ + rv = redis_request_queue_write_cb(&q, 3); + assert_equal_size_t(rv, 0); + assert_equal_size_t(req1.nwritten, 5); + assert_equal_size_t(req2.nwritten, 1); + + /* The second request should have moved to the wait_read queue now */ + assert_equal_int(request_wait_read_cb_calls.callc, 2); + assert(request_wait_read_cb_calls.callv[1].q == &q); + assert(request_wait_read_cb_calls.callv[1].r == (redis_request*)&req2); + + /* Run callback for remaining bytes */ + rv = redis_request_queue_write_cb(&q, 4); + assert_equal_size_t(rv, 0); + assert_equal_size_t(req1.nwritten, 5); + assert_equal_size_t(req2.nwritten, 5); + + /* More bytes cannot be mapped to requests... */ + rv = redis_request_queue_write_cb(&q, 4); + assert_equal_size_t(rv, -1); + + TEARDOWN(); +} + +#define SETUP_WRITTEN_CONFIRMED() \ + SETUP_WRITTEN_UNCONFIRMED(); \ + rv = redis_request_queue_write_cb(&q, req1.len); \ + assert_equal_size_t(rv, 0); \ + rv = redis_request_queue_write_cb(&q, req2.len); \ + assert_equal_size_t(rv, 0); + +TEST(read_cb) { + SETUP_WRITTEN_CONFIRMED(); + + /* Feed part of the reply for request 1 */ + rv = redis_request_queue_read_cb(&q, "+stat", 5); + assert_equal_size_t(rv, 0); + + assert_equal_size_t(req1.read_raw_len, 5); + assert(strncmp(req1.read_raw_buf, "+stat", req1.read_raw_len) == 0); + assert(req1.reply == NULL); + assert(req1.free_calls == 0); + + /* Feed remaining part for request 1, and first part for request 2 */ + rv = redis_request_queue_read_cb(&q, "us\r\n+st", 7); + assert_equal_size_t(rv, 0); + + assert_equal_size_t(req1.read_raw_len, 4); + assert(strncmp(req1.read_raw_buf, "us\r\n", req1.read_raw_len) == 0); + assert(req1.reply != NULL && req1.reply->type == REDIS_STATUS); + assert(req1.free_calls == 1); + + assert_equal_size_t(req2.read_raw_len, 3); + assert(strncmp(req2.read_raw_buf, "+st", req2.read_raw_len) == 0); + assert(req2.reply == NULL); + assert(req2.free_calls == 0); + + /* Feed remaining part for request 2 */ + rv = redis_request_queue_read_cb(&q, "atus\r\n", 6); + assert_equal_size_t(rv, 0); + + assert_equal_size_t(req2.read_raw_len, 6); + assert(strncmp(req2.read_raw_buf, "atus\r\n", req2.read_raw_len) == 0); + assert(req2.reply != NULL && req1.reply->type == REDIS_STATUS); + assert(req2.free_calls == 1); + + TEARDOWN(); +} + +TEST(read_cb_with_parse_error) { + SETUP_WRITTEN_CONFIRMED(); + + /* Feed part of an erroneous reply */ + rv = redis_request_queue_read_cb(&q, "+x\r\r", 4); + assert_equal_return(rv, REDIS_EPARSER); + + /* This should keep failing */ + rv = redis_request_queue_read_cb(&q, "\n", 1); + assert_equal_return(rv, REDIS_EPARSER); + + TEARDOWN(); +} + +int main(void) { + test_insert_request(); + test_write_ptr(); + test_write_cb(); + test_read_cb(); + test_read_cb_with_parse_error(); +}