suricata
flow-worker.c
Go to the documentation of this file.
1/* Copyright (C) 2016-2024 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 *
23 * Flow Workers are single thread modules taking care of (almost)
24 * everything related to packets with flows:
25 *
26 * - Lookup/creation
27 * - Stream tracking, reassembly
28 * - Applayer update
29 * - Detection
30 *
31 * This all while holding the flow lock.
32 */
33
34#include "suricata-common.h"
35#include "suricata.h"
36
37#include "action-globals.h"
38#include "packet.h"
39#include "decode.h"
40#include "detect.h"
41#include "stream-tcp.h"
42#include "app-layer.h"
43#include "detect-engine.h"
44#include "output.h"
45#include "app-layer-parser.h"
46#include "app-layer-frames.h"
47
48#include "util-profiling.h"
49#include "util-validate.h"
50#include "util-time.h"
51#include "tmqh-packetpool.h"
52
53#include "flow-util.h"
54#include "flow-manager.h"
55#include "flow-timeout.h"
56#include "flow-spare-pool.h"
57#include "flow-worker.h"
58
60
65
66typedef struct FlowWorkerThreadData_ {
68
69 union {
72 };
73
75
76 SC_ATOMIC_DECLARE(bool, flush_ack);
77
78 void *output_thread; /* Output thread data. */
79 void *output_thread_flow; /* Output thread data. */
80
85 /** Queue to put pseudo packets that have been created by the stream (RST response) and by the
86 * flush logic following a protocol change. */
89
90 struct {
93 uint16_t flows_removed;
96 } cnt;
98
100
101static void FlowWorkerFlowTimeout(
103
104/**
105 * \internal
106 * \brief Forces reassembly for flow if it needs it.
107 *
108 * The function requires flow to be locked beforehand.
109 *
110 * \param f Pointer to the flow.
111 *
112 * \retval cnt number of packets injected
113 */
114static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
115{
116 const int server = f->ffr_tc;
117 const int client = f->ffr_ts;
118 int cnt = 0;
119
120 /* Get the tcp session for the flow */
121 const TcpSession *ssn = (TcpSession *)f->protoctx;
122
123 /* insert a pseudo packet in the toserver direction */
125 Packet *p = FlowPseudoPacketGet(0, f, ssn);
126 if (p != NULL) {
130 }
131 FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
133 cnt++;
134 }
135 }
136
137 /* handle toclient */
139 Packet *p = FlowPseudoPacketGet(1, f, ssn);
140 if (p != NULL) {
143 FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
146 cnt++;
147 }
148 }
149
150 if (cnt > 0) {
152 }
153 return cnt;
154}
155
156/** \param[in] max_work Max flows to process. 0 if unlimited. */
157static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
158 FlowQueuePrivate *fq, const uint32_t max_work)
159{
160 FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
161 uint32_t i = 0;
162 Flow *f;
163 while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
165 f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg
166
167 if (f->proto == IPPROTO_TCP) {
169 !FlowIsBypassed(f) && FlowNeedsReassembly(f) && f->ffr != 0) {
170 /* read detect thread in case we're doing a reload */
171 void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
172 int cnt = FlowFinish(tv, f, fw, detect_thread);
173 counters->flows_aside_pkt_inject += cnt;
174 counters->flows_aside_needs_work++;
175 }
176 }
177
178 /* no one is referring to this flow, removed from hash
179 * so we can unlock it and pass it to the flow recycler */
180
181 if (fw->output_thread_flow != NULL)
182 (void)OutputFlowLog(tv, fw->output_thread_flow, f);
183
184 FlowEndCountersUpdate(tv, &fw->fec, f);
185 if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
187 }
189
192
194 FlowQueuePrivatePrependFlow(&ret_queue, f);
195 if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
196 FlowSparePoolReturnFlows(&ret_queue);
197 }
198 } else {
200 }
201
202 if (max_work != 0 && ++i == max_work)
203 break;
204 }
205 if (ret_queue.len > 0) {
206 FlowSparePoolReturnFlows(&ret_queue);
207 }
208
209 StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)i);
210}
211
212/** \brief handle flow for packet
213 *
214 * Handle flow creation/lookup
215 */
216static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
217{
218 FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);
219
220 int state = p->flow->flow_state;
221 switch (state) {
222#ifdef CAPTURE_OFFLOAD
223 case FLOW_STATE_CAPTURE_BYPASSED: {
226 Flow *f = p->flow;
227 FlowDeReference(&p->flow);
229 return TM_ECODE_DONE;
230 }
231#endif
235 Flow *f = p->flow;
236 FlowDeReference(&p->flow);
238 return TM_ECODE_DONE;
239 }
240 default:
241 return TM_ECODE_OK;
242 }
243}
244
245static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
246
247static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
248{
249 FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
250 if (fw == NULL)
251 return TM_ECODE_FAILED;
252
253 SC_ATOMIC_INITPTR(fw->detect_thread);
254 SC_ATOMIC_SET(fw->detect_thread, NULL);
255
256 fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", tv);
257 fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", tv);
258 fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", tv);
259 fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", tv);
260
261 fw->cnt.flows_aside_needs_work = StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", tv);
262 fw->cnt.flows_aside_pkt_inject = StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", tv);
263 fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", tv);
264 fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", tv);
265 fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", tv);
266
267 fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
268 if (fw->dtv == NULL) {
269 FlowWorkerThreadDeinit(tv, fw);
270 return TM_ECODE_FAILED;
271 }
272
273 /* setup TCP */
275 FlowWorkerThreadDeinit(tv, fw);
276 return TM_ECODE_FAILED;
277 }
278
279 if (DetectEngineEnabled()) {
280 /* setup DETECT */
281 void *detect_thread = NULL;
282 if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
283 FlowWorkerThreadDeinit(tv, fw);
284 return TM_ECODE_FAILED;
285 }
286 SC_ATOMIC_SET(fw->detect_thread, detect_thread);
287 }
288
289 /* Setup outputs for this thread. */
290 if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
291 FlowWorkerThreadDeinit(tv, fw);
292 return TM_ECODE_FAILED;
293 }
295 SCLogError("initializing flow log API for thread failed");
296 FlowWorkerThreadDeinit(tv, fw);
297 return TM_ECODE_FAILED;
298 }
299
303
304 /* setup pq for stream end pkts */
305 memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
306 *data = fw;
307 return TM_ECODE_OK;
308}
309
310static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
311{
312 FlowWorkerThreadData *fw = data;
313
315
316 /* free TCP */
318
319 /* free DETECT */
320 void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
321 if (detect_thread != NULL) {
322 DetectEngineThreadCtxDeinit(tv, detect_thread);
323 SC_ATOMIC_SET(fw->detect_thread, NULL);
324 }
325
326 /* Free output. */
329
330 /* free pq */
331 BUG_ON(fw->pq.len);
332
333 Flow *f;
334 while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
335 FlowFree(f);
336 }
337
338 SCFree(fw);
339 return TM_ECODE_OK;
340}
341
342TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
344
345static inline void UpdateCounters(ThreadVars *tv,
346 FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
347{
348 if (counters->flows_aside_needs_work) {
350 (uint64_t)counters->flows_aside_needs_work);
351 }
352 if (counters->flows_aside_pkt_inject) {
354 (uint64_t)counters->flows_aside_pkt_inject);
355 }
356}
357
358/** \brief update stream engine
359 *
360 * We can be called from both the flow timeout path as well as from the
361 * "real" traffic path. If in the timeout path any additional packets we
362 * forge for flushing pipelines should not leave our scope. If the original
363 * packet is real (or related to a real packet) we need to push the packets
364 * on, so IPS logic stays valid.
365 */
366static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
367 DetectEngineThreadCtx *det_ctx, const bool timeout)
368{
369 if (det_ctx != NULL && det_ctx->de_ctx->PreStreamHook != NULL) {
370 const uint8_t action = det_ctx->de_ctx->PreStreamHook(tv, det_ctx, p);
371 if (action & ACTION_DROP) {
373 return;
374 }
375 }
376
378 StreamTcp(tv, p, fw->stream_thread, &fw->pq);
380
381 // this is the first packet that sets no payload inspection
382 bool setting_nopayload =
383 p->flow->alparser &&
386 if (FlowChangeProto(p->flow) || setting_nopayload) {
388 if (setting_nopayload) {
389 FlowSetNoPayloadInspectionFlag(p->flow);
390 }
393 }
394
395 /* Packets here can safely access p->flow as it's locked */
396 SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
397 Packet *x;
398 while ((x = PacketDequeueNoLock(&fw->pq))) {
399 SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
400
401 if (det_ctx != NULL) {
403 Detect(tv, x, det_ctx);
405 }
406
408
409 FramesPrune(x->flow, x);
410 /* Release tcp segments. Done here after alerting can use them. */
413 x->flow, x->flowflags & FLOW_PKT_TOSERVER ? STREAM_TOSERVER : STREAM_TOCLIENT);
415
416 /* no need to keep a flow ref beyond this point */
417 FlowDeReference(&x->flow);
418
419 /* no further work to do for this pseudo packet, so we can return
420 * it to the pool immediately. */
421 if (timeout) {
423 } else {
424 /* to support IPS verdict logic, in the non-timeout case we need to do a bit more */
426 }
427 }
429 // in case f->flags & FLOW_ACTION_DROP was set by one of the dequeued packets
431 }
432}
433
434static void FlowWorkerFlowTimeout(
436{
438
439 SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
440 DEBUG_VALIDATE_BUG_ON(!(p->flow && PacketIsTCP(p)));
442
443 /* handle TCP and app layer */
444 FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, true);
445
447
448 /* handle Detect */
449 SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
450 if (det_ctx != NULL) {
452 Detect(tv, p, det_ctx);
454 }
455
456 // Outputs.
458
459 FramesPrune(p->flow, p);
460
461 /* Release tcp segments. Done here after alerting can use them. */
464 STREAM_TOSERVER : STREAM_TOCLIENT);
466
467 /* run tx cleanup last */
469
470 FlowDeReference(&p->flow);
471 /* flow is unlocked later in FlowFinish() */
472}
473
474/** \internal
475 * \brief process flows injected into our queue by other threads
476 */
477static inline void FlowWorkerProcessInjectedFlows(
479{
480 /* take injected flows and append to our work queue */
482 FlowQueuePrivate injected = { NULL, NULL, 0 };
483 if (SC_ATOMIC_GET(tv->flow_queue->non_empty))
485 if (injected.len > 0) {
486 StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len);
487 if (p->pkt_src == PKT_SRC_WIRE)
488 StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len);
489
490 /* move to local queue so we can process over the course of multiple packets */
492 }
494}
495
496/** \internal
497 * \brief process flows set aside locally during flow lookup
498 */
499static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
500{
501 uint32_t max_work = 2;
503 max_work = 0;
504
506 if (fw->fls.work_queue.len) {
507 FlowTimeoutCounters counters = { 0, 0, };
508 CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work);
509 UpdateCounters(tv, fw, &counters);
510 }
512}
513
514/** \internal
515 * \brief apply Packet::app_update_direction to the flow flags
516 */
517static void PacketAppUpdate2FlowFlags(Packet *p)
518{
519 switch ((enum StreamUpdateDir)p->app_update_direction) {
520 case UPDATE_DIR_NONE: // NONE implies pseudo packet
521 SCLogDebug("pcap_cnt %" PRIu64 ", UPDATE_DIR_NONE", p->pcap_cnt);
522 break;
524 if (PKT_IS_TOSERVER(p)) {
526 SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
527 } else {
529 SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
530 }
531 break;
532 case UPDATE_DIR_BOTH:
533 if (PKT_IS_TOSERVER(p)) {
535 SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
536 p->pcap_cnt);
537 } else {
539 SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
540 p->pcap_cnt);
541 }
542 /* fall through */
544 if (PKT_IS_TOSERVER(p)) {
546 SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
547 p->pcap_cnt);
548 } else {
550 SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
551 p->pcap_cnt);
552 }
553 break;
554 }
555}
556
557static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
558{
559 FlowWorkerThreadData *fw = data;
560 DetectEngineThreadCtx *det_ctx = SC_ATOMIC_GET(fw->detect_thread);
561
562 DEBUG_VALIDATE_BUG_ON(p == NULL);
564
565 SCLogDebug("packet %"PRIu64, p->pcap_cnt);
566
567 if ((PKT_IS_FLUSHPKT(p))) {
568 SCLogDebug("thread %s flushing", tv->printable_name);
570 /* Ack if a flush was requested */
571 bool notset = false;
572 SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
573 return TM_ECODE_OK;
574 }
575
576 /* handle Flow */
577 if (det_ctx != NULL && det_ctx->de_ctx->PreFlowHook != NULL) {
578 const uint8_t action = det_ctx->de_ctx->PreFlowHook(tv, det_ctx, p);
579 if (action & ACTION_DROP) {
581 goto pre_flow_drop;
582 }
583 }
584
585 if (p->flags & PKT_WANTS_FLOW) {
587
588 FlowHandlePacket(tv, &fw->fls, p);
589 if (likely(p->flow != NULL)) {
591 if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
592 /* update time */
593 if (!(PKT_IS_PSEUDOPKT(p))) {
594 TimeSetByThread(tv->id, p->ts);
595 }
596 goto housekeeping;
597 }
598 }
599 /* Flow is now LOCKED */
600
602
603 /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
604 * pseudo packet created by the flow manager. */
605 } else if (p->flags & PKT_HAS_FLOW) {
608 }
609
610 /* update time */
611 if (!(PKT_IS_PSEUDOPKT(p))) {
612 TimeSetByThread(tv->id, p->ts);
613 }
614
615 SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");
616
617 /* handle TCP and app layer */
618 if (p->flow) {
619 SCLogDebug("packet %" PRIu64
620 ": direction %s FLOW_TS_APP_UPDATE_NEXT %s FLOW_TC_APP_UPDATE_NEXT %s",
621 p->pcap_cnt, PKT_IS_TOSERVER(p) ? "toserver" : "toclient",
622 BOOL2STR((p->flow->flags & FLOW_TS_APP_UPDATE_NEXT) != 0),
623 BOOL2STR((p->flow->flags & FLOW_TC_APP_UPDATE_NEXT) != 0));
624 /* see if need to consider flags set by prev packets */
627 p->flow->flags &= ~FLOW_TS_APP_UPDATE_NEXT;
628 SCLogDebug("FLOW_TS_APP_UPDATED");
629 } else if (PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATE_NEXT)) {
631 p->flow->flags &= ~FLOW_TC_APP_UPDATE_NEXT;
632 SCLogDebug("FLOW_TC_APP_UPDATED");
633 }
634
635 if (PacketIsTCP(p)) {
636 SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", p->pcap_cnt,
637 PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
639
640 /* if detect is disabled, we need to apply file flags to the flow
641 * here on the first packet. */
642 if (det_ctx == NULL &&
646 }
647
648 FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, false);
649 PacketAppUpdate2FlowFlags(p);
650
651 /* handle the app layer part of the UDP packet payload */
652 } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
656 PacketAppUpdate2FlowFlags(p);
657 }
658 }
659
661
662 /* handle Detect */
664 SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
665 if (det_ctx != NULL) {
667 Detect(tv, p, det_ctx);
669 }
670
671pre_flow_drop:
672 // Outputs.
674
675 /* Release tcp segments. Done here after alerting can use them. */
676 if (p->flow != NULL) {
678
679 if (FlowIsBypassed(p->flow)) {
681 if (p->proto == IPPROTO_TCP) {
683 }
684 } else if (p->proto == IPPROTO_TCP && p->flow->protoctx && p->flags & PKT_STREAM_EST) {
685 if ((p->flow->flags & FLOW_TS_APP_UPDATED) && PKT_IS_TOSERVER(p)) {
686 FramesPrune(p->flow, p);
687 } else if ((p->flow->flags & FLOW_TC_APP_UPDATED) && PKT_IS_TOCLIENT(p)) {
688 FramesPrune(p->flow, p);
689 }
692 STREAM_TOSERVER : STREAM_TOCLIENT);
694 } else if (p->proto == IPPROTO_UDP) {
695 FramesPrune(p->flow, p);
696 }
697
698 if ((PKT_IS_PSEUDOPKT(p)) ||
700 if ((p->flags & PKT_STREAM_EST) || p->proto != IPPROTO_TCP) {
701 if (PKT_IS_TOSERVER(p)) {
702 if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
703 AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
704 p->flow->flags &= ~FLOW_TS_APP_UPDATED;
705 SCLogDebug("~FLOW_TS_APP_UPDATED");
706 }
707 } else {
708 if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
709 AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
710 p->flow->flags &= ~FLOW_TC_APP_UPDATED;
711 SCLogDebug("~FLOW_TC_APP_UPDATED");
712 }
713 }
714 }
715 } else {
716 SCLogDebug("not pseudo, no app update: skip");
717 }
718
719 if (p->flow->flags & FLOW_ACTION_DROP) {
720 SCLogDebug("flow drop in place: remove app update flags");
722 }
723
724 Flow *f = p->flow;
725 FlowDeReference(&p->flow);
727 }
728
729housekeeping:
730
731 /* take injected flows and add them to our local queue */
732 FlowWorkerProcessInjectedFlows(tv, fw, p);
733
734 /* process local work queue */
735 FlowWorkerProcessLocalFlows(tv, fw, p);
736
737 return TM_ECODE_OK;
738}
739
740void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
741{
742 FlowWorkerThreadData *fw = flow_worker;
743
744 SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
745}
746
747void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
748{
749 FlowWorkerThreadData *fw = flow_worker;
750
751 return SC_ATOMIC_GET(fw->detect_thread);
752}
753
754void *FlowWorkerGetThreadData(void *flow_worker)
755{
756 return (FlowWorkerThreadData *)flow_worker;
757}
758
759bool FlowWorkerGetFlushAck(void *flow_worker)
760{
761 FlowWorkerThreadData *fw = flow_worker;
762 return SC_ATOMIC_GET(fw->flush_ack) == true;
763}
764
765void FlowWorkerSetFlushAck(void *flow_worker)
766{
767 FlowWorkerThreadData *fw = flow_worker;
768 SC_ATOMIC_SET(fw->flush_ack, false);
769}
770
772{
773 switch (fwi) {
775 return "flow";
777 return "stream";
779 return "app-layer";
781 return "detect";
783 return "tcp-prune";
785 return "flow-inject";
787 return "flow-evict";
789 return "size";
790 }
791 return "error";
792}
793
794static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
795{
796 FlowWorkerThreadData *fw = flow_worker;
797 if (fw->pq.len)
798 return true;
799 if (fw->fls.work_queue.len)
800 return true;
801
802 if (tv->flow_queue) {
804 bool fq_done = (tv->flow_queue->qlen == 0);
806 if (!fq_done) {
807 return true;
808 }
809 }
810
811 return false;
812}
813
815{
816 tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
817 tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
818 tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
819 tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
820 tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
823}
#define ACTION_DROP
void FramesPrune(Flow *f, Packet *p)
uint16_t SCAppLayerParserStateIssetFlag(AppLayerParserState *pstate, uint16_t flag)
void AppLayerParserTransactionsCleanup(Flow *f, const uint8_t pkt_dir)
remove obsolete (inspected and logged) transactions
void SCAppLayerParserStateSetFlag(AppLayerParserState *pstate, uint16_t flag)
#define APP_LAYER_PARSER_NO_INSPECTION
#define APP_LAYER_PARSER_EOF_TC
#define APP_LAYER_PARSER_EOF_TS
void AppLayerRegisterThreadCounters(ThreadVars *tv)
Registers per flow counters for all protocols.
Definition app-layer.c:1307
int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *f)
Handle a app layer UDP message.
Definition app-layer.c:878
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
Definition counters.c:186
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition counters.c:992
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition counters.c:952
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition counters.c:207
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition counters.c:146
uint8_t flags
Definition decode-gre.h:0
#define PKT_HAS_FLOW
Definition decode.h:1266
@ PKT_DROP_REASON_FLOW_DROP
Definition decode.h:386
@ PKT_DROP_REASON_STREAM_PRE_HOOK
Definition decode.h:400
@ PKT_DROP_REASON_FLOW_PRE_HOOK
Definition decode.h:401
#define PKT_WANTS_FLOW
Definition decode.h:1296
#define PKT_NOPAYLOAD_INSPECTION
Definition decode.h:1252
#define PKT_SET_SRC(p, src_val)
Definition decode.h:1325
#define PKT_IS_TOCLIENT(p)
Definition decode.h:239
#define GET_PKT_LEN(p)
Definition decode.h:208
#define PKT_IS_FLUSHPKT(p)
Definition decode.h:1323
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition decode.h:1321
#define PKT_IS_TOSERVER(p)
Definition decode.h:238
#define PKT_STREAM_EST
Definition decode.h:1262
@ PKT_SRC_WIRE
Definition decode.h:52
@ PKT_SRC_SHUTDOWN_FLUSH
Definition decode.h:64
@ PKT_SRC_CAPTURE_TIMEOUT
Definition decode.h:62
@ PKT_SRC_FFR
Definition decode.h:58
TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data)
initialize thread specific detection engine context
int DetectEngineEnabled(void)
Check if detection is enabled.
TmEcode DetectEngineThreadCtxDeinit(ThreadVars *tv, void *data)
void DisableDetectFlowFileFlags(Flow *f)
disable file features we don't need Called if we have no detection engine.
Definition detect.c:2410
void FlowQueuePrivatePrependFlow(FlowQueuePrivate *fqc, Flow *f)
Definition flow-queue.c:78
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition flow-queue.c:151
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition flow-queue.c:140
void FlowQueuePrivateAppendPrivate(FlowQueuePrivate *dest, FlowQueuePrivate *src)
Definition flow-queue.c:88
#define FQLOCK_LOCK(q)
Definition flow-queue.h:72
#define FQLOCK_UNLOCK(q)
Definition flow-queue.h:74
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
#define FLOW_SPARE_POOL_BLOCK_SIZE
Packet * FlowPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn)
bool FlowNeedsReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition flow-util.c:246
void FlowFree(Flow *f)
cleanup & free the memory of a flow
Definition flow-util.c:84
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
TmEcode Detect(ThreadVars *tv, Packet *p, void *data)
Detection engine thread wrapper.
Definition detect.c:2341
void * FlowWorkerGetThreadData(void *flow_worker)
void * FlowWorkerGetDetectCtxPtr(void *flow_worker)
struct FlowWorkerThreadData_ FlowWorkerThreadData
const char * ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
void FlowWorkerSetFlushAck(void *flow_worker)
TmEcode StreamTcp(ThreadVars *, Packet *, void *, PacketQueueNoLock *pq)
void TmModuleFlowWorkerRegister(void)
DetectEngineThreadCtx * DetectEngineThreadCtxPtr
Definition flow-worker.c:59
bool FlowWorkerGetFlushAck(void *flow_worker)
ProfileFlowWorkerId
Definition flow-worker.h:21
@ PROFILE_FLOWWORKER_SIZE
Definition flow-worker.h:29
@ PROFILE_FLOWWORKER_APPLAYERUDP
Definition flow-worker.h:24
@ PROFILE_FLOWWORKER_FLOW_EVICTED
Definition flow-worker.h:28
@ PROFILE_FLOWWORKER_DETECT
Definition flow-worker.h:25
@ PROFILE_FLOWWORKER_FLOW
Definition flow-worker.h:22
@ PROFILE_FLOWWORKER_STREAM
Definition flow-worker.h:23
@ PROFILE_FLOWWORKER_TCPPRUNE
Definition flow-worker.h:26
@ PROFILE_FLOWWORKER_FLOW_INJECTED
Definition flow-worker.h:27
void FlowHandlePacket(ThreadVars *tv, FlowLookupStruct *fls, Packet *p)
Entry point for packet flow handling.
Definition flow.c:533
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition flow.c:1097
void FlowCleanupAppLayer(Flow *f)
Definition flow.c:139
int FlowChangeProto(Flow *f)
Check if change proto flag is set for flow.
Definition flow.c:196
void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars *dtv)
Update Packet and Flow.
Definition flow.c:402
@ FLOW_STATE_LOCAL_BYPASSED
Definition flow.h:507
#define FLOW_PKT_TOSERVER
Definition flow.h:233
#define FLOW_TC_APP_UPDATED
Definition flow.h:120
#define FLOW_ACTION_DROP
Definition flow.h:70
#define FLOW_END_FLAG_TIMEOUT
Definition flow.h:243
#define FLOW_PKT_LAST_PSEUDO
Definition flow.h:240
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition flow.h:97
#define FLOW_TS_APP_UPDATED
Definition flow.h:119
#define FLOW_TC_APP_UPDATE_NEXT
Definition flow.h:56
#define FLOW_PKT_TOCLIENT_FIRST
Definition flow.h:237
#define FLOW_TS_APP_UPDATE_NEXT
Definition flow.h:123
#define FLOW_PKT_TOSERVER_FIRST
Definition flow.h:236
#define FLOWLOCK_UNLOCK(fb)
Definition flow.h:273
#define FLOWLOCK_WRLOCK(fb)
Definition flow.h:270
ThreadVars * tv
void PacketUpdateEngineEventCounters(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Definition decode.c:239
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition decode.c:628
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition decode.c:804
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition decode.c:822
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition output-flow.c:87
TmEcode OutputLoggerThreadDeinit(ThreadVars *tv, void *thread_data)
Definition output.c:848
TmEcode OutputLoggerThreadInit(ThreadVars *tv, const void *initdata, void **data)
Definition output.c:817
TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
Definition output.c:803
TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data)
Definition output.c:788
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
void PacketDrop(Packet *p, const uint8_t action, enum PacketDropReason r)
issue drop action
Definition packet.c:33
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition packet.c:49
void StreamTcpPruneSession(Flow *f, uint8_t flags)
Remove idle TcpSegments from TcpSession.
@ UPDATE_DIR_OPPOSING
@ UPDATE_DIR_BOTH
@ UPDATE_DIR_PACKET
@ UPDATE_DIR_NONE
TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
TmEcode StreamTcpThreadDeinit(ThreadVars *tv, void *data)
void StreamTcpSessionCleanup(TcpSession *ssn)
Session cleanup function. Does not free the ssn.
Definition stream-tcp.c:327
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueueNoLock *pq)
create packets in both directions to flush out logging and detection before switching protocols....
@ STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION
Definition stream-tcp.h:193
@ STREAM_HAS_UNPROCESSED_SEGMENTS_NONE
Definition stream-tcp.h:190
#define STREAM_FLAGS_FOR_PACKET(p)
Definition stream.h:30
Structure to hold thread specific data for all decode modules.
Definition decode.h:963
uint16_t counter_flow_active
Definition decode.h:1034
uint16_t counter_tcp_active_sessions
Definition decode.h:1032
DetectPacketHookFunc PreFlowHook
Definition detect.h:1158
DetectPacketHookFunc PreStreamHook
Definition detect.h:1153
DetectEngineCtx * de_ctx
Definition detect.h:1364
FlowQueuePrivate spare_queue
Definition flow.h:544
DecodeThreadVars * dtv
Definition flow.h:545
FlowQueuePrivate work_queue
Definition flow.h:546
uint32_t flows_aside_needs_work
Definition flow-worker.c:62
uint32_t flows_aside_pkt_inject
Definition flow-worker.c:63
PacketQueueNoLock pq
Definition flow-worker.c:87
uint16_t flows_injected_max
Definition flow-worker.c:92
DecodeThreadVars * dtv
Definition flow-worker.c:67
uint16_t flows_aside_needs_work
Definition flow-worker.c:94
uint16_t flows_aside_pkt_inject
Definition flow-worker.c:95
SC_ATOMIC_DECLARE(bool, flush_ack)
SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread)
struct FlowWorkerThreadData_::@127 cnt
uint16_t local_bypass_bytes
Definition flow-worker.c:82
StreamTcpThread * stream_thread
Definition flow-worker.c:70
FlowEndCounters fec
Definition flow-worker.c:97
FlowLookupStruct fls
Definition flow-worker.c:88
Flow data structure.
Definition flow.h:356
uint8_t ffr_tc
Definition flow.h:388
uint8_t proto
Definition flow.h:378
uint32_t flags
Definition flow.h:421
uint8_t ffr_ts
Definition flow.h:387
uint8_t flow_end_flags
Definition flow.h:447
FlowStateType flow_state
Definition flow.h:412
void * protoctx
Definition flow.h:441
AppLayerParserState * alparser
Definition flow.h:478
uint8_t protomap
Definition flow.h:445
uint8_t ffr
Definition flow.h:390
simple fifo queue for packets
uint8_t flowflags
Definition decode.h:532
uint64_t pcap_cnt
Definition decode.h:626
SCTime_t ts
Definition decode.h:555
uint8_t pkt_src
Definition decode.h:611
uint8_t app_update_direction
Definition decode.h:535
struct Flow_ * flow
Definition decode.h:546
uint32_t flags
Definition decode.h:544
uint8_t proto
Definition decode.h:523
TcpReassemblyThreadCtx * ra_ctx
Definition stream-tcp.h:117
Per thread variable structure.
Definition threadvars.h:58
char * printable_name
Definition threadvars.h:66
struct FlowQueue_ * flow_queue
Definition threadvars.h:135
const char * name
Definition tm-modules.h:48
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition tm-modules.h:53
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition tm-modules.h:67
uint8_t cap_flags
Definition tm-modules.h:77
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition tm-modules.h:56
uint8_t flags
Definition tm-modules.h:80
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition tm-modules.h:51
#define BUG_ON(x)
TmModule tmm_modules[TMM_SIZE]
Definition tm-modules.c:29
#define TM_FLAG_FLOWWORKER_TM
Definition tm-modules.h:34
@ TMM_FLOWWORKER
@ TM_ECODE_FAILED
@ TM_ECODE_OK
@ TM_ECODE_DONE
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
void PacketPoolReturnPacket(Packet *p)
Return packet to Packet pool.
uint32_t cnt
#define SC_ATOMIC_CAS(name, cmpval, newval)
atomic Compare and Switch
#define SC_ATOMIC_INITPTR(name)
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
#define BOOL2STR(b)
Definition util-debug.h:535
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCLogError(...)
Macro used to log ERROR messages.
Definition util-debug.h:267
#define SCFree(p)
Definition util-mem.h:61
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define likely(expr)
#define FLOWWORKER_PROFILING_END(p, id)
#define FLOWWORKER_PROFILING_START(p, id)
void TimeSetByThread(const int thread_id, SCTime_t tv)
Definition util-time.c:116
#define DEBUG_VALIDATE_BUG_ON(exp)
#define DEBUG_ASSERT_FLOW_LOCKED(f)