For nfpcapd, use more efficient spin locks for thread sync

This commit is contained in:
Peter Haag 2016-11-26 18:08:52 +01:00
parent 4c712ee62a
commit f67d99e2e0
6 changed files with 111 additions and 49 deletions

View File

@ -48,13 +48,29 @@
#include <assert.h> #include <assert.h>
#include "rbtree.h" #include "rbtree.h"
#include "util.h"
#include "nffile.h" #include "nffile.h"
#include "bookkeeper.h" #include "bookkeeper.h"
#include "nfxstat.h" #include "nfxstat.h"
#include "collector.h" #include "collector.h"
#include "flowtree.h"
#include "netflow_pcap.h" #include "netflow_pcap.h"
#include "util.h"
#include "flowtree.h"
static void spin_lock(int *p);
static void spin_unlock(int volatile *p);
static void spin_lock(int *p) {
while(!__sync_bool_compare_and_swap(p, 0, 1));
}
static void spin_unlock(int volatile *p) {
__asm volatile (""); // acts as a memory barrier.
*p = 0;
}
#define GetTreeLock(a) spin_lock(&((a)->list_lock))
#define ReleaseTreeLock(a) spin_unlock(&((a)->list_lock))
static int FlowNodeCMP(struct FlowNode *e1, struct FlowNode *e2); static int FlowNodeCMP(struct FlowNode *e1, struct FlowNode *e2);
@ -376,6 +392,9 @@ NodeList_t *NodeList;
NodeList->list = NULL; NodeList->list = NULL;
NodeList->last = NULL; NodeList->last = NULL;
NodeList->length = 0; NodeList->length = 0;
NodeList->list_lock = 0;
NodeList->waiting = 0;
NodeList->waits = 0;
pthread_mutex_init(&NodeList->m_list, NULL); pthread_mutex_init(&NodeList->m_list, NULL);
pthread_cond_init(&NodeList->c_list, NULL); pthread_cond_init(&NodeList->c_list, NULL);
@ -392,6 +411,7 @@ void DisposeNodeList(NodeList_t *NodeList) {
LogError("Try to free non empty NodeList"); LogError("Try to free non empty NodeList");
return; return;
} }
printf("FREE - waitrs: %u\n", NodeList->waits);
free(NodeList); free(NodeList);
} // End of DisposeNodeList } // End of DisposeNodeList
@ -433,7 +453,8 @@ len, mem, proto, loops, Allocated, CacheOverflow);
void Push_Node(NodeList_t *NodeList, struct FlowNode *node) { void Push_Node(NodeList_t *NodeList, struct FlowNode *node) {
pthread_mutex_lock(&NodeList->m_list); GetTreeLock(NodeList);
// pthread_mutex_lock(&NodeList->m_list);
if ( NodeList->length == 0 ) { if ( NodeList->length == 0 ) {
// empty list // empty list
NodeList->list = node; NodeList->list = node;
@ -452,8 +473,12 @@ void Push_Node(NodeList_t *NodeList, struct FlowNode *node) {
(unsigned long long)node, proto, NodeList->length, (unsigned long long)NodeList->list, (unsigned long long)NodeList->last); (unsigned long long)node, proto, NodeList->length, (unsigned long long)NodeList->list, (unsigned long long)NodeList->last);
ListCheck(NodeList); ListCheck(NodeList);
#endif #endif
pthread_mutex_unlock(&NodeList->m_list); if ( NodeList->waiting ) {
pthread_cond_signal(&NodeList->c_list); pthread_cond_signal(&NodeList->c_list);
}
ReleaseTreeLock(NodeList);
// pthread_mutex_unlock(&NodeList->m_list);
// pthread_cond_signal(&NodeList->c_list);
} // End of Push_Node } // End of Push_Node
@ -461,19 +486,30 @@ struct FlowNode *Pop_Node(NodeList_t *NodeList, int *done) {
struct FlowNode *node; struct FlowNode *node;
int proto; int proto;
GetTreeLock(NodeList);
while ( NodeList->length == 0 && !*done ) {
pthread_mutex_lock(&NodeList->m_list); pthread_mutex_lock(&NodeList->m_list);
while ( NodeList->length == 0 && !*done ) NodeList->waiting = 1;
NodeList->waits++;
ReleaseTreeLock(NodeList);
// sleep ad wait
pthread_cond_wait(&NodeList->c_list, &NodeList->m_list); pthread_cond_wait(&NodeList->c_list, &NodeList->m_list);
if ( NodeList->length == 0 && *done ) { // wake up
GetTreeLock(NodeList);
NodeList->waiting = 0;
pthread_mutex_unlock(&NodeList->m_list); pthread_mutex_unlock(&NodeList->m_list);
}
if ( NodeList->length == 0 && *done ) {
ReleaseTreeLock(NodeList);
dbg_printf("Pop_Node done\n"); dbg_printf("Pop_Node done\n");
return NULL; return NULL;
} }
if ( NodeList->list == NULL ) { if ( NodeList->list == NULL ) {
// should never happen - list is supposed to have at least one item // should never happen - list is supposed to have at least one item
pthread_mutex_unlock(&NodeList->m_list); ReleaseTreeLock(NodeList);
LogError("Unexpected empty FlowNode_ProcessList"); LogError("Unexpected empty FlowNode_ProcessList");
return NULL; return NULL;
} }
@ -496,7 +532,7 @@ int proto;
ListCheck(NodeList); ListCheck(NodeList);
#endif #endif
pthread_mutex_unlock(&NodeList->m_list); ReleaseTreeLock(NodeList);
return node; return node;
} // End of Pop_Node } // End of Pop_Node

View File

@ -30,7 +30,21 @@
* *
*/ */
#ifndef _FLOWTREE_H
#define _FLOWTREE_H 1
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
#include <time.h>
#include "rbtree.h" #include "rbtree.h"
#include "nffile.h"
#define v4 ip_union._v4 #define v4 ip_union._v4
#define v6 ip_union._v6 #define v6 ip_union._v6
@ -95,9 +109,12 @@ struct FlowNode {
typedef struct NodeList_s { typedef struct NodeList_s {
struct FlowNode *list; struct FlowNode *list;
struct FlowNode *last; struct FlowNode *last;
sig_atomic_t list_lock;
pthread_mutex_t m_list; pthread_mutex_t m_list;
pthread_cond_t c_list; pthread_cond_t c_list;
uint32_t length; uint32_t length;
uint32_t waiting;
uint64_t waits;
} NodeList_t; } NodeList_t;
@ -143,4 +160,4 @@ void DumpList(NodeList_t *NodeList);
// Stat functions // Stat functions
void DumpNodeStat(void); void DumpNodeStat(void);
#endif // _FLOWTREE_H

View File

@ -1,4 +1,5 @@
/* /*
* Copyright (c) 2016, Peter Haag
* Copyright (c) 2014, Peter Haag * Copyright (c) 2014, Peter Haag
* Copyright (c) 2013, Peter Haag * Copyright (c) 2013, Peter Haag
* All rights reserved. * All rights reserved.
@ -27,15 +28,16 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
* *
* $Author:$
*
* $Id:$
*
* $LastChangedRevision:$
*
*
*/ */
#ifndef _NETFLOW_PCAP_H
#define _NETFLOW_PCAP_H 1
#include <time.h>
#include "collector.h"
#include "flowtree.h"
int Init_pcap2nf(void); int Init_pcap2nf(void);
int StorePcapFlow(FlowSource_t *fs, struct FlowNode *Node); int StorePcapFlow(FlowSource_t *fs, struct FlowNode *Node);
@ -46,3 +48,5 @@ void SetClient_latency(struct FlowNode *node, struct timeval *t_packet);
void SetApplication_latency(struct FlowNode *node, struct timeval *t_packet); void SetApplication_latency(struct FlowNode *node, struct timeval *t_packet);
#endif

View File

@ -245,9 +245,9 @@ static void usage(char *name) {
"-i device\tspecify a device\n" "-i device\tspecify a device\n"
"-r pcapfile\tspecify a file to read from\n" "-r pcapfile\tspecify a file to read from\n"
"-B cache buckets\tset the number of cache buckets. (default 1048576)\n" "-B cache buckets\tset the number of cache buckets. (default 1048576)\n"
"-s snaplen\tset the snapshot length\n" "-s snaplen\tset the snapshot length - default 1500\n"
"-l flowdir \tset the flow output directory. (no default) \n" "-l flowdir \tset the flow output directory. (no default) \n"
"-l pcapdir \tset the pcapdir directory. (optional) \n" "-p pcapdir \tset the pcapdir directory. (optional) \n"
"-S subdir\tSub directory format. see nfcapd(1) for format\n" "-S subdir\tSub directory format. see nfcapd(1) for format\n"
"-I Ident\tset the ident string for stat file. (default 'none')\n" "-I Ident\tset the ident string for stat file. (default 'none')\n"
"-P pidfile\tset the PID file\n" "-P pidfile\tset the PID file\n"
@ -1184,7 +1184,7 @@ pcap_dev_t *pcap_dev;
p_packet_thread_args_t *p_packet_thread_args; p_packet_thread_args_t *p_packet_thread_args;
p_flow_thread_args_t *p_flow_thread_args; p_flow_thread_args_t *p_flow_thread_args;
snaplen = 100; snaplen = 1500;
do_daemonize = 0; do_daemonize = 0;
launcher_pid = 0; launcher_pid = 0;
device = NULL; device = NULL;

View File

@ -30,7 +30,9 @@
* *
*/ */
#ifdef HAVE_CONFIG_H
#include "config.h" #include "config.h"
#endif
#ifdef HAVE_FEATURES_H #ifdef HAVE_FEATURES_H
#include <features.h> #include <features.h>
@ -63,7 +65,6 @@
#include <pcap.h> #include <pcap.h>
#include "util.h"
#include "nffile.h" #include "nffile.h"
#include "bookkeeper.h" #include "bookkeeper.h"
#include "nfxstat.h" #include "nfxstat.h"
@ -72,14 +73,9 @@
#include "ipfrag.h" #include "ipfrag.h"
#include "pcaproc.h" #include "pcaproc.h"
#include "content_dns.h" #include "content_dns.h"
#include "util.h"
#include "netflow_pcap.h" #include "netflow_pcap.h"
#ifndef DEVEL
# define dbg_printf(...) /* printf(__VA_ARGS__) */
#else
# define dbg_printf(...) printf(__VA_ARGS__)
#endif
static inline void ProcessTCPFlow(FlowSource_t *fs, struct FlowNode *NewNode ); static inline void ProcessTCPFlow(FlowSource_t *fs, struct FlowNode *NewNode );
static inline void ProcessUDPFlow(FlowSource_t *fs, struct FlowNode *NewNode ); static inline void ProcessUDPFlow(FlowSource_t *fs, struct FlowNode *NewNode );
@ -393,7 +389,7 @@ void ProcessPacket(NodeList_t *NodeList, pcap_dev_t *pcap_dev, const struct pcap
struct FlowNode *Node; struct FlowNode *Node;
struct ip *ip; struct ip *ip;
void *payload, *defragmented; void *payload, *defragmented;
uint32_t size_ip, offset, data_len, payload_len; uint32_t size_ip, offset, data_len, payload_len, bytes;
uint16_t version, ethertype, proto; uint16_t version, ethertype, proto;
#ifdef DEVEL #ifdef DEVEL
char s1[64]; char s1[64];
@ -519,14 +515,11 @@ pkt->vlans[pkt->vlan_count].pcp = (p[0] >> 5) & 7;
// XXX Extension headers not processed // XXX Extension headers not processed
proto = ip6->ip6_ctlun.ip6_un1.ip6_un1_nxt; proto = ip6->ip6_ctlun.ip6_un1.ip6_un1_nxt;
payload_len = ntohs(ip6->ip6_ctlun.ip6_un1.ip6_un1_plen); payload_len = bytes = ntohs(ip6->ip6_ctlun.ip6_un1.ip6_un1_plen);
if (data_len < (payload_len + size_ip) ) { if (data_len < (payload_len + size_ip) ) {
LogError("Packet: %u Length error: data_len: %u < payload_len %u + size IPv6", // capture len was limited - so adapt payload_len
pkg_cnt, data_len, payload_len); payload_len = data_len - size_ip;
pcap_dev->proc_stat.short_snap++;
Free_Node(Node);
return;
} }
dbg_printf("Packet IPv6, SRC %s, DST %s, ", dbg_printf("Packet IPv6, SRC %s, DST %s, ",
@ -562,13 +555,13 @@ pkt->vlans[pkt->vlan_count].pcp = (p[0] >> 5) & 7;
dbg_printf("size IP hader: %u, len: %u, %u\n", size_ip, ip->ip_len, payload_len); dbg_printf("size IP hader: %u, len: %u, %u\n", size_ip, ip->ip_len, payload_len);
payload_len -= size_ip; // ajust length compatibel IPv6 payload_len -= size_ip; // ajust length compatibel IPv6
bytes = payload_len;
payload = (void *)ip + size_ip; payload = (void *)ip + size_ip;
proto = ip->ip_p; proto = ip->ip_p;
if (data_len < (payload_len + size_ip) ) { if (data_len < (payload_len + size_ip) ) {
LogError("Packet: %u Length error: data_len: %u < payload_len %u + size IPv4, captured: %u, hdr len: %u", // capture len was limited - so adapt payload_len
pkg_cnt, data_len, payload_len, hdr->caplen, hdr->len); payload_len = data_len - size_ip;
pcap_dev->proc_stat.short_snap++; pcap_dev->proc_stat.short_snap++;
} }
@ -604,9 +597,9 @@ pkt->vlans[pkt->vlan_count].pcp = (p[0] >> 5) & 7;
} }
Node->packets = 1; Node->packets = 1;
Node->bytes = payload_len; Node->bytes = bytes;
Node->proto = proto; Node->proto = proto;
dbg_printf("Size: %u\n", payload_len); dbg_printf("Payload: %u bytes, Full packet: %u bytes\n", payload_len, bytes);
// TCP/UDP decoding // TCP/UDP decoding
switch (proto) { switch (proto) {
@ -625,7 +618,7 @@ pkt->vlans[pkt->vlan_count].pcp = (p[0] >> 5) & 7;
} }
uint32_t size_udp_payload = ntohs(udp->uh_ulen) - 8; uint32_t size_udp_payload = ntohs(udp->uh_ulen) - 8;
if ( (payload_len - sizeof(struct udphdr)) != size_udp_payload ) { if ( (bytes == payload_len ) && (payload_len - sizeof(struct udphdr)) != size_udp_payload ) {
LogError("UDP payload legth error: Expected %u, have %u bytes\n", LogError("UDP payload legth error: Expected %u, have %u bytes\n",
size_udp_payload, (payload_len - (unsigned)sizeof(struct udphdr))); size_udp_payload, (payload_len - (unsigned)sizeof(struct udphdr)));
@ -647,7 +640,7 @@ pkt->vlans[pkt->vlan_count].pcp = (p[0] >> 5) & 7;
if ( hdr->caplen == hdr->len ) { if ( hdr->caplen == hdr->len ) {
// process payload of full packets // process payload of full packets
if ( Node->src_port == 53 || Node->dst_port == 53 ) if ( (bytes == payload_len) && (Node->src_port == 53 || Node->dst_port == 53) )
content_decode_dns(Node, payload, payload_len); content_decode_dns(Node, payload, payload_len);
} }
Push_Node(NodeList, Node); Push_Node(NodeList, Node);
@ -671,7 +664,6 @@ pkt->vlans[pkt->vlan_count].pcp = (p[0] >> 5) & 7;
payload = payload + size_tcp; payload = payload + size_tcp;
payload_len -= size_tcp; payload_len -= size_tcp;
dbg_printf("Size TCP header: %u, size TCP payload: %u ", size_tcp, payload_len); dbg_printf("Size TCP header: %u, size TCP payload: %u ", size_tcp, payload_len);
// XXX Debug stuff - remove when released ...
dbg_printf("src %i, DST %i, flags %i : ", dbg_printf("src %i, DST %i, flags %i : ",
ntohs(tcp->th_sport), ntohs(tcp->th_dport), tcp->th_flags); ntohs(tcp->th_sport), ntohs(tcp->th_dport), tcp->th_flags);

View File

@ -1,4 +1,5 @@
/* /*
* Copyright (c) 2016, Peter Haag
* Copyright (c) 2014, Peter Haag * Copyright (c) 2014, Peter Haag
* All rights reserved. * All rights reserved.
* *
@ -26,14 +27,25 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
* *
* $Author$
*
* $Id$
*
* $LastChangedRevision$
*
*/ */
#ifndef _PCAPROC_H
#define _PCAPROC_H 1
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
#include <time.h>
#include <pthread.h>
#include <pcap.h>
#include "flowtree.h"
typedef struct proc_stat_s { typedef struct proc_stat_s {
uint32_t packets; uint32_t packets;
uint32_t unknown; uint32_t unknown;
@ -75,3 +87,4 @@ void ProcessFlowNode(FlowSource_t *fs, struct FlowNode *node);
void ProcessPacket(NodeList_t *nodeList, pcap_dev_t *pcap_dev, const struct pcap_pkthdr *hdr, const u_char *data); void ProcessPacket(NodeList_t *nodeList, pcap_dev_t *pcap_dev, const struct pcap_pkthdr *hdr, const u_char *data);
#endif // _PCAPROC_H