Add timeout support to libhv adapter. (#1109)
Add timeout support to libhv adapter. See: #904
This commit is contained in:
parent
722e3409c7
commit
68b29e1ad5
@ -5,6 +5,11 @@
|
|||||||
#include "../hiredis.h"
|
#include "../hiredis.h"
|
||||||
#include "../async.h"
|
#include "../async.h"
|
||||||
|
|
||||||
|
typedef struct redisLibhvEvents {
|
||||||
|
hio_t *io;
|
||||||
|
htimer_t *timer;
|
||||||
|
} redisLibhvEvents;
|
||||||
|
|
||||||
static void redisLibhvHandleEvents(hio_t* io) {
|
static void redisLibhvHandleEvents(hio_t* io) {
|
||||||
redisAsyncContext* context = (redisAsyncContext*)hevent_userdata(io);
|
redisAsyncContext* context = (redisAsyncContext*)hevent_userdata(io);
|
||||||
int events = hio_events(io);
|
int events = hio_events(io);
|
||||||
@ -18,51 +23,100 @@ static void redisLibhvHandleEvents(hio_t* io) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibhvAddRead(void *privdata) {
|
static void redisLibhvAddRead(void *privdata) {
|
||||||
hio_t* io = (hio_t*)privdata;
|
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
|
||||||
hio_add(io, redisLibhvHandleEvents, HV_READ);
|
hio_add(events->io, redisLibhvHandleEvents, HV_READ);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibhvDelRead(void *privdata) {
|
static void redisLibhvDelRead(void *privdata) {
|
||||||
hio_t* io = (hio_t*)privdata;
|
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
|
||||||
hio_del(io, HV_READ);
|
hio_del(events->io, HV_READ);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibhvAddWrite(void *privdata) {
|
static void redisLibhvAddWrite(void *privdata) {
|
||||||
hio_t* io = (hio_t*)privdata;
|
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
|
||||||
hio_add(io, redisLibhvHandleEvents, HV_WRITE);
|
hio_add(events->io, redisLibhvHandleEvents, HV_WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibhvDelWrite(void *privdata) {
|
static void redisLibhvDelWrite(void *privdata) {
|
||||||
hio_t* io = (hio_t*)privdata;
|
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
|
||||||
hio_del(io, HV_WRITE);
|
hio_del(events->io, HV_WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void redisLibhvCleanup(void *privdata) {
|
static void redisLibhvCleanup(void *privdata) {
|
||||||
hio_t* io = (hio_t*)privdata;
|
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
|
||||||
hio_close(io);
|
|
||||||
hevent_set_userdata(io, NULL);
|
if (events->timer)
|
||||||
|
htimer_del(events->timer);
|
||||||
|
|
||||||
|
hio_close(events->io);
|
||||||
|
hevent_set_userdata(events->io, NULL);
|
||||||
|
|
||||||
|
hi_free(events);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void redisLibhvTimeout(htimer_t* timer) {
|
||||||
|
hio_t* io = (hio_t*)hevent_userdata(timer);
|
||||||
|
redisAsyncHandleTimeout(hevent_userdata(io));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void redisLibhvSetTimeout(void *privdata, struct timeval tv) {
|
||||||
|
redisLibhvEvents* events;
|
||||||
|
uint32_t millis;
|
||||||
|
hloop_t* loop;
|
||||||
|
|
||||||
|
events = (redisLibhvEvents*)privdata;
|
||||||
|
millis = tv.tv_sec * 1000 + tv.tv_usec / 1000;
|
||||||
|
|
||||||
|
if (millis == 0) {
|
||||||
|
/* Libhv disallows zero'd timers so treat this as a delete or NO OP */
|
||||||
|
if (events->timer) {
|
||||||
|
htimer_del(events->timer);
|
||||||
|
events->timer = NULL;
|
||||||
|
}
|
||||||
|
} else if (events->timer == NULL) {
|
||||||
|
/* Add new timer */
|
||||||
|
loop = hevent_loop(events->io);
|
||||||
|
events->timer = htimer_add(loop, redisLibhvTimeout, millis, 1);
|
||||||
|
hevent_set_userdata(events->timer, events->io);
|
||||||
|
} else {
|
||||||
|
/* Update existing timer */
|
||||||
|
htimer_reset(events->timer, millis);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int redisLibhvAttach(redisAsyncContext* ac, hloop_t* loop) {
|
static int redisLibhvAttach(redisAsyncContext* ac, hloop_t* loop) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
|
redisLibhvEvents *events;
|
||||||
hio_t* io = NULL;
|
hio_t* io = NULL;
|
||||||
|
|
||||||
if (ac->ev.data != NULL) {
|
if (ac->ev.data != NULL) {
|
||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
io = hio_get(loop, c->fd);
|
/* Create container struct to keep track of our io and any timer */
|
||||||
if (io == NULL) {
|
events = hi_malloc(sizeof(*events));
|
||||||
|
if (events == NULL) {
|
||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
io = hio_get(loop, c->fd);
|
||||||
|
if (io == NULL) {
|
||||||
|
hi_free(events);
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
hevent_set_userdata(io, ac);
|
hevent_set_userdata(io, ac);
|
||||||
|
|
||||||
|
events->io = io;
|
||||||
|
events->timer = NULL;
|
||||||
|
|
||||||
ac->ev.addRead = redisLibhvAddRead;
|
ac->ev.addRead = redisLibhvAddRead;
|
||||||
ac->ev.delRead = redisLibhvDelRead;
|
ac->ev.delRead = redisLibhvDelRead;
|
||||||
ac->ev.addWrite = redisLibhvAddWrite;
|
ac->ev.addWrite = redisLibhvAddWrite;
|
||||||
ac->ev.delWrite = redisLibhvDelWrite;
|
ac->ev.delWrite = redisLibhvDelWrite;
|
||||||
ac->ev.cleanup = redisLibhvCleanup;
|
ac->ev.cleanup = redisLibhvCleanup;
|
||||||
ac->ev.data = io;
|
ac->ev.scheduleTimer = redisLibhvSetTimeout;
|
||||||
|
ac->ev.data = events;
|
||||||
|
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,18 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
|
|||||||
redisAsyncDisconnect(c);
|
redisAsyncDisconnect(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void debugCallback(redisAsyncContext *c, void *r, void *privdata) {
|
||||||
|
(void)privdata;
|
||||||
|
redisReply *reply = r;
|
||||||
|
|
||||||
|
if (reply == NULL) {
|
||||||
|
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
redisAsyncDisconnect(c);
|
||||||
|
}
|
||||||
|
|
||||||
void connectCallback(const redisAsyncContext *c, int status) {
|
void connectCallback(const redisAsyncContext *c, int status) {
|
||||||
if (status != REDIS_OK) {
|
if (status != REDIS_OK) {
|
||||||
printf("Error: %s\n", c->errstr);
|
printf("Error: %s\n", c->errstr);
|
||||||
@ -46,10 +58,13 @@ int main (int argc, char **argv) {
|
|||||||
|
|
||||||
hloop_t* loop = hloop_new(HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS);
|
hloop_t* loop = hloop_new(HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS);
|
||||||
redisLibhvAttach(c, loop);
|
redisLibhvAttach(c, loop);
|
||||||
|
redisAsyncSetTimeout(c, (struct timeval){.tv_sec = 0, .tv_usec = 500000});
|
||||||
redisAsyncSetConnectCallback(c,connectCallback);
|
redisAsyncSetConnectCallback(c,connectCallback);
|
||||||
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
||||||
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
||||||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
||||||
|
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %d", 1);
|
||||||
hloop_run(loop);
|
hloop_run(loop);
|
||||||
|
hloop_free(&loop);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user