suricata
tmqh-flow.c
Go to the documentation of this file.
1/* Copyright (C) 2007-2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23 *
24 * Simple output queue handler that makes sure all packets of the same flow
25 * are sent to the same queue. We support different kinds of q handlers. Have
26 * a look at "autofp-scheduler" conf to further understand the various q
27 * handlers we provide.
28 */
29
30#include "suricata.h"
31#include "packet-queue.h"
32#include "decode.h"
33#include "threads.h"
34#include "threadvars.h"
35#include "tmqh-flow.h"
36#include "flow-hash.h"
37
38#include "tm-queuehandlers.h"
39
40#include "conf.h"
41#include "util-unittest.h"
42
46static void TmqhOutputFlowFTPHash(ThreadVars *t, Packet *p);
47void *TmqhOutputFlowSetupCtx(const char *queue_str);
48void TmqhOutputFlowFreeCtx(void *ctx);
49void TmqhFlowRegisterTests(void);
50
/* Body of the "flow" queue handler registration: names the TMQH_FLOW slot of
 * tmqh_table and picks the output handler from the "autofp-scheduler" setting.
 *
 * NOTE(review): the signature line is absent from this listing (the embedded
 * numbering jumps from 50 to 52); the symbol index at the end of the listing
 * gives `void TmqhFlowRegister(void)` at tmqh-flow.c:51 -- confirm. Lines
 * 54-57, 63, 66, 68, 70 and 80 are absent as well, so some branches below
 * appear empty here. */
52{
53 tmqh_table[TMQH_FLOW].name = "flow";
58
59 const char *scheduler = NULL;
/* this branch is taken when SCConfGet() returns 1 for "autofp-scheduler" */
60 if (SCConfGet("autofp-scheduler", &scheduler) == 1) {
/* "round-robin" and "active-packets" are still accepted, but only log a
 * notice that flow hash is used instead */
61 if (strcasecmp(scheduler, "round-robin") == 0) {
62 SCLogNotice("using flow hash instead of round robin");
64 } else if (strcasecmp(scheduler, "active-packets") == 0) {
65 SCLogNotice("using flow hash instead of active packets");
/* NOTE(review): the bodies of the "hash" and "ippair" branches are not
 * visible in this listing -- presumably they assign the matching OutHandler;
 * confirm against the original file */
67 } else if (strcasecmp(scheduler, "hash") == 0) {
69 } else if (strcasecmp(scheduler, "ippair") == 0) {
71 } else if (strcasecmp(scheduler, "ftp-hash") == 0) {
72 tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowFTPHash;
73 } else {
/* any other value is a fatal configuration error: log it and exit */
74 SCLogError("Invalid entry \"%s\" "
75 "for autofp-scheduler in conf. Killing engine.",
76 scheduler);
77 exit(EXIT_FAILURE);
78 }
79 } else {
/* NOTE(review): the body for the "setting not present" case (line 80) is not
 * visible here -- presumably it installs a default OutHandler; confirm */
81 }
82}
83
/* Logs which flow load balancer is active by comparing the OutHandler
 * registered for TMQH_FLOW against the known handler functions.
 *
 * NOTE(review): the signature line is absent from this listing; the symbol
 * index gives `void TmqhFlowPrintAutofpHandler(void)` at tmqh-flow.c:84 --
 * confirm. Lines 90-91 are absent too, so only the FTPHash check is visible. */
85{
/* emits the config log line for msg only when f is the registered OutHandler */
86#define PRINT_IF_FUNC(f, msg) \
87 if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
88 SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
89
92 PRINT_IF_FUNC(TmqhOutputFlowFTPHash, "FTPHash");
93
/* the helper macro is local to this function */
94#undef PRINT_IF_FUNC
95}
96
97/* same as 'simple' */
/* Input handler body: takes one packet from the thread's input packet queue
 * (tv->inq->pq), waiting on the queue's condition variable when it is empty.
 * Returns the dequeued packet, or NULL when the queue is still empty after
 * the wait.
 *
 * NOTE(review): the signature line is absent from this listing; the symbol
 * index gives `Packet * TmqhInputFlow(ThreadVars *t)` at tmqh-flow.c:98, while
 * the body uses `tv` -- confirm the parameter name in the definition. Lines
 * 102, 112 and 116 are absent as well; presumably 112 and 116 release
 * q->mutex_q before each return (SCMutexUnlock is in the symbol index) --
 * confirm. */
99{
100 PacketQueue *q = tv->inq->pq;
101
103
104 SCMutexLock(&q->mutex_q);
105 if (q->len == 0) {
106 /* if we have no packets in queue, wait... */
107 SCCondWait(&q->cond_q, &q->mutex_q);
108 }
109
/* the length is re-checked after the single wait: the wait may end without a
 * packet having been queued */
110 if (q->len > 0) {
111 Packet *p = PacketDequeue(q);
113 return p;
114 } else {
115 /* return NULL if we have no pkt. Should only happen on signals. */
117 return NULL;
118 }
119}
120
121static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
122{
123 void *ptmp;
124 Tmq *tmq = TmqGetQueueByName(name);
125 if (tmq == NULL) {
126 tmq = TmqCreateQueue(name);
127 if (tmq == NULL)
128 return -1;
129 }
130 tmq->writer_cnt++;
131
132 if (ctx->queues == NULL) {
133 ctx->size = 1;
134 ctx->queues = SCCalloc(1, ctx->size * sizeof(TmqhFlowMode));
135 if (ctx->queues == NULL) {
136 return -1;
137 }
138 } else {
139 ctx->size++;
140 ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
141 if (ptmp == NULL) {
142 SCFree(ctx->queues);
143 ctx->queues = NULL;
144 return -1;
145 }
146 ctx->queues = ptmp;
147
148 memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
149 }
150 ctx->queues[ctx->size - 1].q = tmq->pq;
151
152 return 0;
153}
154
155/**
156 * \brief setup the queue handlers ctx
157 *
158 * Parses a comma separated string "queuename1,queuename2,etc"
159 * and sets the ctx up to devide flows over these queue's.
160 *
161 * \param queue_str comma separated string with output queue names
162 *
163 * \retval ctx queues handlers ctx or NULL in error
164 */
165void *TmqhOutputFlowSetupCtx(const char *queue_str)
166{
167 if (queue_str == NULL || strlen(queue_str) == 0)
168 return NULL;
169
170 SCLogDebug("queue_str %s", queue_str);
171
172 TmqhFlowCtx *ctx = SCCalloc(1, sizeof(TmqhFlowCtx));
173 if (unlikely(ctx == NULL))
174 return NULL;
175
176 char *str = SCStrdup(queue_str);
177 if (unlikely(str == NULL)) {
178 goto error;
179 }
180 char *tstr = str;
181
182 /* parse the comma separated string */
183 do {
184 char *comma = strchr(tstr,',');
185 if (comma != NULL) {
186 *comma = '\0';
187 char *qname = tstr;
188 int r = StoreQueueId(ctx,qname);
189 if (r < 0)
190 goto error;
191 } else {
192 char *qname = tstr;
193 int r = StoreQueueId(ctx,qname);
194 if (r < 0)
195 goto error;
196 }
197 tstr = comma ? (comma + 1) : comma;
198 } while (tstr != NULL);
199
200 SCFree(str);
201 return (void *)ctx;
202
203error:
204 SCFree(ctx);
205 if (str != NULL)
206 SCFree(str);
207 return NULL;
208}
209
/* Releases a flow queue handler ctx: logs the number of queues it held, then
 * frees the queue array and the ctx itself. The ctx pointer is dereferenced
 * without a NULL check.
 *
 * NOTE(review): the signature line is absent from this listing; the symbol
 * index gives `void TmqhOutputFlowFreeCtx(void *ctx)` at tmqh-flow.c:210 --
 * confirm. */
211{
212 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
213
214 SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
215 fctx->size);
216 SCFree(fctx->queues);
217 SCFree(fctx);
218}
219
/* Output handler body: picks an output queue for packet p and enqueues it.
 * Packets flagged PKT_WANTS_FLOW go to queue (flow_hash % number of queues);
 * all other packets are spread over the queues in turn using ctx->last.
 *
 * NOTE(review): the signature line is absent from this listing; the symbol
 * index gives `void TmqhOutputFlowHash(ThreadVars *t, Packet *p)` at
 * tmqh-flow.c:220 -- confirm. Lines 223 and 239 are absent as well:
 * presumably 223 declares `ctx` (a TmqhFlowCtx, likely taken from the
 * thread's outctx) and 239 unlocks q->mutex_q -- confirm. */
221{
222 uint32_t qid;
224
225 if (p->flags & PKT_WANTS_FLOW) {
226 uint32_t hash = p->flow_hash;
227 qid = hash % ctx->size;
228 } else {
/* use the current slot for this packet and advance, wrapping back to 0
 * after the last queue */
229 qid = ctx->last++;
230
231 if (ctx->last == ctx->size)
232 ctx->last = 0;
233 }
234
/* enqueue under the queue's mutex and signal its condition variable */
235 PacketQueue *q = ctx->queues[qid].q;
236 SCMutexLock(&q->mutex_q);
237 PacketEnqueue(q, p);
238 SCCondSignal(&q->cond_q);
240}
241
242/**
243 * \brief select the queue to output based on IP address pair.
244 *
245 * \param tv thread vars.
246 * \param p packet.
247 */
/* The queue index is (sum of source and destination address words) modulo the
 * number of queues. The sum is the same when source and destination are
 * swapped.
 *
 * NOTE(review): the signature line is absent from this listing; the symbol
 * index gives `void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)` at
 * tmqh-flow.c:248 -- confirm. Lines 252 and 267 are absent as well:
 * presumably 252 declares `ctx` and 267 unlocks q->mutex_q -- confirm. */
249{
250 uint32_t addr_hash = 0;
251
253
/* for AF_INET6 all four 32-bit words of both addresses are summed; for any
 * other family only the first word of each address is used */
254 if (p->src.family == AF_INET6) {
255 for (int i = 0; i < 4; i++) {
256 addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
257 }
258 } else {
259 addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
260 }
261
/* enqueue under the queue's mutex and signal its condition variable */
262 uint32_t qid = addr_hash % ctx->size;
263 PacketQueue *q = ctx->queues[qid].q;
264 SCMutexLock(&q->mutex_q);
265 PacketEnqueue(q, p);
266 SCCondSignal(&q->cond_q);
268}
269
/* Output handler used for the "ftp-hash" scheduler: like the plain flow hash
 * handler, except that selected TCP packets are hashed with
 * FlowGetIpPairProtoHash() instead of the packet's flow hash.
 *
 * NOTE(review): lines 273 and 293 are absent from this listing: presumably
 * 273 declares `ctx` (a TmqhFlowCtx, likely taken from tv->outctx) and 293
 * unlocks q->mutex_q -- confirm. */
270static void TmqhOutputFlowFTPHash(ThreadVars *tv, Packet *p)
271{
272 uint32_t qid;
274
275 if (p->flags & PKT_WANTS_FLOW) {
276 uint32_t hash = p->flow_hash;
/* TCP packets with both ports >= 1024, or with either port equal to 20 or
 * 21, switch to the IP pair + proto hash. NOTE(review): presumably this keeps
 * FTP control and data connections between the same hosts on one queue --
 * confirm. */
277 if (PacketIsTCP(p) && ((p->sp >= 1024 && p->dp >= 1024) || p->dp == 21 || p->sp == 21 ||
278 p->dp == 20 || p->sp == 20)) {
279 hash = FlowGetIpPairProtoHash(p);
280 }
281 qid = hash % ctx->size;
282 } else {
/* packets without PKT_WANTS_FLOW are spread over the queues in turn */
283 qid = ctx->last++;
284
285 if (ctx->last == ctx->size)
286 ctx->last = 0;
287 }
288
/* enqueue under the queue's mutex and signal its condition variable */
289 PacketQueue *q = ctx->queues[qid].q;
290 SCMutexLock(&q->mutex_q);
291 PacketEnqueue(q, p);
292 SCCondSignal(&q->cond_q);
294}
295
296#ifdef UNITTESTS
297
/* Test: with four queues created up front, a ctx set up from
 * "queue1,queue2,another,yetanother" must hold four entries that point at the
 * packet queues of those queues, in the order given in the string.
 *
 * NOTE(review): lines 300, 312-313 and 326-327 are absent from this listing:
 * presumably they reset the queues, obtain `ctx` from
 * TmqhOutputFlowSetupCtx(str) and clean up afterwards -- confirm. */
298static int TmqhOutputFlowSetupCtxTest01(void)
299{
301
302 Tmq *tmq1 = TmqCreateQueue("queue1");
303 FAIL_IF_NULL(tmq1);
304 Tmq *tmq2 = TmqCreateQueue("queue2");
305 FAIL_IF_NULL(tmq2);
306 Tmq *tmq3 = TmqCreateQueue("another");
307 FAIL_IF_NULL(tmq3);
308 Tmq *tmq4 = TmqCreateQueue("yetanother");
309 FAIL_IF_NULL(tmq4);
310
311 const char *str = "queue1,queue2,another,yetanother";
314
315 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
316
317 FAIL_IF_NOT(fctx->size == 4);
318
319 FAIL_IF_NULL(fctx->queues);
320
/* slot order must match the order of the names in str */
321 FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
322 FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
323 FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
324 FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
325
328 PASS;
329}
330
/* Test: with four queues created up front but only "queue1" named in the
 * string, the ctx must hold exactly one entry, pointing at queue1's packet
 * queue.
 *
 * NOTE(review): lines 333, 345-346 and 355-356 are absent from this listing:
 * presumably they reset the queues, obtain `ctx` from
 * TmqhOutputFlowSetupCtx(str) and clean up afterwards -- confirm. */
331static int TmqhOutputFlowSetupCtxTest02(void)
332{
334
335 Tmq *tmq1 = TmqCreateQueue("queue1");
336 FAIL_IF_NULL(tmq1);
337 Tmq *tmq2 = TmqCreateQueue("queue2");
338 FAIL_IF_NULL(tmq2);
339 Tmq *tmq3 = TmqCreateQueue("another");
340 FAIL_IF_NULL(tmq3);
341 Tmq *tmq4 = TmqCreateQueue("yetanother");
342 FAIL_IF_NULL(tmq4);
343
344 const char *str = "queue1";
347
348 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
349
350 FAIL_IF_NOT(fctx->size == 1);
351
352 FAIL_IF_NULL(fctx->queues);
353
354 FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
357
358 PASS;
359}
360
/* Test: no queue is created by the test itself before the ctx is used; the
 * four queues named in the string must afterwards be found by name, and the
 * ctx entries must point at their packet queues in string order.
 *
 * NOTE(review): lines 363, 366-367 and 389-390 are absent from this listing:
 * presumably they reset the queues, obtain `ctx` from
 * TmqhOutputFlowSetupCtx(str) (which would be what creates the queues) and
 * clean up afterwards -- confirm. */
361static int TmqhOutputFlowSetupCtxTest03(void)
362{
364
365 const char *str = "queue1,queue2,another,yetanother";
368
369 TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
370
371 FAIL_IF_NOT(fctx->size == 4);
372
373 FAIL_IF_NULL(fctx->queues);
374
/* the queues are looked up by name here rather than created */
375 Tmq *tmq1 = TmqGetQueueByName("queue1");
376 FAIL_IF_NULL(tmq1);
377 Tmq *tmq2 = TmqGetQueueByName("queue2");
378 FAIL_IF_NULL(tmq2);
379 Tmq *tmq3 = TmqGetQueueByName("another");
380 FAIL_IF_NULL(tmq3);
381 Tmq *tmq4 = TmqGetQueueByName("yetanother");
382 FAIL_IF_NULL(tmq4);
383
384 FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
385 FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
386 FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
387 FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
388
391 PASS;
392}
393
394#endif /* UNITTESTS */
395
/* Registers the three TmqhOutputFlowSetupCtx unit tests; the body is empty
 * when UNITTESTS is not defined.
 *
 * NOTE(review): the signature line is absent from this listing; the symbol
 * index gives `void TmqhFlowRegisterTests(void)` at tmqh-flow.c:396 --
 * confirm. */
397{
398#ifdef UNITTESTS
399 UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
400 TmqhOutputFlowSetupCtxTest01);
401 UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
402 TmqhOutputFlowSetupCtxTest02);
403 UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
404 TmqhOutputFlowSetupCtxTest03);
405#endif
406}
int SCConfGet(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition conf.c:350
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition counters.c:450
#define PKT_WANTS_FLOW
Definition decode.h:1296
uint32_t FlowGetIpPairProtoHash(const Packet *p)
Definition flow-hash.c:117
ThreadVars * tv
#define FAIL_IF_NULL(expr)
Fail a test if expression evaluates to NULL.
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
#define FAIL_IF_NOT(expr)
Fail a test if expression evaluates to false.
#define PASS
Pass the test.
struct Thresholds ctx
void PacketEnqueue(PacketQueue *q, Packet *p)
Packet * PacketDequeue(PacketQueue *q)
char family
Definition decode.h:113
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
uint32_t len
SCMutex mutex_q
SCCondT cond_q
Address src
Definition decode.h:505
uint32_t flow_hash
Definition decode.h:550
uint32_t flags
Definition decode.h:544
Address dst
Definition decode.h:506
Per thread variable structure.
Definition threadvars.h:58
void * outctx
Definition threadvars.h:105
uint16_t writer_cnt
Definition tm-queues.h:34
PacketQueue * pq
Definition tm-queues.h:35
Ctx for the flow queue handler.
Definition tmqh-flow.h:34
TmqhFlowMode * queues
Definition tmqh-flow.h:38
uint16_t size
Definition tmqh-flow.h:35
PacketQueue * q
Definition tmqh-flow.h:28
const char * name
Packet *(* InHandler)(ThreadVars *)
void(* RegisterTests)(void)
void(* OutHandlerCtxFree)(void *)
void(* OutHandler)(ThreadVars *, Packet *)
void *(* OutHandlerCtxSetup)(const char *)
#define str(s)
#define SCCondWait
#define SCMutexUnlock(mut)
#define SCCondSignal
#define SCMutexLock(mut)
Tmqh tmqh_table[TMQH_SIZE]
@ TMQH_FLOW
void TmqResetQueues(void)
Definition tm-queues.c:80
Tmq * TmqGetQueueByName(const char *name)
Definition tm-queues.c:59
Tmq * TmqCreateQueue(const char *name)
const char * name
void * TmqhOutputFlowSetupCtx(const char *queue_str)
setup the queue handlers ctx
Definition tmqh-flow.c:165
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)
select the queue to output based on IP address pair.
Definition tmqh-flow.c:248
Packet * TmqhInputFlow(ThreadVars *t)
Definition tmqh-flow.c:98
void TmqhFlowPrintAutofpHandler(void)
Definition tmqh-flow.c:84
#define PRINT_IF_FUNC(f, msg)
void TmqhFlowRegister(void)
Definition tmqh-flow.c:51
void TmqhOutputFlowFreeCtx(void *ctx)
Definition tmqh-flow.c:210
void TmqhOutputFlowHash(ThreadVars *t, Packet *p)
Definition tmqh-flow.c:220
void TmqhFlowRegisterTests(void)
Definition tmqh-flow.c:396
#define SCLogPerf(...)
Definition util-debug.h:234
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition util-debug.h:243
#define SCLogError(...)
Macro used to log ERROR messages.
Definition util-debug.h:267
#define SCFree(p)
Definition util-mem.h:61
#define SCRealloc(ptr, sz)
Definition util-mem.h:50
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define SCStrdup(s)
Definition util-mem.h:56
#define unlikely(expr)