Request queue
The request queue abstraction intends to provide an interface usable by asynchronous I/O libraries, without making assumptions about their interfaces.
This commit is contained in:
parent
87e9451e6b
commit
ee99d5a3d1
2
Makefile
2
Makefile
@ -5,7 +5,7 @@
|
||||
|
||||
include ./Makefile.common
|
||||
|
||||
OBJ=net.o hiredis.o sds.o async.o parser.o object.o handle.o format.o context.o address.o
|
||||
OBJ=net.o hiredis.o sds.o async.o parser.o object.o handle.o format.o context.o address.o request.o
|
||||
BINS=hiredis-example hiredis-test
|
||||
|
||||
all: $(DYLIBNAME) $(BINS)
|
||||
|
||||
106
ngx-queue.h
Normal file
106
ngx-queue.h
Normal file
@ -0,0 +1,106 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Igor Sysoev
|
||||
*/
|
||||
|
||||
|
||||
#ifndef _NGX_QUEUE_H_INCLUDED_
|
||||
#define _NGX_QUEUE_H_INCLUDED_
|
||||
|
||||
|
||||
typedef struct ngx_queue_s ngx_queue_t;
|
||||
|
||||
struct ngx_queue_s {
|
||||
ngx_queue_t *prev;
|
||||
ngx_queue_t *next;
|
||||
};
|
||||
|
||||
|
||||
#define ngx_queue_init(q) \
|
||||
(q)->prev = q; \
|
||||
(q)->next = q
|
||||
|
||||
|
||||
#define ngx_queue_empty(h) \
|
||||
(h == (h)->prev)
|
||||
|
||||
|
||||
#define ngx_queue_insert_head(h, x) \
|
||||
(x)->next = (h)->next; \
|
||||
(x)->next->prev = x; \
|
||||
(x)->prev = h; \
|
||||
(h)->next = x
|
||||
|
||||
|
||||
#define ngx_queue_insert_after ngx_queue_insert_head
|
||||
|
||||
|
||||
#define ngx_queue_insert_tail(h, x) \
|
||||
(x)->prev = (h)->prev; \
|
||||
(x)->prev->next = x; \
|
||||
(x)->next = h; \
|
||||
(h)->prev = x
|
||||
|
||||
|
||||
#define ngx_queue_head(h) \
|
||||
(h)->next
|
||||
|
||||
|
||||
#define ngx_queue_last(h) \
|
||||
(h)->prev
|
||||
|
||||
|
||||
#define ngx_queue_sentinel(h) \
|
||||
(h)
|
||||
|
||||
|
||||
#define ngx_queue_next(q) \
|
||||
(q)->next
|
||||
|
||||
|
||||
#define ngx_queue_prev(q) \
|
||||
(q)->prev
|
||||
|
||||
|
||||
#if (NGX_DEBUG)
|
||||
|
||||
#define ngx_queue_remove(x) \
|
||||
(x)->next->prev = (x)->prev; \
|
||||
(x)->prev->next = (x)->next; \
|
||||
(x)->prev = NULL; \
|
||||
(x)->next = NULL
|
||||
|
||||
#else
|
||||
|
||||
#define ngx_queue_remove(x) \
|
||||
(x)->next->prev = (x)->prev; \
|
||||
(x)->prev->next = (x)->next
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
#define ngx_queue_split(h, q, n) \
|
||||
(n)->prev = (h)->prev; \
|
||||
(n)->prev->next = n; \
|
||||
(n)->next = q; \
|
||||
(h)->prev = (q)->prev; \
|
||||
(h)->prev->next = h; \
|
||||
(q)->prev = n;
|
||||
|
||||
|
||||
#define ngx_queue_add(h, n) \
|
||||
(h)->prev->next = (n)->next; \
|
||||
(n)->next->prev = (h)->prev; \
|
||||
(h)->prev = (n)->prev; \
|
||||
(h)->prev->next = h;
|
||||
|
||||
|
||||
#define ngx_queue_data(q, type, link) \
|
||||
(type *) ((unsigned char *) q - offsetof(type, link))
|
||||
|
||||
|
||||
#define ngx_queue_foreach(q, h) \
|
||||
for ((q) = ngx_queue_head(h); (q) != (h); (q) = ngx_queue_next(q))
|
||||
|
||||
|
||||
#endif /* _NGX_QUEUE_H_INCLUDED_ */
|
||||
214
request.c
Normal file
214
request.c
Normal file
@ -0,0 +1,214 @@
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include "handle.h" /* return values */
|
||||
#include "request.h"
|
||||
|
||||
int redis_request_init(redis_request *self) {
|
||||
memset(self, 0, sizeof(*self));
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
int redis_request_destroy(redis_request *self) {
|
||||
((void) self);
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
int redis_request_queue_init(redis_request_queue *self) {
|
||||
memset(self, 0, sizeof(*self));
|
||||
ngx_queue_init(&self->request_to_write);
|
||||
ngx_queue_init(&self->request_wait_write);
|
||||
ngx_queue_init(&self->request_wait_read);
|
||||
redis_parser_init(&self->parser, NULL);
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
void redis__free_queue(ngx_queue_t *h) {
|
||||
ngx_queue_t *q;
|
||||
redis_request *req;
|
||||
|
||||
ngx_queue_foreach(q, h) {
|
||||
ngx_queue_remove(q);
|
||||
req = ngx_queue_data(q, redis_request, queue);
|
||||
req->free(req);
|
||||
}
|
||||
}
|
||||
|
||||
int redis_request_queue_destroy(redis_request_queue *self) {
|
||||
redis__free_queue(&self->request_to_write);
|
||||
redis__free_queue(&self->request_wait_write);
|
||||
redis__free_queue(&self->request_wait_read);
|
||||
redis_parser_destroy(&self->parser);
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
int redis_request_queue_insert(redis_request_queue *self, redis_request *request) {
|
||||
ngx_queue_insert_head(&self->request_to_write, &request->queue);
|
||||
|
||||
if (self->request_to_write_cb) {
|
||||
self->request_to_write_cb(self, request);
|
||||
}
|
||||
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
redis_request *redis__request_queue_move(ngx_queue_t *a, ngx_queue_t *b) {
|
||||
ngx_queue_t *q;
|
||||
|
||||
/* Unable to move requests when there are none... */
|
||||
if (ngx_queue_empty(a)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
q = ngx_queue_last(a);
|
||||
ngx_queue_remove(q);
|
||||
ngx_queue_insert_head(b, q);
|
||||
return ngx_queue_data(q, redis_request, queue);
|
||||
}
|
||||
|
||||
redis_request *redis__request_queue_pop_to_write(redis_request_queue *self) {
|
||||
redis_request *req;
|
||||
|
||||
req = redis__request_queue_move(&self->request_to_write,
|
||||
&self->request_wait_write);
|
||||
|
||||
if (req && self->request_wait_write_cb) {
|
||||
self->request_wait_write_cb(self, req);
|
||||
}
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
int redis_request_queue_write_ptr(redis_request_queue *self, const char **buf, size_t *len) {
|
||||
ngx_queue_t *q;
|
||||
redis_request *req;
|
||||
|
||||
/* We need at least one element in the wait_write queue */
|
||||
if (ngx_queue_empty(&self->request_wait_write)) {
|
||||
if (redis__request_queue_pop_to_write(self) == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
while (1) {
|
||||
assert(!ngx_queue_empty(&self->request_wait_write));
|
||||
q = ngx_queue_head(&self->request_wait_write);
|
||||
req = ngx_queue_data(q, redis_request, queue);
|
||||
|
||||
const char *auxbuf = NULL;
|
||||
size_t auxlen = 0;
|
||||
|
||||
assert(req->write_ptr);
|
||||
req->write_ptr(req, &auxbuf, &auxlen);
|
||||
if (auxbuf == NULL) {
|
||||
if (redis__request_queue_pop_to_write(self) == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
*buf = auxbuf;
|
||||
*len = auxlen;
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
redis_request *redis__request_queue_pop_wait_write(redis_request_queue *self) {
|
||||
redis_request *req;
|
||||
|
||||
req = redis__request_queue_move(&self->request_wait_write,
|
||||
&self->request_wait_read);
|
||||
|
||||
if (req && self->request_wait_read_cb) {
|
||||
self->request_wait_read_cb(self, req);
|
||||
}
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
int redis_request_queue_write_cb(redis_request_queue *self, int n) {
|
||||
ngx_queue_t *q;
|
||||
redis_request *req;
|
||||
int wrote;
|
||||
|
||||
/* We need at least one element in the wait_read queue */
|
||||
if (ngx_queue_empty(&self->request_wait_read)) {
|
||||
if (redis__request_queue_pop_wait_write(self) == NULL) {
|
||||
/* The request cannot be NULL: it emitted bytes to write and should
|
||||
* be present in the wait_write queue, waiting for a callback. */
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
while (n) {
|
||||
assert(!ngx_queue_empty(&self->request_wait_read));
|
||||
q = ngx_queue_head(&self->request_wait_read);
|
||||
req = ngx_queue_data(q, redis_request, queue);
|
||||
|
||||
assert(req->write_cb);
|
||||
wrote = req->write_cb(req, n);
|
||||
if (wrote == 0) {
|
||||
if (redis__request_queue_pop_wait_write(self) == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(wrote <= n);
|
||||
n -= wrote;
|
||||
}
|
||||
|
||||
assert(n == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int redis_request_queue_read_cb(redis_request_queue *self, const char *buf, size_t len) {
|
||||
ngx_queue_t *q;
|
||||
redis_request *req;
|
||||
redis_protocol *p;
|
||||
size_t nparsed;
|
||||
|
||||
while (len) {
|
||||
p = NULL;
|
||||
nparsed = redis_parser_execute(&self->parser, &p, buf, len);
|
||||
|
||||
/* Test for parse error */
|
||||
if (nparsed < len && p == NULL) {
|
||||
errno = redis_parser_err(&self->parser);
|
||||
return REDIS_EPARSER;
|
||||
}
|
||||
|
||||
if (!ngx_queue_empty(&self->request_wait_read)) {
|
||||
q = ngx_queue_last(&self->request_wait_read);
|
||||
req = ngx_queue_data(q, redis_request, queue);
|
||||
|
||||
/* Fire raw read callback when defined */
|
||||
if (req->read_raw_cb) {
|
||||
req->read_raw_cb(req, buf, nparsed);
|
||||
}
|
||||
|
||||
/* Fire read callback when the parser produced something */
|
||||
if (p != NULL) {
|
||||
int done = 0;
|
||||
|
||||
req->read_cb(req, p, &done);
|
||||
if (done) {
|
||||
ngx_queue_remove(q);
|
||||
req->free(req);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assert(NULL && "todo");
|
||||
}
|
||||
|
||||
buf += nparsed;
|
||||
len -= nparsed;
|
||||
}
|
||||
|
||||
return REDIS_OK;
|
||||
}
|
||||
130
request.h
Normal file
130
request.h
Normal file
@ -0,0 +1,130 @@
|
||||
#ifndef HIREDIS_REQUEST_H
|
||||
#define HIREDIS_REQUEST_H 1
|
||||
|
||||
#include <stddef.h> /* offsetof */
|
||||
#include "ngx-queue.h"
|
||||
#include "parser.h"
|
||||
|
||||
typedef struct redis_request_s redis_request;
|
||||
|
||||
/*
|
||||
* Obtain a char* to a buffer representing (some part of) the request.
|
||||
*
|
||||
* Arguments:
|
||||
* self the request as previously inserted in the queue
|
||||
* buf buffer to write, or NULL when there are no more
|
||||
* len length of the buffer to write
|
||||
*/
|
||||
typedef void (redis_request_write_ptr)(redis_request *self,
|
||||
const char **buf,
|
||||
size_t *len);
|
||||
|
||||
/*
|
||||
* Let the request know that (some part of) it has been written.
|
||||
*
|
||||
* Arguments:
|
||||
* self the request as previously inserted in the queue
|
||||
* n the number of bytes written
|
||||
*
|
||||
* Return:
|
||||
* the number of bytes (< n) that could be accounted for
|
||||
*/
|
||||
typedef int (redis_request_write_cb)(redis_request *self,
|
||||
int n);
|
||||
|
||||
/*
|
||||
* Let the request know the wire-level data that was fed to the parser on its
|
||||
* behalf. This is merely a convenience function that can be used to buffer
|
||||
* the wire-level representation of the response, for example.
|
||||
*
|
||||
* Arguments:
|
||||
* self the request as previously inserted in the queue
|
||||
* buf buffer with reply data
|
||||
* len length of buffer with reply data
|
||||
*/
|
||||
typedef void (redis_request_read_raw_cb)(redis_request *self,
|
||||
const char *buf,
|
||||
size_t len);
|
||||
|
||||
/*
|
||||
* Let the request know that a full reply was read on its behalf. This
|
||||
* function is called when the protocol parser parsed a full reply and this
|
||||
* request is on the head of the list of requests waiting for a reply.
|
||||
*
|
||||
* Arguments:
|
||||
* self the request as previously inserted in the queue
|
||||
* reply reply as read by the parser
|
||||
* done set to non-zero by the request when this is the last reply
|
||||
*/
|
||||
typedef void (redis_request_read_cb)(redis_request *self,
|
||||
redis_protocol *reply,
|
||||
int *done);
|
||||
|
||||
/*
|
||||
* Free the request. This function is called after the last reply has been
|
||||
* read, and passed to the request via `read_cb`.
|
||||
|
||||
*
|
||||
* Arguments:
|
||||
* self the request as previously inserted in the queue
|
||||
*/
|
||||
typedef void (redis_request_free)(redis_request *self);
|
||||
|
||||
#define REDIS_REQUEST_COMMON \
|
||||
ngx_queue_t queue; \
|
||||
redis_request_write_ptr *write_ptr; \
|
||||
redis_request_write_cb *write_cb; \
|
||||
redis_request_read_raw_cb *read_raw_cb; \
|
||||
redis_request_read_cb *read_cb; \
|
||||
redis_request_free *free;
|
||||
|
||||
struct redis_request_s {
|
||||
REDIS_REQUEST_COMMON
|
||||
};
|
||||
|
||||
typedef struct redis_request_queue_s redis_request_queue;
|
||||
|
||||
/*
|
||||
* These functions are called whenever a request is inserted in a new queue.
|
||||
* When a request is initially inserted via the `redis_request_queue_insert`
|
||||
* function, it ends up in the `to_write` queue and the `to_write_cb` callback
|
||||
* is called. Before the request emits one or more pointers to its buffers, it
|
||||
* is placed in the `wait_write` queue and the `wait_write_cb` callback is
|
||||
* called. Finally, when the request is fully put on the wire and its
|
||||
* `write_cb` function has indicated that it is done, the request is placed in
|
||||
* the `wait_read` queue and the `wait_read_cb` callback is called.
|
||||
*
|
||||
* Arguments:
|
||||
* self request queue
|
||||
* request request that was moved to a new queue
|
||||
*/
|
||||
typedef void (request_queue_to_write_cb)(redis_request_queue *self,
|
||||
redis_request *request);
|
||||
typedef void (request_queue_wait_write_cb)(redis_request_queue *self,
|
||||
redis_request *request);
|
||||
typedef void (request_queue_wait_read_cb)(redis_request_queue *self,
|
||||
redis_request *request);
|
||||
|
||||
struct redis_request_queue_s {
|
||||
ngx_queue_t request_to_write;
|
||||
ngx_queue_t request_wait_write;
|
||||
ngx_queue_t request_wait_read;
|
||||
redis_parser parser;
|
||||
|
||||
request_queue_to_write_cb *request_to_write_cb;
|
||||
request_queue_wait_write_cb *request_wait_write_cb;
|
||||
request_queue_wait_read_cb *request_wait_read_cb;
|
||||
};
|
||||
|
||||
int redis_request_init(redis_request *self);
|
||||
int redis_request_destroy(redis_request *self);
|
||||
|
||||
int redis_request_queue_init(redis_request_queue *self);
|
||||
int redis_request_queue_destroy(redis_request_queue *self);
|
||||
|
||||
int redis_request_queue_insert(redis_request_queue *self, redis_request *request);
|
||||
int redis_request_queue_write_ptr(redis_request_queue *self, const char **buf, size_t *len);
|
||||
int redis_request_queue_write_cb(redis_request_queue *self, int n);
|
||||
int redis_request_queue_read_cb(redis_request_queue *self, const char *buf, size_t len);
|
||||
|
||||
#endif
|
||||
@ -1,6 +1,6 @@
|
||||
include ../Makefile.common
|
||||
|
||||
TESTS=test-format test-parser test-object test-handle test-context
|
||||
TESTS=test-format test-parser test-object test-handle test-context test-request
|
||||
OBJ=spawn.o net-helper.o
|
||||
|
||||
all: $(TESTS)
|
||||
|
||||
394
test/test-request.c
Normal file
394
test/test-request.c
Normal file
@ -0,0 +1,394 @@
|
||||
#include "../fmacros.h"
|
||||
|
||||
/* misc */
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
|
||||
/* local */
|
||||
#include "../request.h"
|
||||
#include "test-helper.h"
|
||||
|
||||
static struct request_to_write_cb_s {
|
||||
int callc;
|
||||
struct {
|
||||
redis_request_queue *q;
|
||||
redis_request *r;
|
||||
} *callv;
|
||||
} request_to_write_cb_calls = { .callc = 0, .callv = NULL };
|
||||
|
||||
void request_to_write_cb_reset(void) {
|
||||
struct request_to_write_cb_s *s = &request_to_write_cb_calls;
|
||||
|
||||
if (s->callc) {
|
||||
free(s->callv);
|
||||
}
|
||||
|
||||
s->callc = 0;
|
||||
s->callv = NULL;
|
||||
}
|
||||
|
||||
void request_to_write_cb(redis_request_queue *q, redis_request *r) {
|
||||
struct request_to_write_cb_s *s = &request_to_write_cb_calls;
|
||||
|
||||
s->callv = realloc(s->callv, sizeof(*s->callv) * (s->callc + 1));
|
||||
assert(s->callv != NULL);
|
||||
|
||||
s->callv[s->callc].q = q;
|
||||
s->callv[s->callc].r = r;
|
||||
s->callc++;
|
||||
}
|
||||
|
||||
static struct request_wait_write_cb_s {
|
||||
int callc;
|
||||
struct {
|
||||
redis_request_queue *q;
|
||||
redis_request *r;
|
||||
} *callv;
|
||||
} request_wait_write_cb_calls = { .callc = 0, .callv = NULL };
|
||||
|
||||
void request_wait_write_cb_reset(void) {
|
||||
struct request_wait_write_cb_s *s = &request_wait_write_cb_calls;
|
||||
|
||||
if (s->callc) {
|
||||
free(s->callv);
|
||||
}
|
||||
|
||||
s->callc = 0;
|
||||
s->callv = NULL;
|
||||
}
|
||||
|
||||
void request_wait_write_cb(redis_request_queue *q, redis_request *r) {
|
||||
struct request_wait_write_cb_s *s = &request_wait_write_cb_calls;
|
||||
|
||||
s->callv = realloc(s->callv, sizeof(*s->callv) * (s->callc + 1));
|
||||
assert(s->callv != NULL);
|
||||
|
||||
s->callv[s->callc].q = q;
|
||||
s->callv[s->callc].r = r;
|
||||
s->callc++;
|
||||
}
|
||||
|
||||
static struct request_wait_read_cb_s {
|
||||
int callc;
|
||||
struct {
|
||||
redis_request_queue *q;
|
||||
redis_request *r;
|
||||
} *callv;
|
||||
} request_wait_read_cb_calls = { .callc = 0, .callv = NULL };
|
||||
|
||||
void request_wait_read_cb_reset(void) {
|
||||
struct request_wait_read_cb_s *s = &request_wait_read_cb_calls;
|
||||
|
||||
if (s->callc) {
|
||||
free(s->callv);
|
||||
}
|
||||
|
||||
s->callc = 0;
|
||||
s->callv = NULL;
|
||||
}
|
||||
|
||||
void request_wait_read_cb(redis_request_queue *q, redis_request *r) {
|
||||
struct request_wait_read_cb_s *s = &request_wait_read_cb_calls;
|
||||
|
||||
s->callv = realloc(s->callv, sizeof(*s->callv) * (s->callc + 1));
|
||||
assert(s->callv != NULL);
|
||||
|
||||
s->callv[s->callc].q = q;
|
||||
s->callv[s->callc].r = r;
|
||||
s->callc++;
|
||||
}
|
||||
|
||||
#define SETUP() \
|
||||
redis_request_queue q; \
|
||||
int rv; \
|
||||
\
|
||||
request_to_write_cb_reset(); \
|
||||
request_wait_write_cb_reset(); \
|
||||
request_wait_read_cb_reset(); \
|
||||
\
|
||||
rv = redis_request_queue_init(&q); \
|
||||
assert_equal_return(rv, REDIS_OK); \
|
||||
\
|
||||
q.request_to_write_cb = request_to_write_cb; \
|
||||
q.request_wait_write_cb = request_wait_write_cb; \
|
||||
q.request_wait_read_cb = request_wait_read_cb;
|
||||
|
||||
#define TEARDOWN() \
|
||||
redis_request_queue_destroy(&q);
|
||||
|
||||
typedef struct t1_redis_request_s t1_redis_request;
|
||||
|
||||
struct t1_redis_request_s {
|
||||
REDIS_REQUEST_COMMON
|
||||
|
||||
const char *buf;
|
||||
size_t len;
|
||||
size_t emit;
|
||||
size_t nemitted;
|
||||
size_t nwritten;
|
||||
|
||||
const char *read_raw_buf;
|
||||
size_t read_raw_len;
|
||||
|
||||
redis_protocol *reply;
|
||||
|
||||
int free_calls;
|
||||
};
|
||||
|
||||
void t1_write_ptr(redis_request *_self, const char **buf, size_t *len) {
|
||||
t1_redis_request *self = (t1_redis_request*)_self;
|
||||
size_t to_emit;
|
||||
|
||||
if (self->nemitted == self->len) {
|
||||
*buf = NULL;
|
||||
*len = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
to_emit = self->len - self->nemitted;
|
||||
if (to_emit > self->emit) {
|
||||
to_emit = self->emit;
|
||||
}
|
||||
|
||||
*buf = self->buf + self->nemitted;
|
||||
*len = to_emit;
|
||||
self->nemitted += to_emit;
|
||||
}
|
||||
|
||||
int t1_write_cb(redis_request *_self, int n) {
|
||||
t1_redis_request *self = (t1_redis_request*)_self;
|
||||
int to_write;
|
||||
|
||||
to_write = self->len - self->nwritten;
|
||||
if (to_write > n) {
|
||||
to_write = n;
|
||||
}
|
||||
|
||||
self->nwritten += to_write;
|
||||
return to_write;
|
||||
}
|
||||
|
||||
void t1_read_raw_cb(redis_request *_self, const char *buf, size_t len) {
|
||||
t1_redis_request *self = (t1_redis_request*)_self;
|
||||
self->read_raw_buf = buf;
|
||||
self->read_raw_len = len;
|
||||
}
|
||||
|
||||
void t1_read_cb(redis_request *_self, redis_protocol *reply, int *done) {
|
||||
t1_redis_request *self = (t1_redis_request*)_self;
|
||||
self->reply = reply;
|
||||
*done = 1;
|
||||
}
|
||||
|
||||
void t1_free(redis_request *_self) {
|
||||
t1_redis_request *self = (t1_redis_request*)_self;
|
||||
self->free_calls++;
|
||||
}
|
||||
|
||||
void t1_init(t1_redis_request *self) {
|
||||
memset(self, 0, sizeof(*self));
|
||||
redis_request_init((redis_request*)self);
|
||||
|
||||
self->write_ptr = t1_write_ptr;
|
||||
self->write_cb = t1_write_cb;
|
||||
self->read_raw_cb = t1_read_raw_cb;
|
||||
self->read_cb = t1_read_cb;
|
||||
self->free = t1_free;
|
||||
}
|
||||
|
||||
TEST(insert_request) {
|
||||
SETUP();
|
||||
|
||||
t1_redis_request req;
|
||||
t1_init(&req);
|
||||
|
||||
req.buf = "hello";
|
||||
req.len = strlen(req.buf);
|
||||
req.emit = 2;
|
||||
|
||||
rv = redis_request_queue_insert(&q, (redis_request*)&req);
|
||||
assert_equal_return(rv, REDIS_OK);
|
||||
|
||||
/* Test that callback was correctly triggered */
|
||||
assert_equal_int(request_to_write_cb_calls.callc, 1);
|
||||
assert(request_to_write_cb_calls.callv[0].q == &q);
|
||||
assert(request_to_write_cb_calls.callv[0].r == (redis_request*)&req);
|
||||
|
||||
TEARDOWN();
|
||||
}
|
||||
|
||||
#define SETUP_INSERTED() \
|
||||
SETUP(); \
|
||||
t1_redis_request req1, req2; \
|
||||
t1_init(&req1); \
|
||||
t1_init(&req2); \
|
||||
req1.buf = "hello"; \
|
||||
req1.len = strlen(req1.buf); \
|
||||
req1.emit = req1.len; \
|
||||
req2.buf = "world"; \
|
||||
req2.len = strlen(req2.buf); \
|
||||
req2.emit = req2.len; \
|
||||
rv = redis_request_queue_insert(&q, (redis_request*)&req1); \
|
||||
assert_equal_return(rv, REDIS_OK); \
|
||||
rv = redis_request_queue_insert(&q, (redis_request*)&req2); \
|
||||
assert_equal_return(rv, REDIS_OK);
|
||||
|
||||
TEST(write_ptr) {
|
||||
SETUP_INSERTED();
|
||||
|
||||
const char *buf;
|
||||
size_t len;
|
||||
|
||||
req1.emit = 3;
|
||||
req2.emit = 3;
|
||||
|
||||
rv = redis_request_queue_write_ptr(&q, &buf, &len);
|
||||
assert_equal_size_t(rv, 0);
|
||||
assert_equal_size_t(len, 3);
|
||||
assert(strncmp(buf, "hel", len) == 0);
|
||||
|
||||
/* The first request should have moved to the wait_write queue now */
|
||||
assert_equal_int(request_wait_write_cb_calls.callc, 1);
|
||||
assert(request_wait_write_cb_calls.callv[0].q == &q);
|
||||
assert(request_wait_write_cb_calls.callv[0].r == (redis_request*)&req1);
|
||||
|
||||
rv = redis_request_queue_write_ptr(&q, &buf, &len);
|
||||
assert_equal_size_t(rv, 0);
|
||||
assert_equal_size_t(len, 2);
|
||||
assert(strncmp(buf, "lo", len) == 0);
|
||||
|
||||
rv = redis_request_queue_write_ptr(&q, &buf, &len);
|
||||
assert_equal_size_t(rv, 0);
|
||||
assert_equal_size_t(len, 3);
|
||||
assert(strncmp(buf, "wor", len) == 0);
|
||||
|
||||
/* The second request should have moved to the wait_write queue now */
|
||||
assert_equal_int(request_wait_write_cb_calls.callc, 2);
|
||||
assert(request_wait_write_cb_calls.callv[1].q == &q);
|
||||
assert(request_wait_write_cb_calls.callv[1].r == (redis_request*)&req2);
|
||||
|
||||
rv = redis_request_queue_write_ptr(&q, &buf, &len);
|
||||
assert_equal_size_t(rv, 0);
|
||||
assert_equal_size_t(len, 2);
|
||||
assert(strncmp(buf, "ld", len) == 0);
|
||||
|
||||
rv = redis_request_queue_write_ptr(&q, &buf, &len);
|
||||
assert_equal_size_t(rv, -1);
|
||||
|
||||
TEARDOWN();
|
||||
}
|
||||
|
||||
#define SETUP_WRITTEN_UNCONFIRMED() \
|
||||
SETUP_INSERTED(); \
|
||||
do { \
|
||||
const char *buf; \
|
||||
size_t len; \
|
||||
rv = redis_request_queue_write_ptr(&q, &buf, &len); \
|
||||
} while (rv == 0);
|
||||
|
||||
TEST(write_cb) {
|
||||
SETUP_WRITTEN_UNCONFIRMED();
|
||||
|
||||
/* Assume 3 bytes were written ("hel" in req1) */
|
||||
rv = redis_request_queue_write_cb(&q, 3);
|
||||
assert_equal_size_t(rv, 0);
|
||||
assert_equal_size_t(req1.nwritten, 3);
|
||||
|
||||
/* The first request should have moved to the wait_read queue now */
|
||||
assert_equal_int(request_wait_read_cb_calls.callc, 1);
|
||||
assert(request_wait_read_cb_calls.callv[0].q == &q);
|
||||
assert(request_wait_read_cb_calls.callv[0].r == (redis_request*)&req1);
|
||||
|
||||
/* Assume another 3 bytes were written ("lo" in req1, "w" in req2) */
|
||||
rv = redis_request_queue_write_cb(&q, 3);
|
||||
assert_equal_size_t(rv, 0);
|
||||
assert_equal_size_t(req1.nwritten, 5);
|
||||
assert_equal_size_t(req2.nwritten, 1);
|
||||
|
||||
/* The second request should have moved to the wait_read queue now */
|
||||
assert_equal_int(request_wait_read_cb_calls.callc, 2);
|
||||
assert(request_wait_read_cb_calls.callv[1].q == &q);
|
||||
assert(request_wait_read_cb_calls.callv[1].r == (redis_request*)&req2);
|
||||
|
||||
/* Run callback for remaining bytes */
|
||||
rv = redis_request_queue_write_cb(&q, 4);
|
||||
assert_equal_size_t(rv, 0);
|
||||
assert_equal_size_t(req1.nwritten, 5);
|
||||
assert_equal_size_t(req2.nwritten, 5);
|
||||
|
||||
/* More bytes cannot be mapped to requests... */
|
||||
rv = redis_request_queue_write_cb(&q, 4);
|
||||
assert_equal_size_t(rv, -1);
|
||||
|
||||
TEARDOWN();
|
||||
}
|
||||
|
||||
#define SETUP_WRITTEN_CONFIRMED() \
|
||||
SETUP_WRITTEN_UNCONFIRMED(); \
|
||||
rv = redis_request_queue_write_cb(&q, req1.len); \
|
||||
assert_equal_size_t(rv, 0); \
|
||||
rv = redis_request_queue_write_cb(&q, req2.len); \
|
||||
assert_equal_size_t(rv, 0);
|
||||
|
||||
TEST(read_cb) {
|
||||
SETUP_WRITTEN_CONFIRMED();
|
||||
|
||||
/* Feed part of the reply for request 1 */
|
||||
rv = redis_request_queue_read_cb(&q, "+stat", 5);
|
||||
assert_equal_size_t(rv, 0);
|
||||
|
||||
assert_equal_size_t(req1.read_raw_len, 5);
|
||||
assert(strncmp(req1.read_raw_buf, "+stat", req1.read_raw_len) == 0);
|
||||
assert(req1.reply == NULL);
|
||||
assert(req1.free_calls == 0);
|
||||
|
||||
/* Feed remaining part for request 1, and first part for request 2 */
|
||||
rv = redis_request_queue_read_cb(&q, "us\r\n+st", 7);
|
||||
assert_equal_size_t(rv, 0);
|
||||
|
||||
assert_equal_size_t(req1.read_raw_len, 4);
|
||||
assert(strncmp(req1.read_raw_buf, "us\r\n", req1.read_raw_len) == 0);
|
||||
assert(req1.reply != NULL && req1.reply->type == REDIS_STATUS);
|
||||
assert(req1.free_calls == 1);
|
||||
|
||||
assert_equal_size_t(req2.read_raw_len, 3);
|
||||
assert(strncmp(req2.read_raw_buf, "+st", req2.read_raw_len) == 0);
|
||||
assert(req2.reply == NULL);
|
||||
assert(req2.free_calls == 0);
|
||||
|
||||
/* Feed remaining part for request 2 */
|
||||
rv = redis_request_queue_read_cb(&q, "atus\r\n", 6);
|
||||
assert_equal_size_t(rv, 0);
|
||||
|
||||
assert_equal_size_t(req2.read_raw_len, 6);
|
||||
assert(strncmp(req2.read_raw_buf, "atus\r\n", req2.read_raw_len) == 0);
|
||||
assert(req2.reply != NULL && req1.reply->type == REDIS_STATUS);
|
||||
assert(req2.free_calls == 1);
|
||||
|
||||
TEARDOWN();
|
||||
}
|
||||
|
||||
TEST(read_cb_with_parse_error) {
|
||||
SETUP_WRITTEN_CONFIRMED();
|
||||
|
||||
/* Feed part of an erroneous reply */
|
||||
rv = redis_request_queue_read_cb(&q, "+x\r\r", 4);
|
||||
assert_equal_return(rv, REDIS_EPARSER);
|
||||
|
||||
/* This should keep failing */
|
||||
rv = redis_request_queue_read_cb(&q, "\n", 1);
|
||||
assert_equal_return(rv, REDIS_EPARSER);
|
||||
|
||||
TEARDOWN();
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
test_insert_request();
|
||||
test_write_ptr();
|
||||
test_write_cb();
|
||||
test_read_cb();
|
||||
test_read_cb_with_parse_error();
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user