suricata
util-log-redis.c
Go to the documentation of this file.
1/* Copyright (C) 2007-2024 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Paulo Pacheco <fooinha@gmail.com>
22 *
23 * File-like output for logging: redis
24 */
#include "suricata-common.h" /* errno.h, string.h, etc. */
#include "util-log-redis.h"
#include "util-logopenfile.h"
#include "util-byte.h"
#include "util-debug.h"

#include <limits.h> /* INT_MAX */
30
31#ifdef HAVE_LIBHIREDIS
32
33#ifdef HAVE_LIBEVENT_PTHREADS
34#include <event2/thread.h>
35#endif /* HAVE_LIBEVENT_PTHREADS */
36
/* redis commands selectable through the "mode" setting */
static const char *redis_lpush_cmd = "LPUSH";
static const char *redis_rpush_cmd = "RPUSH";
static const char *redis_publish_cmd = "PUBLISH";
static const char *redis_xadd_cmd = "XADD";

/* fallbacks used when "server" / "key" are not configured */
static const char *redis_default_server = "127.0.0.1";
static const char *redis_default_key = "suricata";

/* command formats handed to hiredis: <command> <key> <payload> */
static const char *redis_default_format = "%s %s %s";
/* XADD without trimming: <command> <key> * eve <payload> */
static const char *redis_stream_format = "%s %s * eve %s";
/* template used to build the XADD format with a MAXLEN clause; the %c takes
 * the trim operator and the %d the maximum stream length */
static const char *redis_stream_format_maxlen_tmpl = "%s %s MAXLEN %c %d * eve %s";
46
47static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
48static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
49
50#define REDIS_MAX_STREAM_LENGTH_DEFAULT 100000
51
/**
 * \brief Global redis logging setup, to be run before threads are started.
 *
 * When HAVE_LIBEVENT_PTHREADS is defined this calls evthread_use_pthreads();
 * otherwise the function does nothing.
 */
void SCLogRedisInit(void)
{
#ifdef HAVE_LIBEVENT_PTHREADS
    evthread_use_pthreads();
#endif /* HAVE_LIBEVENT_PTHREADS */
}
61
62/** \brief SCLogRedisContextAlloc() - Allocates and initializes redis context
63 */
64static SCLogRedisContext *SCLogRedisContextAlloc(void)
65{
66 SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
67 if (ctx == NULL) {
68 FatalError("Unable to allocate redis context");
69 }
70 ctx->sync = NULL;
71#if HAVE_LIBEVENT
72 ctx->ev_base = NULL;
73 ctx->async = NULL;
74#endif
75 ctx->batch_count = 0;
76 ctx->last_push = 0;
77 ctx->tried = 0;
78
79 return ctx;
80}
81
82#ifdef HAVE_LIBEVENT
83
84static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
85#include <hiredis/adapters/libevent.h>
86
87/** \brief SCLogRedisAsyncContextAlloc() - Allocates and initializes redis context with async
88 */
89static SCLogRedisContext *SCLogRedisContextAsyncAlloc(void)
90{
91 SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
92 if (unlikely(ctx == NULL)) {
93 FatalError("Unable to allocate redis context");
94 }
95
96 ctx->sync = NULL;
97 ctx->async = NULL;
98 ctx->ev_base = NULL;
99 ctx->connected = 0;
100 ctx->batch_count = 0;
101 ctx->last_push = 0;
102 ctx->tried = 0;
103
104 return ctx;
105}
106
107/** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
108 * \param ac redis async context
109 * \param r redis reply
110 * \param privdata opaque data with pointer to LogFileCtx
111 */
112static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
113{
114 redisReply *reply = r;
115 LogFileCtx *log_ctx = privdata;
116 SCLogRedisContext *ctx = log_ctx->redis;
117
118 if (reply == NULL) {
119 if (ctx->connected > 0)
120 SCLogInfo("Missing reply from redis, disconnected.");
121 ctx->connected = 0;
122 } else {
123 ctx->connected = 1;
124 event_base_loopbreak(ctx->ev_base);
125 }
126}
127
128/** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
129 * This is used to check if redis is connected.
130 * \param ac redis async context
131 * \param r redis reply
132 * \param privdata opaque data with pointer to LogFileCtx
133 */
134static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
135{
136 redisReply *reply = r;
137 SCLogRedisContext * ctx = privdata;
138
139 if (reply) {
140 if (ctx->connected == 0) {
141 SCLogNotice("Connected to Redis.");
142 ctx->connected = 1;
143 ctx->tried = 0;
144 }
145 } else {
146 ctx->connected = 0;
147 if (ctx->tried == 0) {
148 SCLogWarning("Failed to connect to Redis... (will keep trying)");
149 }
150 ctx->tried = time(NULL);
151 }
152 event_base_loopbreak(ctx->ev_base);
153}
154
155/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
156 * Emits and awaits response for an async ECHO command.
157 * It's used for check if redis is alive.
158 * \param ctx redis context
159 */
160static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
161{
162 redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
163 event_base_dispatch(ctx->ev_base);
164}
165
166/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
167 * This is used to terminate connection with redis.
168 * \param ac redis async context
169 * \param r redis reply
170 * \param privdata opaque data with pointer to LogFileCtx
171 */
172static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
173{
174 SCLogInfo("Disconnecting from redis!");
175}
176
177/** \brief QUIT command
178 * Emits and awaits response for an async QUIT command.
179 * It's used to disconnect with redis
180 * \param ctx redis context
181 */
182static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
183{
184 if (ctx->connected) {
185 redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
186 SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
187 }
188
189 redisAsyncFree(ctx->async);
190 event_base_dispatch(ctx->ev_base);
191 ctx->async = NULL;
192 event_base_free(ctx->ev_base);
193 ctx->ev_base = NULL;
194 ctx->connected = 0;
195}
196
197/** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
198 * \param log_ctx Log file context allocated by caller
199 */
200static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
201{
202 SCLogRedisContext * ctx = log_ctx->redis;
203 const char *redis_server = log_ctx->redis_setup.server;
204 int redis_port = log_ctx->redis_setup.port;
205
206 /* only try to reconnect once per second */
207 if (ctx->tried >= time(NULL)) {
208 return -1;
209 }
210
211 if (strchr(redis_server, '/') == NULL) {
212 ctx->async = redisAsyncConnect(redis_server, redis_port);
213 } else {
214 ctx->async = redisAsyncConnectUnix(redis_server);
215 }
216
217 if (ctx->ev_base != NULL) {
218 event_base_free(ctx->ev_base);
219 ctx->ev_base = NULL;
220 }
221
222 if (ctx->async == NULL) {
223 SCLogError("Error allocate redis async.");
224 ctx->tried = time(NULL);
225 return -1;
226 }
227
228 if (ctx->async != NULL && ctx->async->err) {
229 SCLogError("Error setting to redis async: [%s].", ctx->async->errstr);
230 ctx->tried = time(NULL);
231 return -1;
232 }
233
234 ctx->ev_base = event_base_new();
235
236 if (ctx->ev_base == NULL) {
237 ctx->tried = time(NULL);
238 redisAsyncFree(ctx->async);
239 ctx->async = NULL;
240 return -1;
241 }
242
243 redisLibeventAttach(ctx->async, ctx->ev_base);
244
245 log_ctx->redis = ctx;
246 log_ctx->Close = SCLogFileCloseRedis;
247 return 0;
248}
249
250
251/** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
252 * \param file_ctx Log file context allocated by caller
253 * \param string Buffer to output
254 */
255static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
256{
257 SCLogRedisContext *ctx = file_ctx->redis;
258
259 if (! ctx->connected) {
260 if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
261 return -1;
262 }
263 if (ctx->tried == 0) {
264 SCLogNotice("Trying to connect to Redis");
265 }
266 SCLogAsyncRedisSendEcho(ctx);
267 }
268
269 if (!ctx->connected) {
270 return -1;
271 }
272
273 if (ctx->async == NULL) {
274 return -1;
275 }
276
277 redisAsyncCommand(ctx->async, SCRedisAsyncCommandCallback, file_ctx,
278 file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
279 string);
280
281 event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
282
283 return 0;
284}
285
286#endif// HAVE_LIBEVENT
287
288/** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
289 * \param log_ctx Log file context allocated by caller
290 */
291static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
292{
293 SCLogRedisContext * ctx = log_ctx->redis;
294
295 /* only try to reconnect once per second */
296 if (ctx->tried >= time(NULL)) {
297 return -1;
298 }
299
300 const char *redis_server = log_ctx->redis_setup.server;
301 int redis_port = log_ctx->redis_setup.port;
302
303 if (ctx->sync != NULL) {
304 redisFree(ctx->sync);
305 }
306
307 if (strchr(redis_server, '/') == NULL) {
308 ctx->sync = redisConnect(redis_server, redis_port);
309 } else {
310 ctx->sync = redisConnectUnix(redis_server);
311 }
312 if (ctx->sync == NULL) {
313 SCLogError("Error connecting to redis server.");
314 ctx->tried = time(NULL);
315 return -1;
316 }
317 if (ctx->sync->err) {
318 SCLogError("Error connecting to redis server: [%s].", ctx->sync->errstr);
319 redisFree(ctx->sync);
320 ctx->sync = NULL;
321 ctx->tried = time(NULL);
322 return -1;
323 }
324 SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
325
326 log_ctx->redis = ctx;
327 log_ctx->Close = SCLogFileCloseRedis;
328 return 0;
329}
330/** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
331 * \param file_ctx Log file context allocated by caller
332 * \param string Buffer to output
333 */
334static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
335{
336 SCLogRedisContext * ctx = file_ctx->redis;
337 int ret = -1;
338 redisContext *redis = ctx->sync;
339 if (redis == NULL) {
340 SCConfLogReopenSyncRedis(file_ctx);
341 redis = ctx->sync;
342 if (redis == NULL) {
343 SCLogDebug("Redis after re-open is not available.");
344 return -1;
345 }
346 }
347
348 /* synchronous mode */
349 if (file_ctx->redis_setup.batch_size) {
350 redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command,
351 file_ctx->redis_setup.key, string);
352 time_t now = time(NULL);
353 if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
354 redisReply *reply;
355 int i;
356 int batch_size = ctx->batch_count;
357 ctx->batch_count = 0;
358 ctx->last_push = now;
359 for (i = 0; i <= batch_size; i++) {
360 if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
361 freeReplyObject(reply);
362 ret = 0;
363 } else {
364 if (redis->err) {
365 SCLogInfo("Error when fetching reply: %s (%d)",
366 redis->errstr,
367 redis->err);
368 }
369 switch (redis->err) {
370 case REDIS_ERR_EOF:
371 case REDIS_ERR_IO:
372 SCLogInfo("Reopening connection to redis server");
373 SCConfLogReopenSyncRedis(file_ctx);
374 redis = ctx->sync;
375 if (redis) {
376 SCLogInfo("Reconnected to redis server");
377 redisAppendCommand(redis, file_ctx->redis_setup.format,
378 file_ctx->redis_setup.command, file_ctx->redis_setup.key,
379 string);
380 ctx->batch_count++;
381 return 0;
382 } else {
383 SCLogInfo("Unable to reconnect to redis server");
384 return -1;
385 }
386 break;
387 default:
388 SCLogWarning("Unsupported error code %d", redis->err);
389 return -1;
390 }
391 }
392 }
393 } else {
394 ctx->batch_count++;
395 }
396 } else {
397 redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format,
398 file_ctx->redis_setup.command, file_ctx->redis_setup.key, string);
399 /* We may lose the reply if disconnection happens*/
400 if (reply) {
401 switch (reply->type) {
402 case REDIS_REPLY_ERROR:
403 SCLogWarning("Redis error: %s", reply->str);
404 SCConfLogReopenSyncRedis(file_ctx);
405 break;
406 case REDIS_REPLY_INTEGER:
407 SCLogDebug("Redis integer %lld", reply->integer);
408 ret = 0;
409 break;
410 case REDIS_REPLY_STRING:
411 SCLogDebug("Redis string %s", reply->str);
412 ret = 0;
413 break;
414 default:
415 SCLogError("Redis default triggered with %d", reply->type);
416 SCConfLogReopenSyncRedis(file_ctx);
417 break;
418 }
419 freeReplyObject(reply);
420 } else {
421 SCConfLogReopenSyncRedis(file_ctx);
422 }
423 }
424 return ret;
425}
426
427/**
428 * \brief LogFileWriteRedis() writes log data to redis output.
429 * \param log_ctx Log file context allocated by caller
430 * \param string buffer with data to write
431 * \param string_len data length
432 * \retval 0 on success;
433 * \retval -1 on failure;
434 */
435int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
436{
437 LogFileCtx *file_ctx = lf_ctx;
438 if (file_ctx == NULL) {
439 return -1;
440 }
441
442#if HAVE_LIBEVENT
443 /* async mode on */
444 if (file_ctx->redis_setup.is_async) {
445 return SCLogRedisWriteAsync(file_ctx, string, string_len);
446 }
447#endif
448 /* sync mode */
449 if (! file_ctx->redis_setup.is_async) {
450 return SCLogRedisWriteSync(file_ctx, string);
451 }
452 return -1;
453}
454
455/** \brief configure and initializes redis output logging
456 * \param conf ConfNode structure for the output section in question
457 * \param log_ctx Log file context allocated by caller
458 * \retval 0 on success
459 */
460int SCConfLogOpenRedis(SCConfNode *redis_node, void *lf_ctx)
461{
462 LogFileCtx *log_ctx = lf_ctx;
463
464 if (log_ctx->threaded) {
465 FatalError("redis does not support threaded output");
466 }
467
468 const char *redis_port = NULL;
469 const char *redis_mode = NULL;
470
471 int is_async = 0;
472
473 if (redis_node) {
474 log_ctx->redis_setup.server = SCConfNodeLookupChildValue(redis_node, "server");
475 log_ctx->redis_setup.key = SCConfNodeLookupChildValue(redis_node, "key");
476
477 redis_port = SCConfNodeLookupChildValue(redis_node, "port");
478 redis_mode = SCConfNodeLookupChildValue(redis_node, "mode");
479
480 (void)SCConfGetChildValueBool(redis_node, "async", &is_async);
481 }
482 if (!log_ctx->redis_setup.server) {
483 log_ctx->redis_setup.server = redis_default_server;
484 SCLogInfo("Using default redis server (127.0.0.1)");
485 }
486 if (!redis_port)
487 redis_port = "6379";
488 if (!redis_mode)
489 redis_mode = "list";
490 if (!log_ctx->redis_setup.key) {
491 log_ctx->redis_setup.key = redis_default_key;
492 }
493
494#ifndef HAVE_LIBEVENT
495 if (is_async) {
496 SCLogWarning("async option not available.");
497 }
498 is_async = 0;
499#endif //ifndef HAVE_LIBEVENT
500
501 log_ctx->redis_setup.is_async = is_async;
502 log_ctx->redis_setup.batch_size = 0;
503 if (redis_node) {
504 SCConfNode *pipelining = SCConfNodeLookupChild(redis_node, "pipelining");
505 if (pipelining) {
506 int enabled = 0;
507 int ret;
508 intmax_t val;
509 ret = SCConfGetChildValueBool(pipelining, "enabled", &enabled);
510 if (ret && enabled) {
511 ret = SCConfGetChildValueInt(pipelining, "batch-size", &val);
512 if (ret) {
513 log_ctx->redis_setup.batch_size = val;
514 } else {
515 log_ctx->redis_setup.batch_size = 10;
516 }
517 }
518 }
519 } else {
520 log_ctx->redis_setup.batch_size = 0;
521 }
522
523 log_ctx->redis_setup.format = redis_default_format;
524 if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
525 log_ctx->redis_setup.command = redis_lpush_cmd;
526 } else if(!strcmp(redis_mode, "rpush")){
527 log_ctx->redis_setup.command = redis_rpush_cmd;
528 } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
529 log_ctx->redis_setup.command = redis_publish_cmd;
530 } else if (!strcmp(redis_mode, "stream") || !strcmp(redis_mode, "xadd")) {
531 int exact;
532 intmax_t maxlen;
533 log_ctx->redis_setup.command = redis_xadd_cmd;
534 log_ctx->redis_setup.format = redis_stream_format;
535 if (SCConfGetChildValueBool(redis_node, "stream-trim-exact", &exact) == 0) {
536 exact = 0;
537 }
538 if (SCConfGetChildValueInt(redis_node, "stream-maxlen", &maxlen) == 0) {
539 maxlen = REDIS_MAX_STREAM_LENGTH_DEFAULT;
540 }
541 if (maxlen > 0) {
542 /* we do not need a lot of space here since we only build another
543 format string, whose length is limited by the length of the
544 maxlen integer formatted as a string */
545 log_ctx->redis_setup.stream_format = SCCalloc(100, sizeof(char));
546 snprintf(log_ctx->redis_setup.stream_format, 100, redis_stream_format_maxlen_tmpl, "%s",
547 "%s", exact ? '=' : '~', maxlen, "%s");
548 log_ctx->redis_setup.format = log_ctx->redis_setup.stream_format;
549 }
550 } else {
551 FatalError("Invalid redis mode: %s", redis_mode);
552 }
553
554 /* store server params for reconnection */
555 if (!log_ctx->redis_setup.server) {
556 FatalError("Error allocating redis server string");
557 }
558 if (StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (const char *)redis_port) < 0) {
559 FatalError("Invalid value for redis port: %s", redis_port);
560 }
561 log_ctx->Close = SCLogFileCloseRedis;
562
563#ifdef HAVE_LIBEVENT
564 if (is_async) {
565 log_ctx->redis = SCLogRedisContextAsyncAlloc();
566 }
567#endif /*HAVE_LIBEVENT*/
568 if (! is_async) {
569 log_ctx->redis = SCLogRedisContextAlloc();
570 SCConfLogReopenSyncRedis(log_ctx);
571 }
572 return 0;
573}
574
575/** \brief SCLogFileCloseRedis() Closes redis log more
576 * \param log_ctx Log file context allocated by caller
577 */
578void SCLogFileCloseRedis(LogFileCtx *log_ctx)
579{
580 SCLogRedisContext * ctx = log_ctx->redis;
581 if (ctx == NULL) {
582 return;
583 }
584 /* asynchronous */
585 if (log_ctx->redis_setup.is_async) {
586#if HAVE_LIBEVENT == 1
587 if (ctx->async) {
588 if (ctx->connected > 0) {
589 SCLogAsyncRedisSendQuit(ctx);
590 }
591 if (ctx->ev_base != NULL) {
592 event_base_free(ctx->ev_base);
593 ctx->ev_base = NULL;
594 }
595 }
596#endif
597 }
598
599 /* synchronous */
600 if (!log_ctx->redis_setup.is_async) {
601 if (ctx->sync) {
602 redisReply *reply;
603 int i;
604 for (i = 0; i < ctx->batch_count; i++) {
605 redisGetReply(ctx->sync, (void **)&reply);
606 if (reply) {
607 freeReplyObject(reply);
608 }
609 }
610 redisFree(ctx->sync);
611 ctx->sync = NULL;
612 }
613 ctx->tried = 0;
614 ctx->batch_count = 0;
615 }
616
617 if (ctx != NULL) {
618 SCFree(ctx);
619 }
620}
621
622#endif //#ifdef HAVE_LIBHIREDIS
SCConfNode * SCConfNodeLookupChild(const SCConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition conf.c:796
int SCConfGetChildValueInt(const SCConfNode *base, const char *name, intmax_t *val)
Definition conf.c:449
int SCConfGetChildValueBool(const SCConfNode *base, const char *name, int *val)
Definition conf.c:515
const char * SCConfNodeLookupChildValue(const SCConfNode *node, const char *name)
Lookup the value of a child configuration node by name.
Definition conf.c:824
struct Thresholds ctx
void(* Close)(struct LogFileCtx_ *fp)
int StringParseUint16(uint16_t *res, int base, size_t len, const char *str)
Definition util-byte.c:337
#define FatalError(...)
Definition util-debug.h:510
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition util-debug.h:243
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition util-debug.h:255
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition util-debug.h:225
#define SCLogError(...)
Macro used to log ERROR messages.
Definition util-debug.h:267
#define SCFree(p)
Definition util-mem.h:61
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define unlikely(expr)