diff --git a/request.c b/request.c index ad5f277..456015a 100644 --- a/request.c +++ b/request.c @@ -44,6 +44,7 @@ int redis_request_queue_destroy(redis_request_queue *self) { } void redis_request_queue_insert(redis_request_queue *self, redis_request *request) { + request->request_queue = self; ngx_queue_insert_head(&self->request_to_write, &request->queue); if (self->request_to_write_cb) { @@ -168,44 +169,28 @@ int redis_request_queue_write_cb(redis_request_queue *self, int n) { int redis_request_queue_read_cb(redis_request_queue *self, const char *buf, size_t len) { ngx_queue_t *q; redis_request *req; - redis_protocol *p; - size_t nparsed; + int n, done; while (len) { - p = NULL; - nparsed = redis_parser_execute(&self->parser, &p, buf, len); + assert(!ngx_queue_empty(&self->request_wait_read)); + q = ngx_queue_last(&self->request_wait_read); + req = ngx_queue_data(q, redis_request, queue); + done = 0; - /* Test for parse error */ - if (nparsed < len && p == NULL) { - errno = redis_parser_err(&self->parser); - return REDIS_EPARSER; + /* Fire read callback */ + n = req->read_cb(req, buf, len, &done); + if (n < 0) { + return n; } - if (!ngx_queue_empty(&self->request_wait_read)) { - q = ngx_queue_last(&self->request_wait_read); - req = ngx_queue_data(q, redis_request, queue); - - /* Fire raw read callback when defined */ - if (req->read_raw_cb) { - req->read_raw_cb(req, buf, nparsed); - } - - /* Fire read callback when the parser produced something */ - if (p != NULL) { - int done = 0; - - req->read_cb(req, p, &done); - if (done) { - ngx_queue_remove(q); - req->free(req); - } - } - } else { - assert(NULL && "todo"); + if (done) { + ngx_queue_remove(q); + req->read_cb_done = 1; + req->free(req); } - buf += nparsed; - len -= nparsed; + buf += n; + len -= n; } return REDIS_OK; diff --git a/request.h b/request.h index 4f56c44..cba26f4 100644 --- a/request.h +++ b/request.h @@ -7,6 +7,8 @@ typedef struct redis_request_s redis_request; +typedef struct redis_request_queue_s redis_request_queue; + /* * Obtain a char* to a buffer representing (some part of) the request. * @@ -27,7 +29,7 @@ typedef void (redis_request_write_ptr)(redis_request *self, * n the number of bytes written * * Return: - * the number of bytes (< n) that could be accounted for + * the number of bytes (<= n) actually used */ typedef int (redis_request_write_cb)(redis_request *self, int n); @@ -41,24 +43,15 @@ typedef int (redis_request_write_cb)(redis_request *self, * self the request as previously inserted in the queue * buf buffer with reply data * len length of buffer with reply data - */ -typedef void (redis_request_read_raw_cb)(redis_request *self, - const char *buf, - size_t len); - -/* - * Let the request know that a full reply was read on its behalf. This - * function is called when the protocol parser parsed a full reply and this - * request is on the head of the list of requests waiting for a reply. + * done set to non-zero by the request when it doesn't need more bytes * - * Arguments: - * self the request as previously inserted in the queue - * reply reply as read by the parser - * done set to non-zero by the request when this is the last reply + * Return: + * the number of bytes (<= n) actually used */ -typedef void (redis_request_read_cb)(redis_request *self, - redis_protocol *reply, - int *done); +typedef int (redis_request_read_cb)(redis_request *self, + const char *buf, + size_t len, + int *done); /* * Free the request. This function is called after the last reply has been @@ -72,9 +65,12 @@ typedef void (redis_request_free)(redis_request *self); #define REDIS_REQUEST_COMMON \ ngx_queue_t queue; \ + redis_request_queue *request_queue; \ + unsigned write_ptr_done:1; \ + unsigned write_cb_done:1; \ + unsigned read_cb_done:1; \ redis_request_write_ptr *write_ptr; \ redis_request_write_cb *write_cb; \ - redis_request_read_raw_cb *read_raw_cb; \ redis_request_read_cb *read_cb; \ redis_request_free *free; @@ -82,8 +78,6 @@ struct redis_request_s { REDIS_REQUEST_COMMON }; -typedef struct redis_request_queue_s redis_request_queue; - /* * These functions are called whenever a request is inserted in a new queue. * When a request is initially inserted via the `redis_request_queue_insert` diff --git a/test/test-request.c b/test/test-request.c index ee41062..93b310a 100644 --- a/test/test-request.c +++ b/test/test-request.c @@ -171,16 +171,28 @@ int t1_write_cb(redis_request *_self, int n) { return to_write; } -void t1_read_raw_cb(redis_request *_self, const char *buf, size_t len) { +int t1_read_cb(redis_request *_self, const char *buf, size_t len, int *done) { t1_redis_request *self = (t1_redis_request*)_self; - self->read_raw_buf = buf; - self->read_raw_len = len; -} + redis_protocol *p = NULL; + size_t nparsed; -void t1_read_cb(redis_request *_self, redis_protocol *reply, int *done) { - t1_redis_request *self = (t1_redis_request*)_self; - self->reply = reply; - *done = 1; + nparsed = redis_parser_execute(&self->request_queue->parser, &p, buf, len); + + /* Test for parse error */ + if (nparsed < len && p == NULL) { + errno = redis_parser_err(&self->request_queue->parser); + return REDIS_EPARSER; + } + + if (p != NULL) { + self->reply = p; + *done = 1; + } + + self->read_raw_buf = buf; + self->read_raw_len = nparsed; + + return nparsed; } void t1_free(redis_request *_self) { @@ -194,7 +206,6 @@ void t1_init(t1_redis_request *self) { self->write_ptr = t1_write_ptr; self->write_cb = t1_write_cb; - self->read_raw_cb = t1_read_raw_cb; self->read_cb = t1_read_cb; self->free = t1_free; }