Support timeouts in libev adapater (#795)
Add support for timeouts in our libev adapter. See #795
This commit is contained in:
parent
2cb203c1e9
commit
994d2fd77d
@ -41,6 +41,7 @@ typedef struct redisLibevEvents {
|
|||||||
struct ev_loop *loop;
|
struct ev_loop *loop;
|
||||||
int reading, writing;
|
int reading, writing;
|
||||||
ev_io rev, wev;
|
ev_io rev, wev;
|
||||||
|
ev_timer timer;
|
||||||
} redisLibevEvents;
|
} redisLibevEvents;
|
||||||
|
|
||||||
static void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) {
|
static void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) {
|
||||||
@ -103,13 +104,41 @@ static void redisLibevDelWrite(void *privdata) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void redisLibevStopTimer(void *privdata) {
|
||||||
|
redisLibevEvents *e = (redisLibevEvents*)privdata;
|
||||||
|
struct ev_loop *loop = e->loop;
|
||||||
|
((void)loop);
|
||||||
|
ev_timer_stop(EV_A_ &e->timer);
|
||||||
|
}
|
||||||
|
|
||||||
static void redisLibevCleanup(void *privdata) {
|
static void redisLibevCleanup(void *privdata) {
|
||||||
redisLibevEvents *e = (redisLibevEvents*)privdata;
|
redisLibevEvents *e = (redisLibevEvents*)privdata;
|
||||||
redisLibevDelRead(privdata);
|
redisLibevDelRead(privdata);
|
||||||
redisLibevDelWrite(privdata);
|
redisLibevDelWrite(privdata);
|
||||||
|
redisLibevStopTimer(privdata);
|
||||||
free(e);
|
free(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) {
|
||||||
|
((void)revents);
|
||||||
|
redisLibevEvents *e = timer->data;
|
||||||
|
redisAsyncHandleTimeout(e->context);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void redisLibevSetTimeout(void *privdata, struct timeval tv) {
|
||||||
|
redisLibevEvents *e = privdata;
|
||||||
|
struct ev_loop *loop = e->loop;
|
||||||
|
((void)loop);
|
||||||
|
|
||||||
|
if (!ev_is_active(&e->timer)) {
|
||||||
|
ev_init(&e->timer, redisLibevTimeout);
|
||||||
|
e->timer.data = e;
|
||||||
|
}
|
||||||
|
|
||||||
|
e->timer.repeat = tv.tv_sec + tv.tv_usec / 1000000.00;
|
||||||
|
ev_timer_again(EV_A_ &e->timer);
|
||||||
|
}
|
||||||
|
|
||||||
static int redisLibevAttach(EV_P_ redisAsyncContext *ac) {
|
static int redisLibevAttach(EV_P_ redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
redisLibevEvents *e;
|
redisLibevEvents *e;
|
||||||
@ -119,14 +148,13 @@ static int redisLibevAttach(EV_P_ redisAsyncContext *ac) {
|
|||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
|
|
||||||
/* Create container for context and r/w events */
|
/* Create container for context and r/w events */
|
||||||
e = (redisLibevEvents*)hi_malloc(sizeof(*e));
|
e = (redisLibevEvents*)hi_calloc(1, sizeof(*e));
|
||||||
e->context = ac;
|
e->context = ac;
|
||||||
#if EV_MULTIPLICITY
|
#if EV_MULTIPLICITY
|
||||||
e->loop = loop;
|
e->loop = loop;
|
||||||
#else
|
#else
|
||||||
e->loop = NULL;
|
e->loop = NULL;
|
||||||
#endif
|
#endif
|
||||||
e->reading = e->writing = 0;
|
|
||||||
e->rev.data = e;
|
e->rev.data = e;
|
||||||
e->wev.data = e;
|
e->wev.data = e;
|
||||||
|
|
||||||
@ -136,6 +164,7 @@ static int redisLibevAttach(EV_P_ redisAsyncContext *ac) {
|
|||||||
ac->ev.addWrite = redisLibevAddWrite;
|
ac->ev.addWrite = redisLibevAddWrite;
|
||||||
ac->ev.delWrite = redisLibevDelWrite;
|
ac->ev.delWrite = redisLibevDelWrite;
|
||||||
ac->ev.cleanup = redisLibevCleanup;
|
ac->ev.cleanup = redisLibevCleanup;
|
||||||
|
ac->ev.scheduleTimer = redisLibevSetTimeout;
|
||||||
ac->ev.data = e;
|
ac->ev.data = e;
|
||||||
|
|
||||||
/* Initialize read/write events */
|
/* Initialize read/write events */
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user