sharded pubsub support

This commit is contained in:
Leo P 2022-05-17 00:15:24 -04:00
parent 95a0c1283a
commit e9634042f3

15
async.c
View File

@ -414,7 +414,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
dict *callbacks;
redisCallback *cb = NULL;
dictEntry *de;
int pvariant;
int pvariant, svariant;
char *stype;
sds sname = NULL;
@ -426,7 +426,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
svariant = (reply->element[0]->len > 3 && (strncasecmp(stype, "ssu"/*bscrscribe*/, 3) == 0 || strncasecmp(stype, "sun"/*subscribe*/, 3) == 0)) ? 1 : 0;
if (pvariant)
callbacks = ac->sub.patterns;
else
@ -444,11 +444,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
}
/* If this is an subscribe reply decrease pending counter. */
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
if (strcasecmp(stype+pvariant+svariant,"subscribe") == 0) {
assert(cb != NULL);
cb->pending_subs -= 1;
} else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
} else if (strcasecmp(stype+pvariant+svariant,"unsubscribe") == 0) {
if (cb == NULL)
ac->sub.pending_unsubs -= 1;
else if (cb->pending_subs == 0)
@ -762,7 +762,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
dictIterator it;
dictEntry *de;
redisCallback *existcb;
int pvariant, hasnext;
int pvariant, svariant, hasnext;
const char *cstr, *astr;
size_t clen, alen;
const char *p;
@ -783,8 +783,9 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
assert(p != NULL);
hasnext = (p[0] == '$');
pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
cstr += pvariant;
clen -= pvariant;
svariant = clen > 3 && (strncasecmp(cstr, "ssu"/*bscribe*/, 3) == 0 || strncasecmp(cstr, "sun"/*subscribe*/, 3) == 0);
cstr += pvariant + svariant;
clen -= pvariant + svariant;
if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
c->flags |= REDIS_SUBSCRIBED;