suricata
tm-threads.c
Go to the documentation of this file.
1/* Copyright (C) 2007-2024 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23 * \author Eric Leblond <eric@regit.org>
24 *
25 * Thread management functions.
26 */
27
28#include "suricata-common.h"
29#include "suricata.h"
30#include "stream.h"
31#include "runmodes.h"
32#include "thread-callbacks.h"
33#include "threadvars.h"
34#include "thread-storage.h"
35#include "tm-queues.h"
36#include "tm-queuehandlers.h"
37#include "tm-threads.h"
38#include "tmqh-packetpool.h"
39#include "threads.h"
40#include "util-affinity.h"
41#include "util-debug.h"
42#include "util-privs.h"
43#include "util-cpu.h"
44#include "util-optimize.h"
45#include "util-profiling.h"
46#include "util-signal.h"
47#include "queue.h"
48#include "util-validate.h"
49
50#ifdef PROFILE_LOCKING
51thread_local uint64_t mutex_lock_contention;
52thread_local uint64_t mutex_lock_wait_ticks;
53thread_local uint64_t mutex_lock_cnt;
54
55thread_local uint64_t spin_lock_contention;
56thread_local uint64_t spin_lock_wait_ticks;
57thread_local uint64_t spin_lock_cnt;
58
59thread_local uint64_t rww_lock_contention;
60thread_local uint64_t rww_lock_wait_ticks;
61thread_local uint64_t rww_lock_cnt;
62
63thread_local uint64_t rwr_lock_contention;
64thread_local uint64_t rwr_lock_wait_ticks;
65thread_local uint64_t rwr_lock_cnt;
66#endif
67
68#ifdef OS_FREEBSD
69#include <sched.h>
70#include <sys/param.h>
71#include <sys/resource.h>
72#include <sys/cpuset.h>
73#include <sys/thr.h>
74#define cpu_set_t cpuset_t
75#endif /* OS_FREEBSD */
76
77/* prototypes */
78static int SetCPUAffinity(uint16_t cpu);
79static void TmThreadDeinitMC(ThreadVars *tv);
80
81/* root of the threadvars list */
83
84/* lock to protect tv_root */
86
87/**
88 * \brief Check if a thread flag is set.
89 *
90 * \retval 1 flag is set.
91 * \retval 0 flag is not set.
92 */
93int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
94{
95 return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
96}
97
98/**
99 * \brief Set a thread flag.
100 */
101void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
102{
103 SC_ATOMIC_OR(tv->flags, flag);
104}
105
106/**
107 * \brief Unset a thread flag.
108 */
109void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
110{
111 SC_ATOMIC_AND(tv->flags, ~flag);
112}
113
115 ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
116{
117 while (decode_pq->top != NULL) {
118 Packet *extra_p = PacketDequeueNoLock(decode_pq);
119 if (unlikely(extra_p == NULL))
120 continue;
121 DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
122
123 if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
125 }
126 }
128}
129
130/**
131 * \brief Separate run function so we can call it recursively.
132 */
134{
135 for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
136 PACKET_PROFILING_TMM_START(p, s->tm_id);
137 TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
138 PACKET_PROFILING_TMM_END(p, s->tm_id);
139 DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
140
141 /* handle error */
142 if (unlikely(r == TM_ECODE_FAILED)) {
143 /* Encountered error. Return packets to packetpool and return */
144 TmThreadsSlotProcessPktFail(tv, NULL);
145 return TM_ECODE_FAILED;
146 }
147 if (s->tm_flags & TM_FLAG_DECODE_TM) {
148 if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
149 TM_ECODE_OK) {
150 return TM_ECODE_FAILED;
151 }
152 }
153 }
154
155 return TM_ECODE_OK;
156}
157
158/** \internal
159 *
160 * \brief Process flow timeout packets
161 *
162 * Process flow timeout pseudo packets. During shutdown this loop
163 * is run until the flow engine kills the thread and the queue is
164 * empty.
165 */
167{
168 TmSlot *fw_slot = tv->tm_flowworker;
169 int r = TM_ECODE_OK;
170
171 if (tv->stream_pq == NULL || fw_slot == NULL) {
172 SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
173 return r;
174 }
175
176 SCLogDebug("flow end loop starting");
177 while (1) {
179 uint32_t len = tv->stream_pq->len;
181 if (len > 0) {
182 while (len--) {
186 if (likely(p)) {
187 DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
188 r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
189 if (r == TM_ECODE_FAILED) {
190 break;
191 }
192 }
193 }
194 } else {
196 break;
197 }
198 SleepUsec(1);
199 }
200 }
201 SCLogDebug("flow end loop complete");
203
204 return r;
205}
206
207static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
208{
209 TmSlot *s = tv->tm_slots;
210
212
213 if (tv->thread_setup_flags != 0)
215
218
219 for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
220 if (slot->SlotThreadInit != NULL) {
221 void *slot_data = NULL;
222 TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
223 if (r != TM_ECODE_OK) {
224 if (r == TM_ECODE_DONE) {
225 EngineDone();
227 goto error;
228 } else {
230 goto error;
231 }
232 }
233 (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
234 }
235
236 /* if the flowworker module is the first, get the threads input queue */
237 if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
238 tv->stream_pq = tv->inq->pq;
239 tv->tm_flowworker = slot;
240 SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
242 if (tv->flow_queue == NULL) {
244 goto error;
245 }
246 /* setup a queue */
247 } else if (slot->tm_id == TMM_FLOWWORKER) {
248 tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
249 if (tv->stream_pq_local == NULL)
250 FatalError("failed to alloc PacketQueue");
253 tv->tm_flowworker = slot;
254 SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
256 if (tv->flow_queue == NULL) {
258 goto error;
259 }
260 }
261 }
262
264
266
267 return true;
268
269error:
270 return false;
271}
272
274{
275 TmSlot *s = tv->tm_slots;
276 bool rc = true;
277
279
281
282 /* process all pseudo packets the flow timeout may throw at us */
284
287
289
290 for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
291 if (slot->SlotThreadExitPrintStats != NULL) {
292 slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
293 }
294
295 if (slot->SlotThreadDeinit != NULL) {
296 TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
297 if (r != TM_ECODE_OK) {
299 rc = false;
300 break;
301 }
302 }
303 }
304
305 tv->stream_pq = NULL;
307 return rc;
308}
309
310static void *TmThreadsSlotPktAcqLoop(void *td)
311{
312 ThreadVars *tv = (ThreadVars *)td;
313 TmSlot *s = tv->tm_slots;
315
316 /* check if we are setup properly */
317 if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
318 SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
319 " PktAcqLoop=%p, tmqh_in=%p,"
320 " tmqh_out=%p",
321 s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
323 pthread_exit(NULL);
324 return NULL;
325 }
326
327 if (!TmThreadsSlotPktAcqLoopInit(td)) {
328 goto error;
329 }
330
331 bool run = TmThreadsWaitForUnpause(tv);
332
333 while (run) {
334 r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
335
336 if (r == TM_ECODE_FAILED) {
338 run = false;
339 }
341 run = false;
342 }
343 if (r == TM_ECODE_DONE) {
344 run = false;
345 }
346 }
348 goto error;
349 }
350
351 SCLogDebug("%s ending", tv->name);
352 pthread_exit((void *) 0);
353 return NULL;
354
355error:
356 pthread_exit(NULL);
357 return NULL;
358}
359
360/**
361 * Also returns if the kill flag is set.
362 */
364{
367
369 SleepUsec(100);
370
372 return false;
373 }
374
376 }
377
378 return true;
379}
380
381static void *TmThreadsLib(void *td)
382{
383 ThreadVars *tv = (ThreadVars *)td;
384 TmSlot *s = tv->tm_slots;
385
386 /* check if we are setup properly */
387 if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
388 SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
389 " tmqh_out=%p",
390 s, tv->tmqh_in, tv->tmqh_out);
392 return NULL;
393 }
394
395 if (!TmThreadsSlotPktAcqLoopInit(tv)) {
396 goto error;
397 }
398
400 goto error;
401 }
402
403 return NULL;
404
405error:
406 tv->stream_pq = NULL;
407 return (void *)-1;
408}
409
410static void *TmThreadsSlotVar(void *td)
411{
412 ThreadVars *tv = (ThreadVars *)td;
413 TmSlot *s = (TmSlot *)tv->tm_slots;
414 Packet *p = NULL;
416
418 PacketPoolInit();//Empty();
419
421
422 if (tv->thread_setup_flags != 0)
424
425 /* Drop the capabilities for this thread */
426 SCDropCaps(tv);
427
428 /* check if we are setup properly */
429 if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
431 pthread_exit(NULL);
432 return NULL;
433 }
434
435 for (; s != NULL; s = s->slot_next) {
436 if (s->SlotThreadInit != NULL) {
437 void *slot_data = NULL;
438 r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
439 if (r != TM_ECODE_OK) {
441 goto error;
442 }
443 (void)SC_ATOMIC_SET(s->slot_data, slot_data);
444 }
445
446 /* special case: we need to access the stream queue
447 * from the flow timeout code */
448
449 /* if the flowworker module is the first, get the threads input queue */
450 if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
451 tv->stream_pq = tv->inq->pq;
452 tv->tm_flowworker = s;
453 SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
455 if (tv->flow_queue == NULL) {
457 pthread_exit(NULL);
458 return NULL;
459 }
460 /* setup a queue */
461 } else if (s->tm_id == TMM_FLOWWORKER) {
462 tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
463 if (tv->stream_pq_local == NULL)
464 FatalError("failed to alloc PacketQueue");
467 tv->tm_flowworker = s;
468 SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
470 if (tv->flow_queue == NULL) {
472 pthread_exit(NULL);
473 return NULL;
474 }
475 }
476 }
477
479
480 // Each 'worker' thread uses this func to process/decode the packet read.
481 // Each decode method is different to receive methods in that they do not
482 // enter infinite loops. They use this as the core loop. As a result, at this
483 // point the worker threads can be considered both initialized and running.
485 bool run = TmThreadsWaitForUnpause(tv);
486
487 s = (TmSlot *)tv->tm_slots;
488
489 while (run) {
490 /* input a packet */
491 p = tv->tmqh_in(tv);
492
493 /* if we didn't get a packet see if we need to do some housekeeping */
494 if (unlikely(p == NULL)) {
495 if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
497 if (p != NULL) {
500 }
501 }
502 }
503
504 if (p != NULL) {
505 /* run the thread module(s) */
506 r = TmThreadsSlotVarRun(tv, p, s);
507 if (r == TM_ECODE_FAILED) {
510 break;
511 }
512
513 /* output the packet */
514 tv->tmqh_out(tv, p);
515
516 /* now handle the stream pq packets */
517 TmThreadsHandleInjectedPackets(tv);
518 }
519
521 run = false;
522 }
523 }
525 goto error;
526 }
528
529 pthread_exit(NULL);
530 return NULL;
531
532error:
533 tv->stream_pq = NULL;
534 pthread_exit(NULL);
535 return NULL;
536}
537
538static void *TmThreadsManagement(void *td)
539{
540 ThreadVars *tv = (ThreadVars *)td;
541 TmSlot *s = (TmSlot *)tv->tm_slots;
543
544 BUG_ON(s == NULL);
545
547
548 if (tv->thread_setup_flags != 0)
550
551 /* Drop the capabilities for this thread */
552 SCDropCaps(tv);
553
554 SCLogDebug("%s starting", tv->name);
555
556 if (s->SlotThreadInit != NULL) {
557 void *slot_data = NULL;
558 r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
559 if (r != TM_ECODE_OK) {
561 pthread_exit(NULL);
562 return NULL;
563 }
564 (void)SC_ATOMIC_SET(s->slot_data, slot_data);
565 }
566
568
570
571 r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
572 /* handle error */
573 if (r == TM_ECODE_FAILED) {
575 }
576
579 }
580
583
584 if (s->SlotThreadExitPrintStats != NULL) {
585 s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
586 }
587
588 if (s->SlotThreadDeinit != NULL) {
589 r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
590 if (r != TM_ECODE_OK) {
592 pthread_exit(NULL);
593 return NULL;
594 }
595 }
596
598 pthread_exit((void *) 0);
599 return NULL;
600}
601
602/**
603 * \brief We set the slot functions.
604 *
605 * \param tv Pointer to the TV to set the slot function for.
606 * \param name Name of the slot variant.
607 * \param fn_p Pointer to a custom slot function. Used only if slot variant
608 * "name" is "custom".
609 *
610 * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
611 */
612static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
613{
614 if (name == NULL) {
615 if (fn_p == NULL) {
616 printf("Both slot name and function pointer can't be NULL inside "
617 "TmThreadSetSlots\n");
618 goto error;
619 } else {
620 name = "custom";
621 }
622 }
623
624 if (strcmp(name, "varslot") == 0) {
625 tv->tm_func = TmThreadsSlotVar;
626 } else if (strcmp(name, "pktacqloop") == 0) {
627 tv->tm_func = TmThreadsSlotPktAcqLoop;
628 } else if (strcmp(name, "management") == 0) {
629 tv->tm_func = TmThreadsManagement;
630 } else if (strcmp(name, "command") == 0) {
631 tv->tm_func = TmThreadsManagement;
632 } else if (strcmp(name, "lib") == 0) {
633 tv->tm_func = TmThreadsLib;
634 } else if (strcmp(name, "custom") == 0) {
635 if (fn_p == NULL)
636 goto error;
637 tv->tm_func = fn_p;
638 } else {
639 printf("Error: Slot \"%s\" not supported\n", name);
640 goto error;
641 }
642
643 return TM_ECODE_OK;
644
645error:
646 return TM_ECODE_FAILED;
647}
648
649/**
650 * \brief Appends a new entry to the slots.
651 *
652 * \param tv TV the slot is attached to.
653 * \param tm TM to append.
654 * \param data Data to be passed on to the slot init function.
655 *
656 * \retval The allocated TmSlot or NULL if there is an error
657 */
658void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
659{
660 TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
661 if (unlikely(slot == NULL))
662 return;
663 SC_ATOMIC_INITPTR(slot->slot_data);
664 slot->SlotThreadInit = tm->ThreadInit;
665 slot->slot_initdata = data;
666 if (tm->Func) {
667 slot->SlotFunc = tm->Func;
668 } else if (tm->PktAcqLoop) {
669 slot->PktAcqLoop = tm->PktAcqLoop;
670 if (tm->PktAcqBreakLoop) {
671 tv->break_loop = true;
672 }
673 } else if (tm->Management) {
674 slot->Management = tm->Management;
675 }
677 slot->SlotThreadDeinit = tm->ThreadDeinit;
678 /* we don't have to check for the return value "-1". We wouldn't have
679 * received a TM as arg, if it didn't exist */
680 slot->tm_id = TmModuleGetIDForTM(tm);
681 slot->tm_flags |= tm->flags;
682
683 tv->tmm_flags |= tm->flags;
684 tv->cap_flags |= tm->cap_flags;
685
686 if (tv->tm_slots == NULL) {
687 tv->tm_slots = slot;
688 } else {
689 TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
690
691 /* get the last slot */
692 for ( ; a != NULL; a = a->slot_next) {
693 b = a;
694 }
695 /* append the new slot */
696 if (b != NULL) {
697 b->slot_next = slot;
698 }
699 }
700}
701
702#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
703static int SetCPUAffinitySet(cpu_set_t *cs)
704{
705#if defined OS_FREEBSD
706 int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
707 SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
708#elif OS_DARWIN
709 int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
710 (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
711#else
712 pid_t tid = (pid_t)syscall(SYS_gettid);
713 int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
714#endif /* OS_FREEBSD */
715
716 if (r != 0) {
717 printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
718 strerror(errno));
719 return -1;
720 }
721
722 return 0;
723}
724#endif
725
726
727/**
728 * \brief Set the thread affinity on the calling thread.
729 *
730 * \param cpuid Id of the core/cpu to setup the affinity.
731 *
732 * \retval 0 If all goes well; -1 if something is wrong.
733 */
734static int SetCPUAffinity(uint16_t cpuid)
735{
736#if defined __OpenBSD__ || defined sun
737 return 0;
738#else
739 int cpu = (int)cpuid;
740
741#if defined OS_WIN32 || defined __CYGWIN__
742 DWORD cs = 1 << cpu;
743
744 int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
745 if (r != 0) {
746 printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
747 strerror(errno));
748 return -1;
749 }
750 SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
751 SCGetThreadIdLong(), cpu);
752
753 return 0;
754
755#else
756 cpu_set_t cs;
757 memset(&cs, 0, sizeof(cs));
758
759 CPU_ZERO(&cs);
760 CPU_SET(cpu, &cs);
761 return SetCPUAffinitySet(&cs);
762#endif /* windows */
763#endif /* not supported */
764}
765
766
767/**
768 * \brief Set the thread options (thread priority).
769 *
770 * \param tv Pointer to the ThreadVars to setup the thread priority.
771 *
772 * \retval TM_ECODE_OK.
773 */
781
782/**
783 * \brief Adjusting nice value for threads.
784 */
786{
787 SCEnter();
788#ifndef __CYGWIN__
789#ifdef OS_WIN32
790 if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
791 SCLogError("Error setting priority for "
792 "thread %s: %s",
793 tv->name, strerror(errno));
794 } else {
795 SCLogDebug("Priority set to %"PRId32" for thread %s",
797 }
798#else
799 int ret = nice(tv->thread_priority);
800 if (ret == -1) {
801 SCLogError("Error setting nice value %d "
802 "for thread %s: %s",
803 tv->thread_priority, tv->name, strerror(errno));
804 } else {
805 SCLogDebug("Nice value set to %"PRId32" for thread %s",
807 }
808#endif /* OS_WIN32 */
809#endif
810 SCReturn;
811}
812
813
814/**
815 * \brief Set the thread options (cpu affinity).
816 *
817 * \param tv pointer to the ThreadVars to setup the affinity.
818 * \param cpu cpu on which affinity is set.
819 *
820 * \retval TM_ECODE_OK
821 */
823{
825 tv->cpu_affinity = cpu;
826
827 return TM_ECODE_OK;
828}
829
830
832{
834 return TM_ECODE_OK;
835
836 if (type > MAX_CPU_SET) {
837 SCLogError("invalid cpu type family");
838 return TM_ECODE_FAILED;
839 }
840
843
844 return TM_ECODE_OK;
845}
846
848{
849 if (type >= MAX_CPU_SET) {
850 SCLogError("invalid cpu type family");
851 return 0;
852 }
853
855}
856
857/**
858 * \brief Set the thread options (cpu affinitythread).
859 * Priority should be already set by pthread_create.
860 *
861 * \param tv pointer to the ThreadVars of the calling thread.
862 */
864{
866 SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
867 "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
869 SetCPUAffinity(tv->cpu_affinity);
870 }
871
872#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
877 bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET &&
879 use_iface_affinity |= RunmodeIsWorkers() && tv->cpu_affinity == WORKER_CPU_SET &&
881
882 if (use_iface_affinity) {
884 }
885
886 if (UtilAffinityGetAffinedCPUNum(taf) == 0) {
887 if (!taf->nocpu_warned) {
888 SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf));
889 taf->nocpu_warned = true;
890 }
891 }
892
893 if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
894 uint16_t cpu = AffinityGetNextCPU(tv, taf);
895 SetCPUAffinity(cpu);
896 /* If CPU is in a set overwrite the default thread prio */
897 if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
899 } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
901 } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
903 } else {
904 tv->thread_priority = taf->prio;
905 }
906 SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
907 "%d, thread id %lu", tv->thread_priority,
908 tv->name, cpu, SCGetThreadIdLong());
909 } else {
910 SetCPUAffinitySet(&taf->cpu_set);
911 tv->thread_priority = taf->prio;
912 SCLogPerf("Setting prio %d for thread \"%s\", "
913 "thread id %lu", tv->thread_priority,
915 }
917 }
918#endif
919
920 return TM_ECODE_OK;
921}
922
923/**
924 * \brief Creates and returns the TV instance for a new thread.
925 *
926 * \param name Name of this TV instance
927 * \param inq_name Incoming queue name
928 * \param inqh_name Incoming queue handler name as set by TmqhSetup()
929 * \param outq_name Outgoing queue name
930 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
931 * \param slots String representation for the slot function to be used
932 * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
933 * \param mucond Flag to indicate whether to initialize the condition
934 * and the mutex variables for this newly created TV.
935 *
936 * \retval the newly created TV instance, or NULL on error
937 */
938ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
939 const char *outq_name, const char *outqh_name, const char *slots,
940 void * (*fn_p)(void *), int mucond)
941{
942 ThreadVars *tv = NULL;
943 Tmq *tmq = NULL;
944 Tmqh *tmqh = NULL;
945
946 SCLogDebug("creating thread \"%s\"...", name);
947
948 /* XXX create separate function for this: allocate a thread container */
949 tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
950 if (unlikely(tv == NULL))
951 goto error;
952
953 SC_ATOMIC_INIT(tv->flags);
955
956 strlcpy(tv->name, name, sizeof(tv->name));
957
958 /* default state for every newly created thread */
960
961 /* set the incoming queue */
962 if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
963 SCLogDebug("inq_name \"%s\"", inq_name);
964
965 tmq = TmqGetQueueByName(inq_name);
966 if (tmq == NULL) {
967 tmq = TmqCreateQueue(inq_name);
968 if (tmq == NULL)
969 goto error;
970 }
971 SCLogDebug("tmq %p", tmq);
972
973 tv->inq = tmq;
974 tv->inq->reader_cnt++;
975 SCLogDebug("tv->inq %p", tv->inq);
976 }
977 if (inqh_name != NULL) {
978 SCLogDebug("inqh_name \"%s\"", inqh_name);
979
980 int id = TmqhNameToID(inqh_name);
981 if (id <= 0) {
982 goto error;
983 }
984 tmqh = TmqhGetQueueHandlerByName(inqh_name);
985 if (tmqh == NULL)
986 goto error;
987
988 tv->tmqh_in = tmqh->InHandler;
989 tv->inq_id = (uint8_t)id;
990 SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
991 }
992
993 /* set the outgoing queue */
994 if (outqh_name != NULL) {
995 SCLogDebug("outqh_name \"%s\"", outqh_name);
996
997 int id = TmqhNameToID(outqh_name);
998 if (id <= 0) {
999 goto error;
1000 }
1001
1002 tmqh = TmqhGetQueueHandlerByName(outqh_name);
1003 if (tmqh == NULL)
1004 goto error;
1005
1006 tv->tmqh_out = tmqh->OutHandler;
1007 tv->outq_id = (uint8_t)id;
1008
1009 if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1010 SCLogDebug("outq_name \"%s\"", outq_name);
1011
1012 if (tmqh->OutHandlerCtxSetup != NULL) {
1013 tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1014 if (tv->outctx == NULL)
1015 goto error;
1016 tv->outq = NULL;
1017 } else {
1018 tmq = TmqGetQueueByName(outq_name);
1019 if (tmq == NULL) {
1020 tmq = TmqCreateQueue(outq_name);
1021 if (tmq == NULL)
1022 goto error;
1023 }
1024 SCLogDebug("tmq %p", tmq);
1025
1026 tv->outq = tmq;
1027 tv->outctx = NULL;
1028 tv->outq->writer_cnt++;
1029 }
1030 }
1031 }
1032
1033 if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1034 goto error;
1035 }
1036
1037 if (mucond != 0)
1039
1041
1042 return tv;
1043
1044error:
1045 SCLogError("failed to setup a thread");
1046
1047 if (tv != NULL)
1048 SCFree(tv);
1049 return NULL;
1050}
1051
1052/**
1053 * \brief Creates and returns a TV instance for a Packet Processing Thread.
1054 * This function doesn't support custom slots, and hence shouldn't be
1055 * supplied \"custom\" as its slot type. All PPT threads are created
1056 * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1057 * conditional variables are not used to kill the thread.
1058 *
1059 * \param name Name of this TV instance
1060 * \param inq_name Incoming queue name
1061 * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1062 * \param outq_name Outgoing queue name
1063 * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1064 * \param slots String representation for the slot function to be used
1065 *
1066 * \retval the newly created TV instance, or NULL on error
1067 */
1068ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1069 const char *inqh_name, const char *outq_name,
1070 const char *outqh_name, const char *slots)
1071{
1072 ThreadVars *tv = NULL;
1073
1074 tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1075 slots, NULL, 0);
1076
1077 if (tv != NULL) {
1078 tv->type = TVT_PPT;
1080 }
1081
1082 return tv;
1083}
1084
1085/**
1086 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1087 * This function supports only custom slot functions and hence a
1088 * function pointer should be sent as an argument.
1089 *
1090 * \param name Name of this TV instance
1091 * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1092 * \param mucond Flag to indicate whether to initialize the condition
1093 * and the mutex variables for this newly created TV.
1094 *
1095 * \retval the newly created TV instance, or NULL on error
1096 */
1097ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1098 int mucond)
1099{
1100 ThreadVars *tv = NULL;
1101
1102 tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1103
1104 if (tv != NULL) {
1105 tv->type = TVT_MGMT;
1108 }
1109
1110 return tv;
1111}
1112
1113/**
1114 * \brief Creates and returns the TV instance for a Management thread(MGMT).
1115 * This function supports only custom slot functions and hence a
1116 * function pointer should be sent as an argument.
1117 *
1118 * \param name Name of this TV instance
1119 * \param module Name of TmModule with MANAGEMENT flag set.
1120 * \param mucond Flag to indicate whether to initialize the condition
1121 * and the mutex variables for this newly created TV.
1122 *
1123 * \retval the newly created TV instance, or NULL on error
1124 */
1125ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1126 int mucond)
1127{
1128 ThreadVars *tv = NULL;
1129
1130 tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1131
1132 if (tv != NULL) {
1133 tv->type = TVT_MGMT;
1136
1137 TmModule *m = TmModuleGetByName(module);
1138 if (m) {
1139 TmSlotSetFuncAppend(tv, m, NULL);
1140 }
1141 }
1142
1143 return tv;
1144}
1145
1146/**
1147 * \brief Creates and returns the TV instance for a Command thread (CMD).
1148 * This function supports only custom slot functions and hence a
1149 * function pointer should be sent as an argument.
1150 *
1151 * \param name Name of this TV instance
1152 * \param module Name of TmModule with COMMAND flag set.
1153 * \param mucond Flag to indicate whether to initialize the condition
1154 * and the mutex variables for this newly created TV.
1155 *
1156 * \retval the newly created TV instance, or NULL on error
1157 */
1158ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1159 int mucond)
1160{
1161 ThreadVars *tv = NULL;
1162
1163 tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1164
1165 if (tv != NULL) {
1166 tv->type = TVT_CMD;
1169
1170 TmModule *m = TmModuleGetByName(module);
1171 if (m) {
1172 TmSlotSetFuncAppend(tv, m, NULL);
1173 }
1174 }
1175
1176 return tv;
1177}
1178
1179/**
1180 * \brief Appends this TV to tv_root based on its type
1181 *
1182 * \param type holds the type this TV belongs to.
1183 */
1185{
1187
1188 if (tv_root[type] == NULL) {
1189 tv_root[type] = tv;
1190 tv->next = NULL;
1191
1193
1194 return;
1195 }
1196
1197 ThreadVars *t = tv_root[type];
1198
1199 while (t) {
1200 if (t->next == NULL) {
1201 t->next = tv;
1202 tv->next = NULL;
1203 break;
1204 }
1205
1206 t = t->next;
1207 }
1208
1210}
1211
1212static bool ThreadStillHasPackets(ThreadVars *tv)
1213{
1214 if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1215 /* we wait till we dry out all the inq packets, before we
1216 * kill this thread. Do note that you should have disabled
1217 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1218 PacketQueue *q = tv->inq->pq;
1219 SCMutexLock(&q->mutex_q);
1220 uint32_t len = q->len;
1222 if (len != 0) {
1223 return true;
1224 }
1225 }
1226
1227 if (tv->stream_pq != NULL) {
1229 uint32_t len = tv->stream_pq->len;
1231
1232 if (len != 0) {
1233 return true;
1234 }
1235 }
1236 return false;
1237}
1238
1239/**
1240 * \brief Kill a thread.
1241 *
1242 * \param tv A ThreadVars instance corresponding to the thread that has to be
1243 * killed.
1244 *
1245 * \retval r 1 killed successfully
1246 * 0 not yet ready, needs another look
1247 */
1248static int TmThreadKillThread(ThreadVars *tv)
1249{
1250 BUG_ON(tv == NULL);
1251
1252 /* kill only once :) */
1254 return 1;
1255 }
1256
1257 /* set the thread flag informing the thread that it needs to be
1258 * terminated */
1261
1262 /* to be sure, signal more */
1263 if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1264 if (tv->inq_id != TMQH_NOT_SET) {
1266 if (qh != NULL && qh->InShutdownHandler != NULL) {
1267 qh->InShutdownHandler(tv);
1268 }
1269 }
1270 if (tv->inq != NULL) {
1271 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1275 }
1276 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1277 }
1278
1279 if (tv->ctrl_cond != NULL ) {
1281 pthread_cond_broadcast(tv->ctrl_cond);
1283 }
1284 return 0;
1285 }
1286
1287 if (tv->outctx != NULL) {
1288 if (tv->outq_id != TMQH_NOT_SET) {
1290 if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1292 tv->outctx = NULL;
1293 }
1294 }
1295 }
1296
1297 /* Join the thread and flag as dead, unless the thread ID is 0 as
1298 * its not a thread created by Suricata. */
1299 if (tv->t) {
1300 pthread_join(tv->t, NULL);
1301 SCLogDebug("thread %s stopped", tv->name);
1302 }
1304 return 1;
1305}
1306
1307static bool ThreadBusy(ThreadVars *tv)
1308{
1309 for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1310 TmModule *tm = TmModuleGetById(s->tm_id);
1311 if (tm && tm->ThreadBusy != NULL) {
1312 if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1313 return true;
1314 }
1315 }
1316 return false;
1317}
1318
1319/** \internal
1320 *
1321 * \brief make sure that all packet threads are done processing their
1322 * in-flight packets, including 'injected' flow packets.
1323 */
1324static void TmThreadDrainPacketThreads(void)
1325{
1326 ThreadVars *tv = NULL;
1327 struct timeval start_ts;
1328 struct timeval cur_ts;
1329 gettimeofday(&start_ts, NULL);
1330
1331again:
1332 gettimeofday(&cur_ts, NULL);
1333 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1334 SCLogWarning("unable to get all packet threads "
1335 "to process their packets in time");
1336 return;
1337 }
1338
1340
1341 /* all receive threads are part of packet processing threads */
1342 tv = tv_root[TVT_PPT];
1343 while (tv) {
1344 if (ThreadStillHasPackets(tv)) {
1345 /* we wait till we dry out all the inq packets, before we
1346 * kill this thread. Do note that you should have disabled
1347 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1349
1350 /* sleep outside lock */
1351 SleepMsec(1);
1352 goto again;
1353 }
1354 if (ThreadBusy(tv)) {
1356
1358 if (p != NULL) {
1361 PacketQueue *q = tv->stream_pq;
1362 SCMutexLock(&q->mutex_q);
1363 PacketEnqueue(q, p);
1364 SCCondSignal(&q->cond_q);
1366 }
1367
1368 /* don't sleep while holding a lock */
1369 SleepMsec(1);
1370 goto again;
1371 }
1372 tv = tv->next;
1373 }
1374
1376}
1377
1378/**
1379 * \brief Disable all threads having the specified TMs.
1380 *
1381 * Breaks out of the packet acquisition loop, and bumps
1382 * into the 'flow loop', where it will process packets
1383 * from the flow engine's shutdown handling.
1384 */
1386{
1387 ThreadVars *tv = NULL;
1388 struct timeval start_ts;
1389 struct timeval cur_ts;
1390 gettimeofday(&start_ts, NULL);
1391
1392again:
1393 gettimeofday(&cur_ts, NULL);
1394 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1395 FatalError("Engine unable to disable detect "
1396 "thread - \"%s\". Killing engine",
1397 tv->name);
1398 }
1399
1401
1402 /* all receive threads are part of packet processing threads */
1403 tv = tv_root[TVT_PPT];
1404
1405 /* we do have to keep in mind that TVs are arranged in the order
1406 * right from receive to log. The moment we fail to find a
1407 * receive TM amongst the slots in a tv, it indicates we are done
1408 * with all receive threads */
1409 while (tv) {
1410 int disable = 0;
1411 TmModule *tm = NULL;
1412 /* obtain the slots for this TV */
1413 TmSlot *slots = tv->tm_slots;
1414 while (slots != NULL) {
1415 tm = TmModuleGetById(slots->tm_id);
1416
1417 if (tm->flags & TM_FLAG_RECEIVE_TM) {
1418 disable = 1;
1419 break;
1420 }
1421
1422 slots = slots->slot_next;
1423 continue;
1424 }
1425
1426 if (disable) {
1427 if (ThreadStillHasPackets(tv)) {
1428 /* we wait till we dry out all the inq packets, before we
1429 * kill this thread. Do note that you should have disabled
1430 * packet acquire by now using TmThreadDisableReceiveThreads()*/
1432 /* don't sleep while holding a lock */
1433 SleepMsec(1);
1434 goto again;
1435 }
1436
1437 if (ThreadBusy(tv)) {
1439
1441 if (p != NULL) {
1444 PacketQueue *q = tv->stream_pq;
1445 SCMutexLock(&q->mutex_q);
1446 PacketEnqueue(q, p);
1447 SCCondSignal(&q->cond_q);
1449 }
1450
1451 /* don't sleep while holding a lock */
1452 SleepMsec(1);
1453 goto again;
1454 }
1455
1456 /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1457 if (tm && tm->PktAcqBreakLoop != NULL) {
1458 tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1459 }
1461
1462 if (tv->inq != NULL) {
1463 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1467 }
1468 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1469 }
1470
1471 /* wait for it to enter the 'flow loop' stage */
1474
1475 SleepMsec(1);
1476 goto again;
1477 }
1478 }
1479
1480 tv = tv->next;
1481 }
1482
1484
1485 /* finally wait for all packet threads to have
1486 * processed all of their 'live' packets so we
1487 * don't process the last live packets together
1488 * with FFR packets */
1489 TmThreadDrainPacketThreads();
1490}
1491
1492#ifdef DEBUG_VALIDATION
1493static void TmThreadDumpThreads(void);
1494#endif
1495
1496static void TmThreadDebugValidateNoMorePackets(void)
1497{
1498#ifdef DEBUG_VALIDATION
1500 for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501 if (ThreadStillHasPackets(tv)) {
1503 TmThreadDumpThreads();
1505 }
1506 }
1508#endif
1509}
1510
1511/** \internal
1512 * \brief check if a thread has any of the modules indicated by TM_FLAG_*
1513 * \param tv thread
1514 * \param flags TM_FLAG_*'s
1515 * \retval bool true if at least on of the flags is present */
1516static inline bool CheckModuleFlags(const ThreadVars *tv, const uint8_t flags)
1517{
1518 return (tv->tmm_flags & flags) != 0;
1519}
1520
1521/**
1522 * \brief Disable all packet threads
1523 * \param set flag to set
1524 * \param check flag to check
1525 * \param module_flags bitflags of TmModule's to apply the `set` flag to.
1526 *
1527 * Support 2 stages in shutting down the packet threads:
1528 * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1529 * 2. set THV_KILL and wait for THV_RUNNING_DONE
1530 *
1531 * During step 1 the main loop is exited, and the flow loop logic is entered.
1532 * During step 2, the flow loop logic is done and the thread closes.
1533 *
1534 * `module_flags` limits which threads are disabled
1535 */
1537 const uint16_t set, const uint16_t check, const uint8_t module_flags)
1538{
1539 struct timeval start_ts;
1540 struct timeval cur_ts;
1541
1542 /* first drain all packet threads of their packets */
1543 TmThreadDrainPacketThreads();
1544
1545 /* since all the threads possibly able to produce more packets
1546 * are now gone or inactive, we should see no packets anywhere
1547 * anymore. */
1548 TmThreadDebugValidateNoMorePackets();
1549
1550 gettimeofday(&start_ts, NULL);
1551again:
1552 gettimeofday(&cur_ts, NULL);
1553 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1554 FatalError("Engine unable to disable packet "
1555 "threads. Killing engine");
1556 }
1557
1558 /* loop through the packet threads and kill them */
1560 for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1561 /* only set flow worker threads to THV_REQ_FLOW_LOOP */
1562 if (!CheckModuleFlags(tv, module_flags)) {
1563 SCLogDebug("%s does not have any of the modules %02x, skip", tv->name, module_flags);
1564 continue;
1565 }
1566 TmThreadsSetFlag(tv, set);
1567
1568 /* separate worker threads (autofp) will still wait at their
1569 * input queues. So nudge them here so they will observe the
1570 * THV_KILL flag. */
1571 if (tv->inq != NULL) {
1572 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1576 }
1577 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1578 }
1579
1580 /* wait for it to reach the expected state */
1581 if (!TmThreadsCheckFlag(tv, check)) {
1583 SCLogDebug("%s did not reach state %u, again", tv->name, check);
1584
1585 SleepMsec(1);
1586 goto again;
1587 }
1588 }
1590}
1591
1592#define MIN_WAIT_TIME 100
1593#define MAX_WAIT_TIME 999999
1595{
1596 ThreadVars *tv = NULL;
1597 unsigned int sleep_usec = MIN_WAIT_TIME;
1598
1599 BUG_ON((family < 0) || (family >= TVT_MAX));
1600
1601again:
1603 tv = tv_root[family];
1604
1605 while (tv) {
1606 int r = TmThreadKillThread(tv);
1607 if (r == 0) {
1609 SleepUsec(sleep_usec);
1610 sleep_usec *= 2; /* slowly back off */
1611 sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1612 goto again;
1613 }
1614 sleep_usec = MIN_WAIT_TIME; /* reset */
1615
1616 tv = tv->next;
1617 }
1619}
1620#undef MIN_WAIT_TIME
1621#undef MAX_WAIT_TIME
1622
1624{
1625 int i = 0;
1626
1627 for (i = 0; i < TVT_MAX; i++) {
1629 }
1630}
1631
1632static void TmThreadFree(ThreadVars *tv)
1633{
1634 TmSlot *s;
1635 TmSlot *ps;
1636 if (tv == NULL)
1637 return;
1638
1639 SCLogDebug("Freeing thread '%s'.", tv->name);
1640
1642
1643 if (tv->flow_queue) {
1644 BUG_ON(tv->flow_queue->qlen != 0);
1646 }
1647
1649
1650 TmThreadDeinitMC(tv);
1651
1652 if (tv->thread_group_name) {
1654 }
1655
1656 if (tv->printable_name) {
1658 }
1659
1660 if (tv->iface_name) {
1662 }
1663
1664 if (tv->stream_pq_local) {
1668 }
1669
1670 s = (TmSlot *)tv->tm_slots;
1671 while (s) {
1672 ps = s;
1673 s = s->slot_next;
1674 SCFree(ps);
1675 }
1676
1678 SCFree(tv);
1679}
1680
1682{
1683 char *thread_group_name = NULL;
1684
1685 if (name == NULL)
1686 return;
1687
1688 if (tv == NULL)
1689 return;
1690
1691 thread_group_name = SCStrdup(name);
1692 if (unlikely(thread_group_name == NULL)) {
1693 SCLogError("error allocating memory");
1694 return;
1695 }
1696 tv->thread_group_name = thread_group_name;
1697}
1698
1700{
1701 ThreadVars *tv = NULL;
1702 ThreadVars *ptv = NULL;
1703
1704 if ((family < 0) || (family >= TVT_MAX))
1705 return;
1706
1708 tv = tv_root[family];
1709
1710 while (tv) {
1711 ptv = tv;
1712 tv = tv->next;
1713 TmThreadFree(ptv);
1714 }
1715 tv_root[family] = NULL;
1717}
1718
1719/**
1720 * \brief Spawns a thread associated with the ThreadVars instance tv
1721 *
1722 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1723 */
1725{
1726 pthread_attr_t attr;
1727 if (tv->tm_func == NULL) {
1728 FatalError("No thread function set");
1729 }
1730
1731 /* Initialize and set thread detached attribute */
1732 pthread_attr_init(&attr);
1733
1734 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1735
1736 /* Adjust thread stack size if configured */
1738 SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1739 if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1740 FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1742 }
1743 }
1744
1745 int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1746 if (rc) {
1747 FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1748 strerror(errno));
1749 }
1750
1751#if DEBUG && HAVE_PTHREAD_GETATTR_NP
1753 if (pthread_getattr_np(tv->t, &attr) == 0) {
1754 size_t stack_size;
1755 void *stack_addr;
1756 pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1757 SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1758 } else {
1759 SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1760 "pthread_getattr_np() is %" PRId32,
1761 rc);
1762 }
1763 }
1764#endif
1765
1767
1769 return TM_ECODE_OK;
1770}
1771
1772/**
1773 * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1774 *
1775 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1776 */
1778{
1779 if (tv->tm_func == NULL) {
1780 printf("ERROR: no thread function set\n");
1781 return TM_ECODE_FAILED;
1782 }
1783
1784 if (tv->tm_func((void *)tv) == (void *)-1) {
1785 return TM_ECODE_FAILED;
1786 }
1787
1789
1790 return TM_ECODE_OK;
1791}
1792
1793/**
1794 * \brief Initializes the mutex and condition variables for this TV
1795 *
1796 * It can be used by a thread to control a wait loop that can also be
1797 * influenced by other threads.
1798 *
1799 * \param tv Pointer to a TV instance
1800 */
1802{
1803 if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1804 FatalError("Fatal error encountered in TmThreadInitMC. "
1805 "Exiting...");
1806 }
1807
1808 if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1809 printf("Error initializing the tv->m mutex\n");
1810 exit(EXIT_FAILURE);
1811 }
1812
1813 if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1814 FatalError("Fatal error encountered in TmThreadInitMC. "
1815 "Exiting...");
1816 }
1817
1818 if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1819 FatalError("Error initializing the tv->cond condition "
1820 "variable");
1821 }
1822}
1823
1824static void TmThreadDeinitMC(ThreadVars *tv)
1825{
1826 if (tv->ctrl_mutex) {
1829 }
1830 if (tv->ctrl_cond) {
1833 }
1834}
1835
1836/**
1837 * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1838 * the kill flag has been set or not on the thread.
1839 *
1840 * \param tv Pointer to the TV instance.
1841 */
1843{
1844 while (!TmThreadsCheckFlag(tv, flags)) {
1845 SleepUsec(100);
1846 }
1847}
1848
1849/**
1850 * \brief Unpauses a thread
1851 *
1852 * \param tv Pointer to a TV instance that has to be unpaused
1853 */
1858
1859static TmEcode WaitOnThreadsRunningByType(const int t)
1860{
1861 struct timeval start_ts;
1862 struct timeval cur_ts;
1863 uint32_t thread_cnt = 0;
1864
1865 /* on retries, this will init to the last thread that started up already */
1866 ThreadVars *tv_start = tv_root[t];
1868 for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1869 thread_cnt++;
1870 }
1872
1873 /* give threads a second each to start up, plus a margin of a minute. */
1874 uint32_t time_budget = 60 + thread_cnt;
1875
1876 gettimeofday(&start_ts, NULL);
1877again:
1879 ThreadVars *tv = tv_start;
1880 while (tv != NULL) {
1883
1884 SCLogError("thread \"%s\" failed to "
1885 "start: flags %04x",
1886 tv->name, SC_ATOMIC_GET(tv->flags));
1887 return TM_ECODE_FAILED;
1888 }
1889
1892
1893 /* 60 seconds provided for the thread to transition from
1894 * THV_INIT_DONE to THV_RUNNING */
1895 gettimeofday(&cur_ts, NULL);
1896 if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1897 SCLogError("thread \"%s\" failed to "
1898 "start in time: flags %04x. Total threads: %u. Time budget %us",
1899 tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1900 return TM_ECODE_FAILED;
1901 }
1902
1903 /* sleep a little to give the thread some
1904 * time to start running */
1905 SleepUsec(100);
1906 goto again;
1907 }
1908 tv_start = tv;
1909
1910 tv = tv->next;
1911 }
1913 return TM_ECODE_OK;
1914}
1915
1916/**
1917 * \brief Waits for all threads to be in a running state
1918 *
1919 * \retval TM_ECODE_OK if all are running or error if a thread failed
1920 */
1922{
1923 uint16_t RX_num = 0;
1924 uint16_t W_num = 0;
1925 uint16_t FM_num = 0;
1926 uint16_t FR_num = 0;
1927 uint16_t TX_num = 0;
1928
1929 for (int i = 0; i < TVT_MAX; i++) {
1930 if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1931 return TM_ECODE_FAILED;
1932 }
1933
1935 for (int i = 0; i < TVT_MAX; i++) {
1936 for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1937 if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1938 RX_num++;
1939 else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1940 W_num++;
1941 else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1942 TX_num++;
1943 else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1944 FM_num++;
1945 else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1946 FR_num++;
1947 }
1948 }
1950
1951 /* Construct a welcome string displaying
1952 * initialized thread types and counts */
1953 uint16_t app_len = 32;
1954 uint16_t buf_len = 256;
1955
1956 char append_str[app_len];
1957 char thread_counts[buf_len];
1958
1959 strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1960 if (RX_num > 0) {
1961 snprintf(append_str, app_len, "RX: %u ", RX_num);
1962 strlcat(thread_counts, append_str, buf_len);
1963 }
1964 if (W_num > 0) {
1965 snprintf(append_str, app_len, "W: %u ", W_num);
1966 strlcat(thread_counts, append_str, buf_len);
1967 }
1968 if (TX_num > 0) {
1969 snprintf(append_str, app_len, "TX: %u ", TX_num);
1970 strlcat(thread_counts, append_str, buf_len);
1971 }
1972 if (FM_num > 0) {
1973 snprintf(append_str, app_len, "FM: %u ", FM_num);
1974 strlcat(thread_counts, append_str, buf_len);
1975 }
1976 if (FR_num > 0) {
1977 snprintf(append_str, app_len, "FR: %u ", FR_num);
1978 strlcat(thread_counts, append_str, buf_len);
1979 }
1980 snprintf(append_str, app_len, " Engine started.");
1981 strlcat(thread_counts, append_str, buf_len);
1982 SCLogNotice("%s", thread_counts);
1983
1984 return TM_ECODE_OK;
1985}
1986
1987/**
1988 * \brief Unpauses all threads present in tv_root
1989 */
1991{
1993 for (int i = 0; i < TVT_MAX; i++) {
1994 ThreadVars *tv = tv_root[i];
1995 while (tv != NULL) {
1997 tv = tv->next;
1998 }
1999 }
2001}
2002
2003/**
2004 * \brief Used to check the thread for certain conditions of failure.
2005 */
2007{
2009 for (int i = 0; i < TVT_MAX; i++) {
2010 ThreadVars *tv = tv_root[i];
2011 while (tv) {
2013 FatalError("thread %s failed", tv->name);
2014 }
2015 tv = tv->next;
2016 }
2017 }
2019}
2020
2021/**
2022 * \brief Used to check if all threads have finished their initialization. On
2023 * finding an un-initialized thread, it waits till that thread completes
2024 * its initialization, before proceeding to the next thread.
2025 *
2026 * \retval TM_ECODE_OK all initialized properly
2027 * \retval TM_ECODE_FAILED failure
2028 */
2030{
2031 struct timeval start_ts;
2032 struct timeval cur_ts;
2033 gettimeofday(&start_ts, NULL);
2034
2035again:
2037 for (int i = 0; i < TVT_MAX; i++) {
2038 ThreadVars *tv = tv_root[i];
2039 while (tv != NULL) {
2042
2043 SCLogError("thread \"%s\" failed to "
2044 "initialize: flags %04x",
2045 tv->name, SC_ATOMIC_GET(tv->flags));
2046 return TM_ECODE_FAILED;
2047 }
2048
2051
2052 gettimeofday(&cur_ts, NULL);
2053 if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2054 SCLogError("thread \"%s\" failed to "
2055 "initialize in time: flags %04x",
2056 tv->name, SC_ATOMIC_GET(tv->flags));
2057 return TM_ECODE_FAILED;
2058 }
2059
2060 /* sleep a little to give the thread some
2061 * time to finish initialization */
2062 SleepUsec(100);
2063 goto again;
2064 }
2065
2068 SCLogError("thread \"%s\" failed to "
2069 "initialize.",
2070 tv->name);
2071 return TM_ECODE_FAILED;
2072 }
2075 SCLogError("thread \"%s\" closed on "
2076 "initialization.",
2077 tv->name);
2078 return TM_ECODE_FAILED;
2079 }
2080
2081 tv = tv->next;
2082 }
2083 }
2085
2086 return TM_ECODE_OK;
2087}
2088
2089/**
2090 * \brief returns a count of all the threads that match the flag
2091 */
2093{
2094 uint32_t cnt = 0;
2096 for (int i = 0; i < TVT_MAX; i++) {
2097 ThreadVars *tv = tv_root[i];
2098 while (tv != NULL) {
2099 if ((tv->tmm_flags & flags) == flags)
2100 cnt++;
2101
2102 tv = tv->next;
2103 }
2104 }
2106 return cnt;
2107}
2108
2109#ifdef DEBUG_VALIDATION
2110static void TmThreadDoDumpSlots(const ThreadVars *tv)
2111{
2112 for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2114 SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2115 tv, s, s->tm_id, m->name);
2116 }
2117}
2118
2119static void TmThreadDumpThreads(void)
2120{
2122 for (int i = 0; i < TVT_MAX; i++) {
2123 ThreadVars *tv = tv_root[i];
2124 while (tv != NULL) {
2125 const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2126 SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2128 if (tv->inq && tv->stream_pq == tv->inq->pq) {
2129 SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2130 } else if (tv->stream_pq_local != NULL) {
2131 for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2132 SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2133 tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2134 }
2135 }
2136 for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2137 SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2138 tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2139 }
2140 TmThreadDoDumpSlots(tv);
2141 tv = tv->next;
2142 }
2143 }
2146}
2147#endif
2148
2149/* Aligned to CLS to avoid false sharing between atomic ops. */
2150typedef struct Thread_ {
2151 ThreadVars *tv; /**< threadvars structure */
2152 const char *name;
2153 int type;
2154 int in_use; /**< bool to indicate this is in use */
2155
2156 SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2157 * (offline mode) */
2158 SCTime_t sys_sec_stamp; /**< timestamp in real system
2159 * time when the pktts was last updated. */
2161} __attribute__((aligned(CLS))) Thread;
2163typedef struct Threads_ {
2164 Thread *threads;
2165 size_t threads_size;
2166 int threads_cnt;
2168
2169static bool thread_store_sealed = false;
2170static Threads thread_store = { NULL, 0, 0 };
2171static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2172
2174{
2175 SCMutexLock(&thread_store_lock);
2176 DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2177 thread_store_sealed = true;
2178 SCMutexUnlock(&thread_store_lock);
2179}
2180
2182{
2183 SCMutexLock(&thread_store_lock);
2184 DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2185 thread_store_sealed = false;
2186 SCMutexUnlock(&thread_store_lock);
2187}
2188
2190{
2191 SCMutexLock(&thread_store_lock);
2192 for (size_t s = 0; s < thread_store.threads_size; s++) {
2193 Thread *t = &thread_store.threads[s];
2194 if (t == NULL || t->in_use == 0)
2195 continue;
2196
2197 SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2198 (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2199 if (t->tv) {
2200 ThreadVars *tv = t->tv;
2201 const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2202 SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2203 tv, tv->type, tv->name, tv->tmm_flags, flags);
2204 }
2205 }
2206 SCMutexUnlock(&thread_store_lock);
2207}
2208
2209#define STEP 32
2210/**
2211 * \retval id thread id, or 0 if not found
2212 */
2214{
2215 SCMutexLock(&thread_store_lock);
2216 DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2217 if (thread_store.threads == NULL) {
2218 thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2219 BUG_ON(thread_store.threads == NULL);
2220 thread_store.threads_size = STEP;
2221 }
2222
2223 size_t s;
2224 for (s = 0; s < thread_store.threads_size; s++) {
2225 if (thread_store.threads[s].in_use == 0) {
2226 Thread *t = &thread_store.threads[s];
2227 SCSpinInit(&t->spin, 0);
2228 SCSpinLock(&t->spin);
2229 t->name = tv->name;
2230 t->type = type;
2231 t->tv = tv;
2232 t->in_use = 1;
2233 SCSpinUnlock(&t->spin);
2234
2235 SCMutexUnlock(&thread_store_lock);
2236 return (int)(s+1);
2237 }
2238 }
2239
2240 /* if we get here the array is completely filled */
2241 void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2242 BUG_ON(newmem == NULL);
2243 thread_store.threads = newmem;
2244 memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2245
2246 Thread *t = &thread_store.threads[thread_store.threads_size];
2247 SCSpinInit(&t->spin, 0);
2248 SCSpinLock(&t->spin);
2249 t->name = tv->name;
2250 t->type = type;
2251 t->tv = tv;
2252 t->in_use = 1;
2253 SCSpinUnlock(&t->spin);
2254
2255 s = thread_store.threads_size;
2256 thread_store.threads_size += STEP;
2257
2258 SCMutexUnlock(&thread_store_lock);
2259 return (int)(s+1);
2260}
2261#undef STEP
2262
2264{
2265 SCMutexLock(&thread_store_lock);
2266 DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2267 if (id <= 0 || id > (int)thread_store.threads_size) {
2268 SCMutexUnlock(&thread_store_lock);
2269 return;
2270 }
2271
2272 /* id is one higher than index */
2273 int idx = id - 1;
2274
2275 /* reset thread_id, which serves as clearing the record */
2276 thread_store.threads[idx].in_use = 0;
2277
2278 /* check if we have at least one registered thread left */
2279 size_t s;
2280 for (s = 0; s < thread_store.threads_size; s++) {
2281 Thread *t = &thread_store.threads[s];
2282 if (t->in_use == 1) {
2283 goto end;
2284 }
2285 }
2286
2287 /* if we get here no threads are registered */
2288 SCFree(thread_store.threads);
2289 thread_store.threads = NULL;
2290 thread_store.threads_size = 0;
2291 thread_store.threads_cnt = 0;
2292
2293end:
2294 SCMutexUnlock(&thread_store_lock);
2295}
2296
2297void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2298{
2299 SCTime_t now = SCTimeGetTime();
2300 int idx = id - 1;
2301 Thread *t = &thread_store.threads[idx];
2302 SCSpinLock(&t->spin);
2303 SC_ATOMIC_SET(t->pktts, ts);
2304
2305#ifdef DEBUG
2306 if (t->sys_sec_stamp.secs != 0) {
2307 SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2308 if (SCTIME_CMP_LT(tmpts, now)) {
2309 SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2310 }
2311 }
2312#endif
2313
2314 t->sys_sec_stamp = now;
2315 SCSpinUnlock(&t->spin);
2316}
2317
2319{
2320 static SCTime_t nullts = SCTIME_INITIALIZER;
2321 bool ready = true;
2322 for (size_t s = 0; s < thread_store.threads_size; s++) {
2323 Thread *t = &thread_store.threads[s];
2324 if (!t->in_use) {
2325 break;
2326 }
2327 SCSpinLock(&t->spin);
2328 if (t->type != TVT_PPT) {
2329 SCSpinUnlock(&t->spin);
2330 continue;
2331 }
2332 if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2333 ready = false;
2334 SCSpinUnlock(&t->spin);
2335 break;
2336 }
2337 SCSpinUnlock(&t->spin);
2338 }
2339 return ready;
2340}
2341
2343{
2344 SCTime_t now = SCTimeGetTime();
2345 for (size_t s = 0; s < thread_store.threads_size; s++) {
2346 Thread *t = &thread_store.threads[s];
2347 if (!t->in_use) {
2348 break;
2349 }
2350 SCSpinLock(&t->spin);
2351 if (t->type != TVT_PPT) {
2352 SCSpinUnlock(&t->spin);
2353 continue;
2354 }
2355 SC_ATOMIC_SET(t->pktts, ts);
2356 t->sys_sec_stamp = now;
2357 SCSpinUnlock(&t->spin);
2358 }
2359}
2360
2362{
2363 DEBUG_VALIDATE_BUG_ON(idx == 0);
2364 const int i = idx - 1;
2365 Thread *t = &thread_store.threads[i];
2366 return SC_ATOMIC_GET(t->pktts);
2367}
2368
2370{
2371 struct timeval local = { 0 };
2372 static SCTime_t nullts = SCTIME_INITIALIZER;
2373 bool set = false;
2374 SCTime_t now = SCTimeGetTime();
2375
2376 for (size_t s = 0; s < thread_store.threads_size; s++) {
2377 Thread *t = &thread_store.threads[s];
2378 if (t->in_use == 0) {
2379 break;
2380 }
2381 SCSpinLock(&t->spin);
2382 /* only packet threads set timestamps based on packets */
2383 if (t->type != TVT_PPT) {
2384 SCSpinUnlock(&t->spin);
2385 continue;
2386 }
2387 SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2388 if (SCTIME_CMP_NEQ(pktts, nullts)) {
2389 SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2390 /* ignore sleeping threads */
2391 if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2392 SCSpinUnlock(&t->spin);
2393 continue;
2394 }
2395 if (!set) {
2396 SCTIME_TO_TIMEVAL(&local, pktts);
2397 set = true;
2398 } else {
2399 if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2400 SCTIME_TO_TIMEVAL(&local, pktts);
2401 }
2402 }
2403 }
2404 SCSpinUnlock(&t->spin);
2405 }
2406 *ts = local;
2407 SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2408}
2409
2411{
2412 uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2413 int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2414 /* always create at least one thread */
2415 if (thread_max == 0)
2416 thread_max = ncpus * threading_detect_ratio;
2417 if (thread_max < 1)
2418 thread_max = 1;
2419 if (thread_max > 1024) {
2420 SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2421 thread_max = 1024;
2422 }
2423 return (uint16_t)thread_max;
2424}
2425
2426/** \brief inject a flow into a threads flow queue
2427 */
2428void TmThreadsInjectFlowById(Flow *f, const int id)
2429{
2430 if (id > 0 && id <= (int)thread_store.threads_size) {
2431 int idx = id - 1;
2432 Thread *t = &thread_store.threads[idx];
2433 ThreadVars *tv = t->tv;
2434 if (tv != NULL && tv->flow_queue != NULL) {
2436
2437 /* wake up listening thread(s) if necessary */
2438 if (tv->inq != NULL) {
2442 } else if (tv->break_loop) {
2443 TmThreadsCaptureBreakLoop(tv);
2444 }
2445 return;
2446 }
2447 }
2448 BUG_ON(1);
2449}
uint8_t len
void StatsThreadCleanup(ThreadVars *tv)
Definition counters.c:1308
void StatsSyncCounters(ThreadVars *tv)
Definition counters.c:445
int StatsSetupPrivate(ThreadVars *tv)
Definition counters.c:1209
uint8_t flags
Definition decode-gre.h:0
#define PKT_PSEUDO_STREAM_END
Definition decode.h:1268
#define PKT_SET_SRC(p, src_val)
Definition decode.h:1325
@ PKT_SRC_SHUTDOWN_FLUSH
Definition decode.h:64
@ PKT_SRC_CAPTURE_TIMEOUT
Definition decode.h:62
struct PrefilterEngineFlowbits __attribute__
DNP3 application header.
SCMutex m
Definition flow-hash.h:6
FlowQueue * FlowQueueNew(void)
Definition flow-queue.c:35
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition flow-queue.c:173
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition decode.c:293
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition decode.c:258
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition decode.c:855
void CaptureStatsSetup(ThreadVars *tv)
Definition decode.c:1034
void PacketEnqueue(PacketQueue *q, Packet *p)
Packet * PacketDequeue(PacketQueue *q)
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
bool threading_set_cpu_affinity
Definition runmodes.c:62
bool RunmodeIsAutofp(void)
Definition runmodes.c:209
const char * thread_name_flow_mgr
Definition runmodes.c:70
const char * thread_name_flow_rec
Definition runmodes.c:71
const char * thread_name_verdict
Definition runmodes.c:69
uint64_t threading_set_stack_size
Definition runmodes.c:63
float threading_detect_ratio
Definition runmodes.c:949
bool RunmodeIsWorkers(void)
Definition runmodes.c:204
const char * thread_name_autofp
Definition runmodes.c:66
const char * thread_name_workers
Definition runmodes.c:68
uint64_t ts
Flow data structure.
Definition flow.h:356
simple fifo queue for packets
struct Packet_ * top
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
uint32_t len
SCMutex mutex_q
struct Packet_ * top
SCCondT cond_q
struct Flow_ * flow
Definition decode.h:546
uint32_t flags
Definition decode.h:544
uint64_t secs
Definition util-time.h:41
Per thread variable structure.
Definition threadvars.h:58
char * thread_group_name
Definition threadvars.h:67
bool break_loop
Definition threadvars.h:136
struct PacketQueue_ * stream_pq_local
Definition threadvars.h:117
uint8_t cap_flags
Definition threadvars.h:81
void *(* tm_func)(void *)
Definition threadvars.h:63
struct TmSlot_ * tm_slots
Definition threadvars.h:96
uint8_t type
Definition threadvars.h:72
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition threadvars.h:106
char name[16]
Definition threadvars.h:65
uint8_t outq_id
Definition threadvars.h:84
PacketQueueNoLock decode_pq
Definition threadvars.h:112
SCCtrlMutex * ctrl_mutex
Definition threadvars.h:132
uint8_t tmm_flags
Definition threadvars.h:79
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition threadvars.h:91
pthread_t t
Definition threadvars.h:59
void * outctx
Definition threadvars.h:105
struct ThreadVars_ * next
Definition threadvars.h:125
struct TmSlot_ * tm_flowworker
Definition threadvars.h:101
StatsPublicThreadContext perf_public_ctx
Definition threadvars.h:128
uint8_t inq_id
Definition threadvars.h:83
struct PacketQueue_ * stream_pq
Definition threadvars.h:116
char * printable_name
Definition threadvars.h:66
int thread_priority
Definition threadvars.h:75
char * iface_name
Definition threadvars.h:139
uint8_t thread_setup_flags
Definition threadvars.h:69
struct FlowQueue_ * flow_queue
Definition threadvars.h:135
uint16_t cpu_affinity
Definition threadvars.h:74
SCCtrlCondT * ctrl_cond
Definition threadvars.h:133
const char * name
SCTime_t sys_sec_stamp
ThreadVars * tv
SCSpinlock spin
SC_ATOMIC_DECLARE(SCTime_t, pktts)
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition tm-modules.h:53
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition tm-modules.h:52
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition tm-modules.h:67
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition tm-modules.h:61
uint8_t cap_flags
Definition tm-modules.h:77
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition tm-modules.h:56
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition tm-modules.h:58
uint8_t flags
Definition tm-modules.h:80
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition tm-modules.h:51
TmEcode(* Management)(ThreadVars *, void *)
Definition tm-modules.h:69
const void * slot_initdata
Definition tm-threads.h:77
TmSlotFunc SlotFunc
Definition tm-threads.h:56
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition tm-threads.h:57
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition tm-threads.h:74
struct TmSlot_ * slot_next
Definition tm-threads.h:62
uint8_t tm_flags
Definition tm-threads.h:67
TmEcode(* Management)(ThreadVars *, void *)
Definition tm-threads.h:58
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition tm-threads.h:72
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition tm-threads.h:73
int tm_id
Definition tm-threads.h:70
uint16_t id
Definition tm-queues.h:32
uint16_t writer_cnt
Definition tm-queues.h:34
PacketQueue * pq
Definition tm-queues.h:35
uint16_t reader_cnt
Definition tm-queues.h:33
bool is_packet_pool
Definition tm-queues.h:31
Packet *(* InHandler)(ThreadVars *)
void(* OutHandlerCtxFree)(void *)
void(* InShutdownHandler)(ThreadVars *)
void(* OutHandler)(ThreadVars *, Packet *)
void *(* OutHandlerCtxSetup)(const char *)
#define BUG_ON(x)
#define MIN(x, y)
size_t strlcat(char *, const char *src, size_t siz)
#define CLS
size_t strlcpy(char *dst, const char *src, size_t siz)
void EngineDone(void)
Used to indicate that the current task is done.
Definition suricata.c:481
volatile uint8_t suricata_ctl_flags
Definition suricata.c:172
void SCThreadRunInitCallbacks(ThreadVars *tv)
unsigned int ThreadStorageSize(void)
void ThreadFreeStorage(ThreadVars *tv)
#define SCCtrlMutexInit(mut, mutattr)
#define SCCtrlCondInit
#define SCMutexDestroy
#define SCCtrlMutexLock(mut)
#define SCSpinlock
#define SCMUTEX_INITIALIZER
#define SCSpinInit
#define SCSpinUnlock
#define SCMutex
#define SCMutexUnlock(mut)
#define SCSpinLock
#define SCCtrlCondDestroy
#define SCCtrlMutexUnlock(mut)
#define SCMutexInit(mut, mutattrs)
#define SCCondSignal
#define SCCtrlMutexDestroy
#define SCMutexLock(mut)
thread_local uint64_t rww_lock_cnt
thread_local uint64_t mutex_lock_cnt
thread_local uint64_t spin_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_cnt
thread_local uint64_t rww_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
thread_local uint64_t spin_lock_cnt
thread_local uint64_t rwr_lock_contention
thread_local uint64_t mutex_lock_contention
thread_local uint64_t rww_lock_contention
thread_local uint64_t spin_lock_contention
#define SCSetThreadName(n)
Definition threads.h:304
@ PRIO_MEDIUM
Definition threads.h:89
@ PRIO_HIGH
Definition threads.h:90
@ PRIO_LOW
Definition threads.h:88
#define SCGetThreadIdLong(...)
Definition threads.h:255
#define THV_RUNNING_DONE
Definition threadvars.h:46
#define THV_FLOW_LOOP
Definition threadvars.h:48
#define THREAD_SET_AFFINITY
Definition threadvars.h:145
#define THV_REQ_FLOW_LOOP
Definition threadvars.h:47
#define THV_PAUSE
Definition threadvars.h:38
#define THV_CLOSED
Definition threadvars.h:42
#define THV_DEAD
Definition threadvars.h:54
#define THREAD_SET_AFFTYPE
Definition threadvars.h:147
#define THREAD_SET_PRIORITY
Definition threadvars.h:146
#define THV_DEINIT
Definition threadvars.h:45
#define THV_FAILED
Definition threadvars.h:41
#define THV_KILL
Definition threadvars.h:40
#define THV_RUNNING
Definition threadvars.h:55
#define THV_PAUSED
Definition threadvars.h:39
#define THV_INIT_DONE
Definition threadvars.h:37
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition tm-modules.c:88
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition tm-modules.c:69
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition tm-modules.c:46
#define TM_FLAG_RECEIVE_TM
Definition tm-modules.h:32
#define TM_FLAG_DECODE_TM
Definition tm-modules.h:33
Tmqh * TmqhGetQueueHandlerByName(const char *name)
int TmqhNameToID(const char *name)
Tmqh * TmqhGetQueueHandlerByID(const int id)
@ TMQH_NOT_SET
Tmq * TmqGetQueueByName(const char *name)
Definition tm-queues.c:59
Tmq * TmqCreateQueue(const char *name)
@ TVT_MAX
@ TVT_CMD
@ TVT_MGMT
@ TVT_PPT
@ TMM_FLOWWORKER
@ TM_ECODE_FAILED
@ TM_ECODE_OK
@ TM_ECODE_DONE
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition tm-threads.c:658
void TmThreadKillThreads(void)
SCTime_t sys_sec_stamp
void TmThreadClearThreadsFamily(int family)
SCTime_t TmThreadsGetThreadTime(const int idx)
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition tm-threads.c:93
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition tm-threads.c:938
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition tm-threads.c:785
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
Definition tm-threads.c:273
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition tm-threads.c:133
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition tm-threads.c:109
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
int TmThreadGetNbThreads(uint8_t type)
Definition tm-threads.c:847
void TmThreadsUnsealThreads(void)
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition tm-threads.c:822
#define STEP
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
#define MIN_WAIT_TIME
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
uint16_t TmThreadsGetWorkerThreadMax(void)
Threads
const char * name
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
void TmThreadsListThreads(void)
ThreadVars * tv_root[TVT_MAX]
Definition tm-threads.c:82
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
TmEcode TmThreadLibSpawn(ThreadVars *tv)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
void TmThreadsUnregisterThread(const int id)
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition tm-threads.c:166
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition tm-threads.c:774
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
void TmThreadKillThreadsFamily(int family)
ThreadVars * tv
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
SCMutex tv_root_lock
Definition tm-threads.c:85
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition tm-threads.c:363
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition tm-threads.c:101
int type
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check, const uint8_t module_flags)
Disable all packet threads.
#define MAX_WAIT_TIME
bool TmThreadsTimeSubsysIsReady(void)
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
void TmThreadsSealThreads(void)
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition tm-threads.c:831
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition tm-threads.c:863
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition tm-threads.c:114
#define SleepMsec(msec)
Definition tm-threads.h:45
#define SleepUsec(usec)
Definition tm-threads.h:44
void PacketPoolDestroy(void)
void PacketPoolInit(void)
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
uint32_t cnt
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
char * AffinityGetYamlPath(ThreadsAffinityType *taf)
Get the YAML path for the given affinity type. The path is built using the parent name (if available)...
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf)
Return the total number of CPUs in a given affinity.
ThreadsAffinityType * FindAffinityByInterface(ThreadsAffinityType *parent, const char *interface_name)
uint16_t AffinityGetNextCPU(ThreadVars *tv, ThreadsAffinityType *taf)
@ MANAGEMENT_CPU_SET
@ RECEIVE_CPU_SET
@ MAX_CPU_SET
@ WORKER_CPU_SET
@ EXCLUSIVE_AFFINITY
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
#define SC_ATOMIC_INITPTR(name)
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition util-cpu.c:108
#define SCEnter(...)
Definition util-debug.h:277
#define FatalError(...)
Definition util-debug.h:510
#define SCLogPerf(...)
Definition util-debug.h:234
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCReturnInt(x)
Definition util-debug.h:281
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition util-debug.h:243
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition util-debug.h:255
#define SCLogError(...)
Macro used to log ERROR messages.
Definition util-debug.h:267
#define SCReturn
Definition util-debug.h:279
#define SCMalloc(sz)
Definition util-mem.h:47
#define SCFree(p)
Definition util-mem.h:61
#define SCRealloc(ptr, sz)
Definition util-mem.h:50
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define SCStrdup(s)
Definition util-mem.h:56
#define likely(expr)
#define unlikely(expr)
#define SCDropCaps(...)
Definition util-privs.h:89
#define PACKET_PROFILING_TMM_END(p, id)
#define PACKET_PROFILING_TMM_START(p, id)
#define SCTIME_CMP_NEQ(a, b)
Definition util-time.h:107
#define SCTIME_TO_TIMEVAL(tv, t)
Definition util-time.h:97
#define SCTIME_INITIALIZER
Definition util-time.h:51
#define SCTIME_CMP_LT(a, b)
Definition util-time.h:105
#define SCTIME_CMP_EQ(a, b)
Definition util-time.h:108
#define SCTIME_ADD_SECS(ts, s)
Definition util-time.h:64
#define SCTIME_FROM_TIMEVAL(tv)
Definition util-time.h:79
#define DEBUG_VALIDATE_BUG_ON(exp)