suricata
flow-manager.c
/* Copyright (C) 2007-2024 Open Information Security Foundation
 *
 * You can copy, redistribute or modify this Program under the terms of
 * the GNU General Public License version 2 as published by the Free
 * Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

/**
 * \file
 *
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
 * \author Victor Julien <victor@inliniac.net>
 */

#include "suricata-common.h"
#include "conf.h"
#include "threadvars.h"
#include "tm-threads.h"
#include "runmodes.h"

#include "util-time.h"

#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-private.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "flow-storage.h"
#include "flow-spare-pool.h"
#include "flow-callbacks.h"

#include "stream-tcp.h"
#include "stream-tcp-cache.h"

#include "util-device-private.h"

#include "util-debug.h"

#include "threads.h"
#include "detect-engine-threshold.h"

#include "host-timeout.h"
#include "defrag-hash.h"
#include "defrag-timeout.h"
#include "ippair-timeout.h"
#include "app-layer-htp-range.h"

#include "output-flow.h"

#include "runmode-unix-socket.h"

/** queue to pass flows to cleanup/log thread(s) */
FlowQueue flow_recycle_q;

/* multi flow manager support */
static uint32_t flowmgr_number = 1;
/* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);

/* multi flow recycler support */
static uint32_t flowrec_number = 1;
/* atomic counter for flow recyclers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);

static SCCtrlCondT flow_manager_ctrl_cond = PTHREAD_COND_INITIALIZER;
static SCCtrlMutex flow_manager_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
static SCCtrlCondT flow_recycler_ctrl_cond = PTHREAD_COND_INITIALIZER;
static SCCtrlMutex flow_recycler_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;

void FlowWakeupFlowManagerThread(void)
{
    SCCtrlMutexLock(&flow_manager_ctrl_mutex);
    SCCtrlCondSignal(&flow_manager_ctrl_cond);
    SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
}

void FlowWakeupFlowRecyclerThread(void)
{
    SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
    SCCtrlCondSignal(&flow_recycler_ctrl_cond);
    SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
}

void FlowTimeoutsInit(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
}

void FlowTimeoutsEmergency(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}

typedef struct FlowTimeoutCounters_ {
    uint32_t rows_checked;
    uint32_t rows_skipped;
    uint32_t rows_empty;
    uint32_t rows_maxlen;

    uint32_t flows_checked;
    uint32_t flows_notimeout;
    uint32_t flows_timeout;
    uint32_t flows_removed;
    uint32_t flows_aside;
    uint32_t flows_aside_needs_work;

    uint32_t bypassed_count;
    uint64_t bypassed_pkts;
    uint64_t bypassed_bytes;
} FlowTimeoutCounters;

/**
 * \brief Used to disable flow manager thread(s).
 *
 * \todo Kinda hackish since it uses the tv name to identify flow manager
 *       thread. We need an all weather identification scheme.
 */
void FlowDisableFlowManagerThread(void)
{
    SCMutexLock(&tv_root_lock);
    /* flow manager thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError("unable to get all flow manager "
                   "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowmgr_cnt, 0);
}
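
/* Shutdown sequencing note (editorial, not upstream code): the disable
 * routine above first sets THV_KILL on every matching management thread,
 * then polls for up to 60 seconds until each one reports THV_RUNNING_DONE,
 * sleeping 1 msec per round outside tv_root_lock so the threads can
 * actually make progress and exit. */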

/** \internal
 * \brief check if a flow is timed out
 *
 * Takes lastts, adds the timeout policy to it, and compares the result to
 * the current time `ts`. In case of emergency mode, timeout_policy is
 * ignored and the emerg table is used.
 *
 * \param f flow
 * \param ts timestamp - realtime, or a minimum of the active threads in offline mode
 * \param next_ts tracking the next timeout ts, so FM can skip the row until that time
 * \param emerg bool to indicate if emergency timeout settings should be used
 *
 * \retval false not timed out
 * \retval true timed out
 */
static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
{
    SCTime_t timesout_at;

    if (emerg) {
        extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX];
        timesout_at = SCTIME_ADD_SECS(f->lastts,
                FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap));
    } else {
        timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
    }
    /* update next_ts if needed */
    if (*next_ts == 0 || (uint32_t)SCTIME_SECS(timesout_at) < *next_ts)
        *next_ts = (uint32_t)SCTIME_SECS(timesout_at);

    /* if time is live, we just use the `ts` */
    if (TimeModeIsLive() || f->thread_id[0] == 0) {
        /* do the timeout check */
        if (SCTIME_CMP_LT(ts, timesout_at)) {
            return false;
        }
    } else {
        /* offline: take last ts from the "owning" thread */
        const SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
        /* do the timeout check */
        if (SCTIME_CMP_LT(checkts, timesout_at)) {
            return false;
        }
    }

    return true;
}
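
/* Worked example (editorial, illustrative numbers): a flow with lastts of
 * 1000s and a timeout_policy of 60s times out at 1060s. A check with
 * ts = 1059s returns false and records 1060 in *next_ts so the whole hash
 * row can be skipped until then; a check with ts = 1060s returns true. In
 * emergency mode the shorter flow_timeouts_emerg value replaces the
 * flow's own timeout_policy. */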

#ifdef CAPTURE_OFFLOAD
/** \internal
 * \brief check timeout of captured bypassed flow by querying capture method
 *
 * \param f Flow
 * \param ts timestamp
 * \param counters Flow timeout counters
 *
 * \retval false not timed out
 * \retval true timed out (or not capture bypassed)
 */
static inline bool FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
{
    if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
        return true;
    }

    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
    if (fc && fc->BypassUpdate) {
        /* flow will be possibly updated */
        uint64_t pkts_tosrc = fc->tosrcpktcnt;
        uint64_t bytes_tosrc = fc->tosrcbytecnt;
        uint64_t pkts_todst = fc->todstpktcnt;
        uint64_t bytes_todst = fc->todstbytecnt;
        bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
        if (update) {
            SCLogDebug("Updated flow: %" PRIu64 "", FlowGetId(f));
            pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
            bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
            pkts_todst = fc->todstpktcnt - pkts_todst;
            bytes_todst = fc->todstbytecnt - bytes_todst;
            if (f->livedev) {
                SC_ATOMIC_ADD(f->livedev->bypassed,
                        pkts_tosrc + pkts_todst);
            }
            counters->bypassed_pkts += pkts_tosrc + pkts_todst;
            counters->bypassed_bytes += bytes_tosrc + bytes_todst;
            return false;
        }
        SCLogDebug("No new packet, dead flow %" PRIu64 "", FlowGetId(f));
        if (f->livedev) {
            if (FLOW_IS_IPV4(f)) {
                LiveDevSubBypassStats(f->livedev, 1, AF_INET);
            } else if (FLOW_IS_IPV6(f)) {
                LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
            }
        }
        counters->bypassed_count++;
    }
    return true;
}
#endif /* CAPTURE_OFFLOAD */
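
/* Accounting sketch for the bypass check above (editorial, illustrative
 * values): if the capture method reports tosrcpktcnt growing from 100 to
 * 150 between two checks, BypassUpdate() returns true, the 50 packet
 * delta is added to the livedev bypassed counter and the flow stays
 * alive; only when no counter moved is the bypassed flow treated as dead
 * and allowed to time out. */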

typedef struct FlowManagerTimeoutThread {
    /* used to temporarily store flows that have timed out and are
     * removed from the hash to reduce locking contention */
    FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;

/**
 * \internal
 *
 * \brief Process the temporary Aside Queue
 *        As long as a flow f is not waiting on the detection engine to
 *        finish dealing with it, f is put on the recycle queue for
 *        further processing later on.
 *
 * \param td FM Timeout Thread instance
 * \param counters Flow Timeout counters to be updated
 *
 * \retval Number of flows that were recycled
 */
static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
    FlowQueuePrivate recycle = { NULL, NULL, 0 };
    counters->flows_aside += td->aside_queue.len;

    uint32_t cnt = 0;
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
        /* flow is still locked */

        if (f->proto == IPPROTO_TCP &&
                !(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
                !FlowIsBypassed(f) && FlowNeedsReassembly(f)) {
            /* Send the flow to its thread */
            FlowSendToLocalThread(f);
            FLOWLOCK_UNLOCK(f);
            /* flow ownership is already passed to the worker thread */

            counters->flows_aside_needs_work++;
            continue;
        }
        FLOWLOCK_UNLOCK(f);

        FlowQueuePrivateAppendFlow(&recycle, f);
        if (recycle.len == 100) {
            FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
            FlowWakeupFlowRecyclerThread();
        }
        cnt++;
    }
    if (recycle.len) {
        FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
        FlowWakeupFlowRecyclerThread();
    }
    return cnt;
}
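
/* Path of a timed out flow through the aside queue (editorial summary):
 * popped while still locked; a TCP flow that still needs reassembly work
 * is handed back to its owning worker thread via FlowSendToLocalThread()
 * and counted as flows_aside_needs_work, while all other flows are
 * batched and flushed to flow_recycle_q every 100 flows to keep queue
 * lock contention low. */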

/**
 * \internal
 *
 * \brief check all flows in a hash row for timing out
 *
 * \param f last flow in the hash row
 * \param ts timestamp
 * \param emergency bool indicating emergency mode
 * \param counters ptr to FlowTimeoutCounters structure
 */
static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
        int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
{
    uint32_t checked = 0;
    Flow *prev_f = NULL;

    do {
        checked++;

        FLOWLOCK_WRLOCK(f);

        /* check flow timeout based on lastts and state. Both can be
         * accessed w/o Flow lock as we do have the hash row lock (so flow
         * can't disappear) and flow_state is atomic. lastts can only
         * be modified when we have both the flow and hash row lock */

        /* timeout logic goes here */
        if (!FlowManagerFlowTimeout(f, ts, next_ts, emergency)) {
            FLOWLOCK_UNLOCK(f);
            counters->flows_notimeout++;

            prev_f = f;
            f = f->next;
            continue;
        }

        Flow *next_flow = f->next;

#ifdef CAPTURE_OFFLOAD
        /* never prune a flow that is used by a packet we
         * are currently processing in one of the threads */
        if (!FlowBypassedTimeout(f, ts, counters)) {
            FLOWLOCK_UNLOCK(f);
            prev_f = f;
            f = f->next;
            continue;
        }
#endif
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;

        counters->flows_timeout++;

        RemoveFromHash(f, prev_f);

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);

    counters->flows_checked += checked;
    if (checked > counters->rows_maxlen)
        counters->rows_maxlen = checked;
}

/**
 * \internal
 *
 * \brief Clear evicted list from Flow Manager.
 *        All the evicted flows are removed from the Flow bucket and added
 *        to the temporary Aside Queue.
 *
 * \param td FM timeout thread instance
 * \param f head of the evicted list
 */
static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td, Flow *f)
{
    do {
        FLOWLOCK_WRLOCK(f);
        Flow *next_flow = f->next;
        f->next = NULL;
        f->fb = NULL;

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);
}

/**
 * \brief time out flows from the hash
 *
 * \param ts timestamp
 * \param hash_min min hash index to consider
 * \param hash_max max hash index to consider
 * \param counters ptr to FlowTimeoutCounters structure
 *
 * \retval cnt number of timed out flows
 */
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
        const uint32_t hash_max, FlowTimeoutCounters *counters)
{
    uint32_t cnt = 0;
    const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
    const uint32_t rows_checked = hash_max - hash_min;
    uint32_t rows_skipped = 0;
    uint32_t rows_empty = 0;

#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif

    const uint32_t ts_secs = (uint32_t)SCTIME_SECS(ts);
    for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
        TYPE check_bits = 0;
        const uint32_t check = MIN(BITS, (hash_max - idx));
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
                                         fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
                          << (TYPE)i;
        }
        if (check_bits == 0)
            continue;

        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
                FBLOCK_LOCK(fb);
                Flow *evicted = NULL;
                if (fb->evicted != NULL || fb->head != NULL) {
                    if (fb->evicted != NULL) {
                        /* transfer out of bucket so we can do additional work outside
                         * of the bucket lock */
                        evicted = fb->evicted;
                        fb->evicted = NULL;
                    }
                    if (fb->head != NULL) {
                        uint32_t next_ts = 0;
                        FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);

                        if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
                            SC_ATOMIC_SET(fb->next_ts, next_ts);
                    }
                    if (fb->evicted == NULL && fb->head == NULL) {
                        /* row is empty */
                        SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
                    }
                } else {
                    SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
                    rows_empty++;
                }
                FBLOCK_UNLOCK(fb);
                /* processed evicted list */
                if (evicted) {
                    FlowManagerHashRowClearEvictedList(td, evicted);
                }
            } else {
                rows_skipped++;
            }
        }
        if (td->aside_queue.len) {
            cnt += ProcessAsideQueue(td, counters);
        }
    }

    counters->rows_checked += rows_checked;
    counters->rows_skipped += rows_skipped;
    counters->rows_empty += rows_empty;

    if (td->aside_queue.len) {
        cnt += ProcessAsideQueue(td, counters);
    }
    counters->flows_removed += cnt;
    /* coverity[missing_unlock : FALSE] */
    return cnt;
}
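
/* Bitmap prefilter sketch (editorial, illustrative values): with BITS at
 * 64, the first inner loop does one relaxed atomic load of next_ts per
 * bucket and sets bit i when bucket idx+i may hold a timed out flow. With
 * ts_secs = 1000 and next_ts values { 999, 1500, 1000, ... }, check_bits
 * gets bits 0 and 2 set, so bucket idx+1 is skipped without taking its
 * lock, and a fully zero mask skips the whole group of 64 rows. */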

/** \internal
 *
 * \brief handle timeout for a slice of hash rows
 *        If we wrap around we call FlowTimeoutHash twice
 *
 * \param td FM timeout thread
 * \param ts timeout timestamp
 * \param hash_min lower bound of the row slice
 * \param hash_max upper bound of the row slice
 * \param counters Flow timeout counters to be passed
 * \param rows number of rows for this worker unit
 * \param pos absolute position of the beginning of row slice in the hash table
 * \param instance instance id of this FM
 *
 * \retval number of successfully timed out flows
 */
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
        const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
        const uint32_t rows, uint32_t *pos, const uint32_t instance)
{
    uint32_t start = 0;
    uint32_t end = 0;
    uint32_t cnt = 0;
    uint32_t rows_left = rows;

again:
    start = (*pos);
    if (start >= hash_max) {
        start = hash_min;
    }
    end = start + rows_left;
    if (end > hash_max) {
        end = hash_max;
    }
    *pos = (end == hash_max) ? hash_min : end;
    rows_left = rows_left - (end - start);

    SCLogDebug("instance %u: %u:%u (hash_min %u, hash_max %u *pos %u)", instance, start, end,
            hash_min, hash_max, *pos);

    cnt += FlowTimeoutHash(td, ts, start, end, counters);
    if (rows_left) {
        goto again;
    }
    return cnt;
}
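
/* Worked example of the wrap-around above (editorial, illustrative
 * numbers): with hash_min 0, hash_max 1000, rows 300 and *pos 900, the
 * first pass scans rows [900, 1000) and resets *pos to hash_min; the
 * remaining 200 rows are scanned as [0, 200) on the second pass, leaving
 * *pos at 200 for the next work unit. */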

/**
 * \internal
 *
 * \brief move all flows out of a hash row
 *
 * \param f last flow in the hash row
 * \param recycle_q Flow recycle queue
 * \param mode emergency or not
 *
 * \retval cnt number of flows removed from the hash and added to the recycle queue
 */
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
    uint32_t cnt = 0;

    do {
        FLOWLOCK_WRLOCK(f);

        Flow *next_flow = f->next;

        /* remove from the hash */
        if (mode == 0) {
            RemoveFromHash(f, NULL);
        } else {
            FlowBucket *fb = f->fb;
            fb->evicted = f->next;
            f->next = NULL;
            f->fb = NULL;
        }
        f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;

        /* no one is referring to this flow, removed from hash
         * so we can unlock it and move it to the recycle queue. */
        FLOWLOCK_UNLOCK(f);
        FlowQueuePrivateAppendFlow(recycle_q, f);

        cnt++;

        f = next_flow;
    } while (f != NULL);

    return cnt;
}

#define RECYCLE_MAX_QUEUE_ITEMS 25
/**
 * \brief remove all flows from the hash
 *
 * \retval cnt number of removed flows
 */
static uint32_t FlowCleanupHash(void)
{
    FlowQueuePrivate local_queue = { NULL, NULL, 0 };
    uint32_t cnt = 0;

    for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
        FlowBucket *fb = &flow_hash[idx];

        FBLOCK_LOCK(fb);

        if (fb->head != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
        }
        if (fb->evicted != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
        }

        FBLOCK_UNLOCK(fb);
        if (local_queue.len >= RECYCLE_MAX_QUEUE_ITEMS) {
            FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
            FlowWakeupFlowRecyclerThread();
        }
    }
    FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
    FlowWakeupFlowRecyclerThread();

    return cnt;
}

typedef struct FlowCounters_ {
    uint16_t flow_mgr_full_pass;
    uint16_t flow_mgr_rows_sec;

    uint16_t flow_mgr_spare;
    uint16_t flow_emerg_mode_enter;
    uint16_t flow_emerg_mode_over;

    uint16_t flow_mgr_rows_maxlen;
    uint16_t flow_mgr_flows_checked;
    uint16_t flow_mgr_flows_notimeout;
    uint16_t flow_mgr_flows_timeout;
    uint16_t flow_mgr_flows_aside;
    uint16_t flow_mgr_flows_aside_needs_work;

    uint16_t flow_bypassed_cnt_clo;
    uint16_t flow_bypassed_pkts;
    uint16_t flow_bypassed_bytes;

    uint16_t memcap_pressure;
    uint16_t memcap_pressure_max;
} FlowCounters;

typedef struct FlowManagerThreadData_ {
    uint32_t instance;
    uint32_t min;
    uint32_t max;

    FlowCounters cnt;
    FlowManagerTimeoutThread timeout;

    uint16_t counter_defrag_timeout;
    uint16_t counter_defrag_memuse;
} FlowManagerThreadData;

static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
{
    fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
    fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);

    fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
    fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
    fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);

    fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
    fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
    fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
    fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
    fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
    fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);

    fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
    fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
    fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);

    fc->memcap_pressure = StatsRegisterCounter("memcap.pressure", t);
    fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap.pressure_max", t);
}

static void FlowCountersUpdate(
        ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
{
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);

    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work,
            (uint64_t)counters->flows_aside_needs_work);

    StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
    StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
    StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);

    StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
}

static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;

    ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
    SCLogDebug("flow manager instance %u", ftd->instance);

    /* set the min and max value used for hash row walking
     * each thread has its own section of the flow hash */
    uint32_t range = flow_config.hash_size / flowmgr_number;

    ftd->min = ftd->instance * range;
    ftd->max = (ftd->instance + 1) * range;

    /* last flow-manager takes on hash_size % flowmgr_number extra rows */
    if ((ftd->instance + 1) == flowmgr_number) {
        ftd->max = flow_config.hash_size;
    }
    BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);

    SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

    /* pass thread data back to caller */
    *data = ftd;

    FlowCountersInit(t, &ftd->cnt);
    ftd->counter_defrag_timeout = StatsRegisterCounter("defrag.mgr.tracker_timeout", t);
    ftd->counter_defrag_memuse = StatsRegisterCounter("defrag.memuse", t);

    PacketPoolInit();
    return TM_ECODE_OK;
}
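
/* Partitioning example (editorial, illustrative numbers): with
 * flow_config.hash_size 65536 and 3 managers, range is 21845; instance 0
 * walks [0, 21845), instance 1 [21845, 43690), and the last instance
 * takes [43690, 65536), picking up the hash_size % flowmgr_number
 * remainder row. */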

static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
    StreamTcpThreadCacheCleanup();
    PacketPoolDestroy();
    SCFree(data);
    return TM_ECODE_OK;
}

/** \internal
 * \brief calculate number of rows to scan and how much time to sleep
 *        based on the busy score `mp` (0 idle, 100 max busy).
 *
 * We try to make sure we scan the hash once a second. The size of the
 * slice of the hash scanned is determined by our busy score 'mp'. We
 * sleep for the remainder of the second after processing the slice,
 * or at least an approximation of it.
 * A minimum busy score of 10 is assumed to avoid a longer than 10 second
 * full hash pass. This is to avoid burstiness in scanning when there is
 * a rapid increase of the busy score, which could lead to the flow manager
 * suddenly scanning a much larger slice of the hash leading to a burst
 * in scan/eviction work.
 *
 * \param rows number of rows for the work unit
 * \param mp current memcap pressure value
 * \param emergency emergency mode is set or not
 * \param wu_sleep holds value of sleep time per worker unit
 * \param wu_rows holds value of calculated rows to be processed per second
 * \param rows_sec same as wu_rows, only used for counter updates
 */
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
        uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
    if (emergency) {
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }
    /* minimum busy score is 10 */
    const uint32_t emp = MAX(mp, 10);
    const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
    /* calc how much time we estimate the work will take, in ms. We assume
     * each row takes an average of 1usec. Maxing out at 1sec. */
    const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
    /* calc how much time we need to sleep to get to the per second cadence
     * but sleeping for at least 250ms. */
    const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
    SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
            sleep_per_unit);

    *wu_sleep = sleep_per_unit;
    *wu_rows = rows_per_sec;
    *rows_sec = rows_per_sec;
}
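
/* Worked numbers for the sizing above (editorial, illustrative): rows of
 * 65536 and mp of 25 give rows_per_sec = 16384; the estimated work time
 * is MIN(16384 / 1000, 1000) = 16ms, so the unit sleeps
 * MAX(250, 1000 - 16) = 984ms, approximating one slice per second. The
 * floor of 10 on the busy score keeps a full pass of a thread's range
 * under roughly 10 seconds even when the engine is otherwise idle. */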

/** \brief Thread that manages the flow table and times out flows.
 *
 * \param td ThreadVars cast to void ptr
 *
 * Keeps an eye on the spare list, alloc flows if needed...
 */
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
    FlowManagerThreadData *ftd = thread_data;
    const uint32_t rows = ftd->max - ftd->min;
    const bool time_is_live = TimeModeIsLive();

    uint32_t emerg_over_cnt = 0;
    uint64_t next_run_ms = 0;
    uint32_t pos = ftd->min;
    uint32_t rows_sec = 0;
    uint32_t rows_per_wu = 0;
    uint64_t sleep_per_wu = 0;
    bool prev_emerg = false;
    uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */

    uint32_t mp = MemcapsGetPressure() * 100;
    if (ftd->instance == 0) {
        StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
        StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
    }
    GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
    StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);

    TmThreadsSetFlag(th_v, THV_RUNNING);
    /* don't start our activities until time is setup */
    while (!TimeModeIsReady()) {
        if (suricata_ctl_flags != 0)
            return TM_ECODE_OK;
        SleepUsec(10);
    }
    bool run = TmThreadsWaitForUnpause(th_v);

    while (run) {
        bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

        /* Get the time: real time in live mode, or a min() of the
         * "active" threads in offline mode. See TmThreadsGetMinimalTimestamp */
        SCTime_t ts = TimeGet();

        SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
        uint64_t ts_ms = SCTIME_MSECS(ts);
        const bool emerge_p = (emerg && !prev_emerg);
        if (emerge_p) {
            next_run_ms = 0;
            prev_emerg = true;
            SCLogNotice("Flow emergency mode entered...");
            StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
        }
        if (ts_ms >= next_run_ms) {
            if (ftd->instance == 0) {
                const uint32_t sq_len = FlowSpareGetPoolSize();
                const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
                /* see if we still have enough spare flows */
                if (spare_perc < 90 || spare_perc > 110) {
                    FlowSparePoolUpdate(sq_len);
                }
            }

            /* try to time out flows */
            // clang-format off
            FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
            // clang-format on

            if (emerg) {
                /* in emergency mode, do a full pass of the hash table */
                FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
                StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
            } else {
                SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
                        rows_per_wu);

                const uint32_t ppos = pos;
                FlowTimeoutHashInChunks(&ftd->timeout, ts, ftd->min, ftd->max, &counters,
                        rows_per_wu, &pos, ftd->instance);
                if (ppos > pos) {
                    StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
                }
            }

            const uint32_t spare_pool_len = FlowSpareGetPoolSize();
            StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);

            FlowCountersUpdate(th_v, ftd, &counters);

            if (emerg) {
                SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
                           "flow_spare_q status: %" PRIu32 "%% flows at the queue",
                        spare_pool_len, flow_config.prealloc,
                        spare_pool_len * 100 / MAX(flow_config.prealloc, 1));

                /* only if we have pruned this "emergency_recovery" percentage
                 * of flows, we will unset the emergency bit */
                if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
                        flow_config.emergency_recovery) {
                    emerg_over_cnt++;
                } else {
                    emerg_over_cnt = 0;
                }

                if (emerg_over_cnt >= 30) {
                    SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
                    FlowTimeoutsReset();

                    emerg = false;
                    prev_emerg = false;
                    emerg_over_cnt = 0;
                    SCLogNotice("Flow emergency mode over, back to normal... unsetting"
                                " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
                                "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
                                "%% flows at the queue",
                            (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
                            spare_pool_len * 100 / MAX(flow_config.prealloc, 1));

                    StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
                }
            }

            /* update work units */
            const uint32_t pmp = mp;
            mp = MemcapsGetPressure() * 100;
            if (ftd->instance == 0) {
                StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
                StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
            }
            GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
            if (pmp != mp) {
                StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
            }

            next_run_ms = ts_ms + sleep_per_wu;
        }
        if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
            if (ftd->instance == 0) {
                StatsSetUI64(th_v, ftd->counter_defrag_memuse, DefragTrackerGetMemcap());
                uint32_t defrag_cnt = DefragTimeoutHash(ts);
                if (defrag_cnt) {
                    StatsAddUI64(th_v, ftd->counter_defrag_timeout, defrag_cnt);
                }

                HostTimeoutHash(ts);
                IPPairTimeoutHash(ts);
                HttpRangeContainersTimeoutHash(ts);
                ThresholdsExpire(ts);
                other_last_sec = (uint32_t)SCTIME_SECS(ts);
            }
        }

        if (TmThreadsCheckFlag(th_v, THV_KILL)) {
            StatsSyncCounters(th_v);
            break;
        }

        if (emerg || !time_is_live) {
            SleepUsec(250);
        } else {
            struct timeval cond_tv;
            gettimeofday(&cond_tv, NULL);
            struct timeval add_tv;
            add_tv.tv_sec = sleep_per_wu / 1000;
            add_tv.tv_usec = (sleep_per_wu % 1000) * 1000;
            timeradd(&cond_tv, &add_tv, &cond_tv);

            struct timespec cond_time = FROM_TIMEVAL(cond_tv);
            SCCtrlMutexLock(&flow_manager_ctrl_mutex);
            while (1) {
                if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
                    break;
                }
                int rc = SCCtrlCondTimedwait(
                        &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
                if (rc == ETIMEDOUT || rc < 0) {
                    break;
                }
            }
            SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
        }

        SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");

        StatsSyncCountersIfSignalled(th_v);
    }
    return TM_ECODE_OK;
}

/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn(void)
{
    intmax_t setting = 1;
    (void)SCConfGetInt("flow.managers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError("invalid flow.managers setting %" PRIdMAX, setting);
    }
    flowmgr_number = (uint32_t)setting;

    SCLogConfig("using %u flow manager threads", flowmgr_number);
    StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);

    for (uint32_t u = 0; u < flowmgr_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);

        ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
                "FlowManager", 0);
        BUG_ON(tv_flowmgr == NULL);

        if (tv_flowmgr == NULL) {
            FatalError("flow manager thread creation failed");
        }
        if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
            FatalError("flow manager thread spawn failed");
        }
    }
}
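
/* The thread counts come from suricata.yaml; a minimal sketch with
 * illustrative values:
 *
 *   flow:
 *     managers: 2     # flow.managers, validated to 1..1024 above
 *     recyclers: 2    # flow.recyclers, see FlowRecyclerThreadSpawn()
 */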

typedef struct FlowRecyclerThreadData_ {
    void *output_thread_data;

    uint16_t counter_flows;
    uint16_t counter_queue_avg;
    uint16_t counter_queue_max;

    uint16_t counter_flow_active;
    uint16_t counter_tcp_active_sessions;

    FlowEndCounters fec;
} FlowRecyclerThreadData;

static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;
    if (OutputFlowLogThreadInit(t, &ftd->output_thread_data) != TM_ECODE_OK) {
        SCLogError("initializing flow log API for thread failed");
        SCFree(ftd);
        return TM_ECODE_FAILED;
    }
    SCLogDebug("output_thread_data %p", ftd->output_thread_data);

    ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
    ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
    ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);

    ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
    ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);

    FlowEndCountersRegister(t, &ftd->fec);

    *data = ftd;
    return TM_ECODE_OK;
}

static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
    StreamTcpThreadCacheCleanup();

    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
    if (ftd->output_thread_data != NULL)
        OutputFlowLogThreadDeinit(t, ftd->output_thread_data);

    SCFree(data);
    return TM_ECODE_OK;
}

static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
{
    FLOWLOCK_WRLOCK(f);

    (void)OutputFlowLog(tv, ftd->output_thread_data, f);

    FlowEndCountersUpdate(tv, &ftd->fec, f);
    if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
        StatsDecr(tv, ftd->counter_tcp_active_sessions);
    }
    StatsDecr(tv, ftd->counter_flow_active);

    SCFlowRunFinishCallbacks(tv, f);
    FlowClearMemory(f, f->protomap);
    FLOWLOCK_UNLOCK(f);
}

/** \brief Thread that manages timed out flows.
 *
 * \param td ThreadVars cast to void ptr
 */
static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
    BUG_ON(ftd == NULL);
    const bool time_is_live = TimeModeIsLive();
    uint64_t recycled_cnt = 0;
    FlowQueuePrivate ret_queue = { NULL, NULL, 0 };

    TmThreadsSetFlag(th_v, THV_RUNNING);
    bool run = TmThreadsWaitForUnpause(th_v);

    while (run) {
        SC_ATOMIC_ADD(flowrec_busy,1);
        FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);

        StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
        StatsSetUI64(th_v, ftd->counter_queue_max, list.len);

        const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));

        /* Get the time */
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));

        uint64_t cnt = 0;
        Flow *f;
        while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
            Recycler(th_v, ftd, f);
            cnt++;

            /* for every full sized block, add it to the spare pool */
            FlowQueuePrivateAppendFlow(&ret_queue, f);
            if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
                FlowSparePoolReturnFlows(&ret_queue);
            }
        }
        if (ret_queue.len > 0) {
            FlowSparePoolReturnFlows(&ret_queue);
        }
        if (cnt > 0) {
            recycled_cnt += cnt;
            StatsAddUI64(th_v, ftd->counter_flows, cnt);
        }
        SC_ATOMIC_SUB(flowrec_busy,1);

        if (bail) {
            break;
        }

        const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
        if (emerg || !time_is_live) {
            SleepUsec(250);
        } else {
            struct timeval cond_tv;
            gettimeofday(&cond_tv, NULL);
            cond_tv.tv_sec += 1;
            struct timespec cond_time = FROM_TIMEVAL(cond_tv);
            SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
            while (1) {
                if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
                    break;
                }
                if (SC_ATOMIC_GET(flow_recycle_q.non_empty)) {
                    break;
                }
                int rc = SCCtrlCondTimedwait(
                        &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
                if (rc == ETIMEDOUT || rc < 0) {
                    break;
                }
            }
            SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
        }

        SCLogDebug("woke up...");

        StatsSyncCountersIfSignalled(th_v);
    }
    StatsSyncCounters(th_v);
    SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
    return TM_ECODE_OK;
}

static bool FlowRecyclerReadyToShutdown(void)
{
    if (SC_ATOMIC_GET(flowrec_busy) != 0) {
        return false;
    }
    uint32_t len = 0;
    FQLOCK_LOCK(&flow_recycle_q);
    len = flow_recycle_q.qlen;
    FQLOCK_UNLOCK(&flow_recycle_q);

    return ((len == 0));
}

/** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn(void)
{
    intmax_t setting = 1;
    (void)SCConfGetInt("flow.recyclers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
    }
    flowrec_number = (uint32_t)setting;

    SCLogConfig("using %u flow recycler threads", flowrec_number);

    for (uint32_t u = 0; u < flowrec_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);

        ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
                "FlowRecycler", 0);

        if (tv_flowrec == NULL) {
            FatalError("flow recycler thread creation failed");
        }
        if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
            FatalError("flow recycler thread spawn failed");
        }
    }
}

/**
 * \brief Used to disable flow recycler thread(s).
 *
 * \note this should only be called when the flow manager is already gone
 *
 * \todo Kinda hackish since it uses the tv name to identify flow recycler
 *       thread. We need an all weather identification scheme.
 */
void FlowDisableFlowRecyclerThread(void)
{
    /* move all flows still in the hash to the recycler queue */
#ifndef DEBUG
    (void)FlowCleanupHash();
#else
    uint32_t flows = FlowCleanupHash();
    SCLogDebug("flows to progress: %u", flows);
#endif

    /* make sure all flows are processed */
    do {
        FlowWakeupFlowRecyclerThread();
        SleepUsec(10);
    } while (!FlowRecyclerReadyToShutdown());

    SCMutexLock(&tv_root_lock);
    /* flow recycler thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError("unable to get all flow recycler "
                   "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                FlowWakeupFlowRecyclerThread();
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowrec_cnt, 0);
}

void TmModuleFlowManagerRegister(void)
{
    tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
    tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
    tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
    tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
    tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
    tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);

    SC_ATOMIC_INIT(flowmgr_cnt);
    SC_ATOMIC_INITPTR(flow_timeouts);
}

void TmModuleFlowRecyclerRegister(void)
{
    tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
    tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
    tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
    tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
    tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
    tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);

    SC_ATOMIC_INIT(flowrec_cnt);
    SC_ATOMIC_INIT(flowrec_busy);
}