From 980b3c259c330903e4aa457aec1bc9345ff4683d Mon Sep 17 00:00:00 2001 From: Alex 'AdUser' Z Date: Sat, 24 Sep 2016 00:14:11 +1000 Subject: [PATCH] * sources/redis.c : implement next() --- src/sources/redis.c | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/sources/redis.c b/src/sources/redis.c index d3a6d24..b4f7461 100644 --- a/src/sources/redis.c +++ b/src/sources/redis.c @@ -189,13 +189,46 @@ stop(cfg_t *cfg) { bool next(cfg_t *cfg, char *buf, size_t bufsize, bool reset) { + bool gotit = false; assert(cfg != NULL); assert(buf != NULL); assert(bufsize > 0); - assert(0); /* TODO */ + (void)(reset); /* suppress warning */ - return false; + if (!cfg->conn || cfg->conn->err) + redis_connect(cfg); + if (!cfg->conn) + return false; /* reconnect failure */ + + if (cfg->conn->err) { + snprintf(cfg->error, sizeof(cfg->error), "connection error: %s", cfg->conn->errstr); + return false; + } + + redisReply *reply = NULL; + if (redisGetReply(cfg->conn, (void **) &reply) == REDIS_OK) { + if (reply->type == REDIS_REPLY_ARRAY) { + if (strcmp(reply->element[0]->str, "message") == 0 || + strcmp(reply->element[1]->str, cfg->hash) == 0) { + strlcpy(buf, reply->element[2]->str, bufsize); + gotit = true; + } else { + cfg->errcb(cfg->error); + } + } else { + strlcpy(cfg->error, "reply is not a array type", sizeof(cfg->error)); + cfg->errcb(cfg->error); + } + freeReplyObject(reply); + } else if (cfg->conn->err == REDIS_ERR_IO && errno == EAGAIN) { + cfg->conn->err = 0; /* reset error to prevent reconnecting */ + } else { + snprintf(cfg->error, sizeof(cfg->error), "can't get reply from server %s: %s", cfg->host, cfg->conn->errstr); + cfg->errcb(cfg->error); + } + + return gotit; } void