Request is responsible for parsing its own reply
This commit is contained in:
parent
6c990440ee
commit
d0eb3f6511
47
request.c
47
request.c
@ -44,6 +44,7 @@ int redis_request_queue_destroy(redis_request_queue *self) {
|
||||
}
|
||||
|
||||
void redis_request_queue_insert(redis_request_queue *self, redis_request *request) {
|
||||
request->request_queue = self;
|
||||
ngx_queue_insert_head(&self->request_to_write, &request->queue);
|
||||
|
||||
if (self->request_to_write_cb) {
|
||||
@ -168,44 +169,28 @@ int redis_request_queue_write_cb(redis_request_queue *self, int n) {
|
||||
int redis_request_queue_read_cb(redis_request_queue *self, const char *buf, size_t len) {
|
||||
ngx_queue_t *q;
|
||||
redis_request *req;
|
||||
redis_protocol *p;
|
||||
size_t nparsed;
|
||||
int n, done;
|
||||
|
||||
while (len) {
|
||||
p = NULL;
|
||||
nparsed = redis_parser_execute(&self->parser, &p, buf, len);
|
||||
assert(!ngx_queue_empty(&self->request_wait_read));
|
||||
q = ngx_queue_last(&self->request_wait_read);
|
||||
req = ngx_queue_data(q, redis_request, queue);
|
||||
done = 0;
|
||||
|
||||
/* Test for parse error */
|
||||
if (nparsed < len && p == NULL) {
|
||||
errno = redis_parser_err(&self->parser);
|
||||
return REDIS_EPARSER;
|
||||
/* Fire read callback */
|
||||
n = req->read_cb(req, buf, len, &done);
|
||||
if (n < 0) {
|
||||
return n;
|
||||
}
|
||||
|
||||
if (!ngx_queue_empty(&self->request_wait_read)) {
|
||||
q = ngx_queue_last(&self->request_wait_read);
|
||||
req = ngx_queue_data(q, redis_request, queue);
|
||||
|
||||
/* Fire raw read callback when defined */
|
||||
if (req->read_raw_cb) {
|
||||
req->read_raw_cb(req, buf, nparsed);
|
||||
}
|
||||
|
||||
/* Fire read callback when the parser produced something */
|
||||
if (p != NULL) {
|
||||
int done = 0;
|
||||
|
||||
req->read_cb(req, p, &done);
|
||||
if (done) {
|
||||
ngx_queue_remove(q);
|
||||
req->free(req);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assert(NULL && "todo");
|
||||
if (done) {
|
||||
ngx_queue_remove(q);
|
||||
req->read_cb_done = 1;
|
||||
req->free(req);
|
||||
}
|
||||
|
||||
buf += nparsed;
|
||||
len -= nparsed;
|
||||
buf += n;
|
||||
len -= n;
|
||||
}
|
||||
|
||||
return REDIS_OK;
|
||||
|
||||
34
request.h
34
request.h
@ -7,6 +7,8 @@
|
||||
|
||||
typedef struct redis_request_s redis_request;
|
||||
|
||||
typedef struct redis_request_queue_s redis_request_queue;
|
||||
|
||||
/*
|
||||
* Obtain a char* to a buffer representing (some part of) the request.
|
||||
*
|
||||
@ -27,7 +29,7 @@ typedef void (redis_request_write_ptr)(redis_request *self,
|
||||
* n the number of bytes written
|
||||
*
|
||||
* Return:
|
||||
* the number of bytes (< n) that could be accounted for
|
||||
* the number of bytes (<= n) actually used
|
||||
*/
|
||||
typedef int (redis_request_write_cb)(redis_request *self,
|
||||
int n);
|
||||
@ -41,24 +43,15 @@ typedef int (redis_request_write_cb)(redis_request *self,
|
||||
* self the request as previously inserted in the queue
|
||||
* buf buffer with reply data
|
||||
* len length of buffer with reply data
|
||||
*/
|
||||
typedef void (redis_request_read_raw_cb)(redis_request *self,
|
||||
const char *buf,
|
||||
size_t len);
|
||||
|
||||
/*
|
||||
* Let the request know that a full reply was read on its behalf. This
|
||||
* function is called when the protocol parser parsed a full reply and this
|
||||
* request is on the head of the list of requests waiting for a reply.
|
||||
* done set to non-zero by the request when it doesn't need more bytes
|
||||
*
|
||||
* Arguments:
|
||||
* self the request as previously inserted in the queue
|
||||
* reply reply as read by the parser
|
||||
* done set to non-zero by the request when this is the last reply
|
||||
* Return:
|
||||
* the number of bytes (<= n) actually used
|
||||
*/
|
||||
typedef void (redis_request_read_cb)(redis_request *self,
|
||||
redis_protocol *reply,
|
||||
int *done);
|
||||
typedef int (redis_request_read_cb)(redis_request *self,
|
||||
const char *buf,
|
||||
size_t len,
|
||||
int *done);
|
||||
|
||||
/*
|
||||
* Free the request. This function is called after the last reply has been
|
||||
@ -72,9 +65,12 @@ typedef void (redis_request_free)(redis_request *self);
|
||||
|
||||
#define REDIS_REQUEST_COMMON \
|
||||
ngx_queue_t queue; \
|
||||
redis_request_queue *request_queue; \
|
||||
unsigned write_ptr_done:1; \
|
||||
unsigned write_cb_done:1; \
|
||||
unsigned read_cb_done:1; \
|
||||
redis_request_write_ptr *write_ptr; \
|
||||
redis_request_write_cb *write_cb; \
|
||||
redis_request_read_raw_cb *read_raw_cb; \
|
||||
redis_request_read_cb *read_cb; \
|
||||
redis_request_free *free;
|
||||
|
||||
@ -82,8 +78,6 @@ struct redis_request_s {
|
||||
REDIS_REQUEST_COMMON
|
||||
};
|
||||
|
||||
typedef struct redis_request_queue_s redis_request_queue;
|
||||
|
||||
/*
|
||||
* These functions are called whenever a request is inserted in a new queue.
|
||||
* When a request is initially inserted via the `redis_request_queue_insert`
|
||||
|
||||
@ -171,16 +171,28 @@ int t1_write_cb(redis_request *_self, int n) {
|
||||
return to_write;
|
||||
}
|
||||
|
||||
void t1_read_raw_cb(redis_request *_self, const char *buf, size_t len) {
|
||||
int t1_read_cb(redis_request *_self, const char *buf, size_t len, int *done) {
|
||||
t1_redis_request *self = (t1_redis_request*)_self;
|
||||
self->read_raw_buf = buf;
|
||||
self->read_raw_len = len;
|
||||
}
|
||||
redis_protocol *p = NULL;
|
||||
size_t nparsed;
|
||||
|
||||
void t1_read_cb(redis_request *_self, redis_protocol *reply, int *done) {
|
||||
t1_redis_request *self = (t1_redis_request*)_self;
|
||||
self->reply = reply;
|
||||
*done = 1;
|
||||
nparsed = redis_parser_execute(&self->request_queue->parser, &p, buf, len);
|
||||
|
||||
/* Test for parse error */
|
||||
if (nparsed < len && p == NULL) {
|
||||
errno = redis_parser_err(&self->request_queue->parser);
|
||||
return REDIS_EPARSER;
|
||||
}
|
||||
|
||||
if (p != NULL) {
|
||||
self->reply = p;
|
||||
*done = 1;
|
||||
}
|
||||
|
||||
self->read_raw_buf = buf;
|
||||
self->read_raw_len = nparsed;
|
||||
|
||||
return nparsed;
|
||||
}
|
||||
|
||||
void t1_free(redis_request *_self) {
|
||||
@ -194,7 +206,6 @@ void t1_init(t1_redis_request *self) {
|
||||
|
||||
self->write_ptr = t1_write_ptr;
|
||||
self->write_cb = t1_write_cb;
|
||||
self->read_raw_cb = t1_read_raw_cb;
|
||||
self->read_cb = t1_read_cb;
|
||||
self->free = t1_free;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user