Add multiple packet repeaters to nfcapd/sfcapd

This commit is contained in:
Peter Haag 2018-06-24 12:42:05 +02:00
parent 214091e490
commit 9d187da615
7 changed files with 98 additions and 51 deletions

View File

@ -1,5 +1,6 @@
2018-06-24 2018-06-24
- Fix bookkeeper type - use key_t - Fix bookkeeper type - use key_t
- Add multiple packet repeaters to nfcapd/sfcapd. Up to 8 repeaters (-R) can be defined.
2018-05-06 2018-05-06
- New bookkeeper hash broke NfSen. Fixed. ported back to release 1.6.17 - New bookkeeper hash broke NfSen. Fixed. ported back to release 1.6.17

View File

@ -146,7 +146,7 @@ static void daemonize(void);
static void SetPriv(char *userid, char *groupid ); static void SetPriv(char *userid, char *groupid );
static void run(packet_function_t receive_packet, int socket, send_peer_t peer, static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress); time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress);
/* Functions */ /* Functions */
@ -166,7 +166,7 @@ static void usage(char *name) {
"-H Add port histogram data to flow file.(default 'no')\n" "-H Add port histogram data to flow file.(default 'no')\n"
"-n Ident,IP,logdir\tAdd this flow source - multiple streams\n" "-n Ident,IP,logdir\tAdd this flow source - multiple streams\n"
"-P pidfile\tset the PID file\n" "-P pidfile\tset the PID file\n"
"-R IP[/port]\tRepeat incoming packets to IP address/port\n" "-R IP[/port]\tRepeat incoming packets to IP address/port. Max 8 repeaters.\n"
"-s rate\tset default sampling rate (default 1)\n" "-s rate\tset default sampling rate (default 1)\n"
"-x process\tlaunch process after a new file becomes available\n" "-x process\tlaunch process after a new file becomes available\n"
"-z\t\tLZO compress flows in output file.\n" "-z\t\tLZO compress flows in output file.\n"
@ -358,7 +358,7 @@ int err;
#include "nffile_inline.c" #include "nffile_inline.c"
#include "collector_inline.c" #include "collector_inline.c"
static void run(packet_function_t receive_packet, int socket, send_peer_t peer, static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress) { time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress) {
common_flow_header_t *nf_header; common_flow_header_t *nf_header;
FlowSource_t *fs; FlowSource_t *fs;
@ -422,6 +422,7 @@ srecord_t *commbuff;
*/ */
while ( 1 ) { while ( 1 ) {
struct timeval tv; struct timeval tv;
int i;
/* read next bunch of data into beginn of input buffer */ /* read next bunch of data into beginn of input buffer */
if ( !done) { if ( !done) {
@ -443,12 +444,15 @@ srecord_t *commbuff;
continue; continue;
} }
if ( peer.hostname ) { i = 0;
while ( repeater[i].hostname && (i < MAX_REPEATERS)) {
ssize_t len; ssize_t len;
len = sendto(peer.sockfd, in_buff, cnt, 0, (struct sockaddr *)&(peer.addr), peer.addrlen); len = sendto(repeater[i].sockfd, in_buff, cnt, 0,
(struct sockaddr *)&(repeater[i].addr), repeater[i].addrlen);
if ( len < 0 ) { if ( len < 0 ) {
LogError("ERROR: sendto(): %s", strerror(errno)); LogError("ERROR: sendto(): %s", strerror(errno));
} }
i++;
} }
} }
@ -745,14 +749,14 @@ char *userid, *groupid, *checkptr, *listenport, *mcastgroup, *extension_tags;
char *Ident, *dynsrcdir, *time_extension, pidfile[MAXPATHLEN]; char *Ident, *dynsrcdir, *time_extension, pidfile[MAXPATHLEN];
struct stat fstat; struct stat fstat;
packet_function_t receive_packet; packet_function_t receive_packet;
send_peer_t peer; repeater_t repeater[MAX_REPEATERS];
FlowSource_t *fs; FlowSource_t *fs;
struct sigaction act; struct sigaction act;
int family, bufflen; int family, bufflen;
time_t twin, t_start; time_t twin, t_start;
int sock, synctime, do_daemonize, expire, spec_time_extension, report_sequence; int sock, synctime, do_daemonize, expire, spec_time_extension, report_sequence;
int subdir_index, sampling_rate, compress; int subdir_index, sampling_rate, compress;
int c; int c, i;
#ifdef PCAP #ifdef PCAP
char *pcap_file; char *pcap_file;
@ -780,8 +784,10 @@ char *pcap_file;
expire = 0; expire = 0;
sampling_rate = 1; sampling_rate = 1;
compress = NOT_COMPRESSED; compress = NOT_COMPRESSED;
memset((void *)&peer, 0, sizeof(send_peer_t)); memset((void *)&repeater, 0, sizeof(repeater));
peer.family = AF_UNSPEC; for ( i = 0; i < MAX_REPEATERS; i++ ) {
repeater[i].family = AF_UNSPEC;
}
Ident = "none"; Ident = "none";
FlowSource = NULL; FlowSource = NULL;
extension_tags = DefaultExtensions; extension_tags = DefaultExtensions;
@ -886,14 +892,23 @@ char *pcap_file;
pidfile[MAXPATHLEN-1] = 0; pidfile[MAXPATHLEN-1] = 0;
break; break;
case 'R': { case 'R': {
char *port, *hostname;
char *p = strchr(optarg, '/'); char *p = strchr(optarg, '/');
int i = 0;
if ( p ) { if ( p ) {
*p++ = '\0'; *p++ = '\0';
peer.port = strdup(p); port = strdup(p);
} else { } else {
peer.port = DEFAULTCISCOPORT; port = DEFAULTCISCOPORT;
} }
peer.hostname = strdup(optarg); hostname = strdup(optarg);
while ( repeater[i].hostname && (i < MAX_REPEATERS) ) i++;
if ( i == MAX_REPEATERS ) {
fprintf(stderr, "Too many packet repeaters! Max: %i repeaters allowed.\n", MAX_REPEATERS);
exit(255);
}
repeater[i].hostname = hostname;
repeater[i].port = port;
break; } break; }
case 'r': case 'r':
@ -1040,12 +1055,14 @@ char *pcap_file;
exit(255); exit(255);
} }
if ( peer.hostname ) { i = 0;
peer.sockfd = Unicast_send_socket (peer.hostname, peer.port, peer.family, bufflen, while ( repeater[i].hostname && (i < MAX_REPEATERS) ) {
&peer.addr, &peer.addrlen ); repeater[i].sockfd = Unicast_send_socket (repeater[i].hostname, repeater[i].port, repeater[i].family, bufflen,
if ( peer.sockfd <= 0 ) &repeater[i].addr, &repeater[i].addrlen );
if ( repeater[i].sockfd <= 0 )
exit(255); exit(255);
LogInfo("Replay flows to host: %s port: %s", peer.hostname, peer.port); LogInfo("Replay flows to host: %s port: %s", repeater[i].hostname, repeater[i].port);
i++;
} }
if ( sampling_rate < 0 ) { if ( sampling_rate < 0 ) {
@ -1205,7 +1222,7 @@ char *pcap_file;
sigaction(SIGCHLD, &act, NULL); sigaction(SIGCHLD, &act, NULL);
LogInfo("Startup."); LogInfo("Startup.");
run(receive_packet, sock, peer, twin, t_start, report_sequence, subdir_index, run(receive_packet, sock, repeater, twin, t_start, report_sequence, subdir_index,
time_extension, compress); time_extension, compress);
close(sock); close(sock);
kill_launcher(launcher_pid); kill_launcher(launcher_pid);

View File

@ -178,12 +178,11 @@ int error, p, sockfd;
int Unicast_send_socket (const char *hostname, const char *sendport, int family, int Unicast_send_socket (const char *hostname, const char *sendport, int family,
unsigned int wmem_size, struct sockaddr_storage *addr, int *addrlen) { unsigned int wmem_size, struct sockaddr_storage *addr, int *addrlen) {
struct addrinfo hints, *res, *ressave; struct addrinfo hints, *res, *ressave;
int n, sockfd; int error, sockfd;
unsigned int wmem_actual; unsigned int wmem_actual;
socklen_t optlen; socklen_t optlen;
if ( !hostname || !sendport ) { if ( !hostname || !sendport ) {
fprintf(stderr, "hostname and listen port required!\n");
LogError("hostname and listen port required!"); LogError("hostname and listen port required!");
return -1; return -1;
} }
@ -194,10 +193,9 @@ socklen_t optlen;
hints.ai_family = family; hints.ai_family = family;
hints.ai_socktype = SOCK_DGRAM; hints.ai_socktype = SOCK_DGRAM;
n = getaddrinfo(hostname, sendport, &hints, &res); error = getaddrinfo(hostname, sendport, &hints, &res);
if ( error ) {
if ( n < 0 ) { LogError("getaddrinfo() error: %s", gai_strerror(error));
fprintf(stderr, "getaddrinfo error: [%s]\n", strerror(errno));
return -1; return -1;
} }
@ -205,27 +203,28 @@ socklen_t optlen;
sockfd = -1; sockfd = -1;
while (res) { while (res) {
sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if ( sockfd < 0 ) {
if ( !(sockfd < 0) ) { LogError("socket() error: could not open the requested socket: %s", strerror (errno));
} else {
// socket call was successsful // socket call was successsful
if (connect(sockfd, res->ai_addr, res->ai_addrlen) == 0) { if (connect(sockfd, res->ai_addr, res->ai_addrlen) < 0) {
// unsuccessful connect :(
LogError("connect() error: could not open the requested socket: %s", strerror (errno));
close(sockfd);
sockfd = -1;
} else {
// connect successful - we are done // connect successful - we are done
close(sockfd); close(sockfd);
// ok - we need now an unconnected socket // ok - we need now an unconnected socket
sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
break; break;
} }
// unsuccessful connect :(
close(sockfd);
sockfd = -1;
} }
res=res->ai_next; res=res->ai_next;
} }
if (sockfd < 0) { if (sockfd < 0) {
freeaddrinfo(ressave); freeaddrinfo(ressave);
fprintf(stderr, "Send socket error: could not open the requested socket: %s\n", strerror (errno));
LogError("Send socket error: could not open the requested socket: %s", strerror (errno));
return -1; return -1;
} }

View File

@ -57,6 +57,17 @@ typedef struct send_peer_s {
void *endp; void *endp;
} send_peer_t; } send_peer_t;
typedef struct repeater_s {
char *hostname;
char *port;
struct sockaddr_storage addr;
int addrlen;
int family;
int sockfd;
} repeater_t;
#define MAX_REPEATERS 8
/* Function prototypes */ /* Function prototypes */
int Unicast_receive_socket(const char *bindhost, const char *listenport, int family, int sockbuflen ); int Unicast_receive_socket(const char *bindhost, const char *listenport, int family, int sockbuflen );

View File

@ -1,4 +1,5 @@
/* /*
* Copyright (c) 2018, Peter Haag
* Copyright (c) 2017, Peter Haag * Copyright (c) 2017, Peter Haag
* Copyright (c) 2016, Peter Haag * Copyright (c) 2016, Peter Haag
* Copyright (c) 2014, Peter Haag * Copyright (c) 2014, Peter Haag
@ -124,7 +125,7 @@ static void daemonize(void);
static void SetPriv(char *userid, char *groupid ); static void SetPriv(char *userid, char *groupid );
static void run(packet_function_t receive_packet, int socket, send_peer_t peer, static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress); time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress);
/* Functions */ /* Functions */
@ -144,7 +145,7 @@ static void usage(char *name) {
"-H Add port histogram data to flow file.(default 'no')\n" "-H Add port histogram data to flow file.(default 'no')\n"
"-n Ident,IP,logdir\tAdd this flow source - multiple streams\n" "-n Ident,IP,logdir\tAdd this flow source - multiple streams\n"
"-P pidfile\tset the PID file\n" "-P pidfile\tset the PID file\n"
"-R IP[/port]\tRepeat incoming packets to IP address/port\n" "-R IP[/port]\tRepeat incoming packets to IP address/port. Max 8 repeaters\n"
"-x process\tlaunch process after a new file becomes available\n" "-x process\tlaunch process after a new file becomes available\n"
"-z\t\tLZO compress flows in output file.\n" "-z\t\tLZO compress flows in output file.\n"
"-y\t\tLZ4 compress flows in output file.\n" "-y\t\tLZ4 compress flows in output file.\n"
@ -332,7 +333,7 @@ int err;
#include "nffile_inline.c" #include "nffile_inline.c"
#include "collector_inline.c" #include "collector_inline.c"
static void run(packet_function_t receive_packet, int socket, send_peer_t peer, static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress) { time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress) {
FlowSource_t *fs; FlowSource_t *fs;
struct sockaddr_storage sf_sender; struct sockaddr_storage sf_sender;
@ -393,6 +394,7 @@ srecord_t *commbuff;
*/ */
while ( 1 ) { while ( 1 ) {
struct timeval tv; struct timeval tv;
int i;
/* read next bunch of data into beginn of input buffer */ /* read next bunch of data into beginn of input buffer */
if ( !done) { if ( !done) {
@ -415,12 +417,15 @@ srecord_t *commbuff;
continue; continue;
} }
if ( peer.hostname ) { i = 0;
while ( repeater[i].hostname && (i < MAX_REPEATERS)) {
ssize_t len; ssize_t len;
len = sendto(peer.sockfd, in_buff, cnt, 0, (struct sockaddr *)&(peer.addr), peer.addrlen); len = sendto(repeater[i].sockfd, in_buff, cnt, 0,
(struct sockaddr *)&(repeater[i].addr), repeater[i].addrlen);
if ( len < 0 ) { if ( len < 0 ) {
LogError("ERROR: sendto(): %s", strerror(errno)); LogError("ERROR: sendto(): %s", strerror(errno));
} }
i++;
} }
} }
@ -658,14 +663,14 @@ char *userid, *groupid, *checkptr, *listenport, *mcastgroup, *extension_tags;
char *Ident, *pcap_file, *time_extension, pidfile[MAXPATHLEN]; char *Ident, *pcap_file, *time_extension, pidfile[MAXPATHLEN];
struct stat fstat; struct stat fstat;
packet_function_t receive_packet; packet_function_t receive_packet;
send_peer_t peer; repeater_t repeater[MAX_REPEATERS];
FlowSource_t *fs; FlowSource_t *fs;
struct sigaction act; struct sigaction act;
int family, bufflen; int family, bufflen;
time_t twin, t_start; time_t twin, t_start;
int sock, synctime, do_daemonize, expire, spec_time_extension, report_sequence; int sock, synctime, do_daemonize, expire, spec_time_extension, report_sequence;
int subdir_index, compress; int subdir_index, compress;
int c; int c, i;
receive_packet = recvfrom; receive_packet = recvfrom;
verbose = synctime = do_daemonize = 0; verbose = synctime = do_daemonize = 0;
@ -687,8 +692,10 @@ int c;
expire = 0; expire = 0;
spec_time_extension = 0; spec_time_extension = 0;
compress = NOT_COMPRESSED; compress = NOT_COMPRESSED;
memset((void *)&peer, 0, sizeof(send_peer_t)); memset((void *)&repeater, 0, sizeof(repeater));
peer.family = AF_UNSPEC; for ( i = 0; i < MAX_REPEATERS; i++ ) {
repeater[i].family = AF_UNSPEC;
}
Ident = "none"; Ident = "none";
FlowSource = NULL; FlowSource = NULL;
extension_tags = DefaultExtensions; extension_tags = DefaultExtensions;
@ -794,14 +801,23 @@ int c;
pidfile[MAXPATHLEN-1] = 0; pidfile[MAXPATHLEN-1] = 0;
break; break;
case 'R': { case 'R': {
char *port, *hostname;
char *p = strchr(optarg, '/'); char *p = strchr(optarg, '/');
int i = 0;
if ( p ) { if ( p ) {
*p++ = '\0'; *p++ = '\0';
peer.port = strdup(p); port = strdup(p);
} else { } else {
peer.port = DEFAULTSFLOWPORT; port = DEFAULTSFLOWPORT;
} }
peer.hostname = strdup(optarg); hostname = strdup(optarg);
while ( repeater[i].hostname && (i < MAX_REPEATERS) ) i++;
if ( i == MAX_REPEATERS ) {
fprintf(stderr, "Too many packet repeaters! Max: %i repeaters allowed.\n", MAX_REPEATERS);
exit(255);
}
repeater[i].hostname = hostname;
repeater[i].port = port;
break; } break; }
case 'r': case 'r':
@ -914,11 +930,14 @@ int c;
exit(255); exit(255);
} }
if ( peer.hostname ) { i = 0;
peer.sockfd = Unicast_send_socket (peer.hostname, peer.port, peer.family, bufflen, while ( repeater[i].hostname && (i < MAX_REPEATERS) ) {
&peer.addr, &peer.addrlen ); repeater[i].sockfd = Unicast_send_socket (repeater[i].hostname, repeater[i].port, repeater[i].family, bufflen,
if ( peer.sockfd <= 0 ) &repeater[i].addr, &repeater[i].addrlen );
if ( repeater[i].sockfd <= 0 )
exit(255); exit(255);
LogInfo("Replay flows to host: %s port: %s", repeater[i].hostname, repeater[i].port);
i++;
} }
SetPriv(userid, groupid); SetPriv(userid, groupid);
@ -1071,7 +1090,7 @@ int c;
sigaction(SIGCHLD, &act, NULL); sigaction(SIGCHLD, &act, NULL);
LogInfo("Startup."); LogInfo("Startup.");
run(receive_packet, sock, peer, twin, t_start, report_sequence, subdir_index, run(receive_packet, sock, repeater, twin, t_start, report_sequence, subdir_index,
time_extension, compress); time_extension, compress);
close(sock); close(sock);
kill_launcher(launcher_pid); kill_launcher(launcher_pid);

View File

@ -65,7 +65,7 @@ Join the specified IPv4 or IPv6 multicast group for listening.
Enable packet repeater. Send all incoming packets to another \fIhost\fR and \fIport\fR. Enable packet repeater. Send all incoming packets to another \fIhost\fR and \fIport\fR.
\fIhost\fR is either a valid IPv4/IPv6 address, or a valid symbolic hostname, which resolves to \fIhost\fR is either a valid IPv4/IPv6 address, or a valid symbolic hostname, which resolves to
a IPv6 or IPv4 address. \fIport\fR may be omitted and defaults to port 9995. Note: Due to IPv4/IPv6 a IPv6 or IPv4 address. \fIport\fR may be omitted and defaults to port 9995. Note: Due to IPv4/IPv6
accepted addresses the port separator is '/'. accepted addresses the port separator is '/'. Up to 8 repeaters my be defined.
.TP 3 .TP 3
.B -I \fIIdentString ( capital letter i ) .B -I \fIIdentString ( capital letter i )
Specifies an ident string, which describes the source e.g. the Specifies an ident string, which describes the source e.g. the

View File

@ -42,7 +42,7 @@ Join the specified IPv6 or IPv6 multicast group for listening.
Enable packet repeater. Send all incoming packets to another \fIhost\fR and \fIport\fR. Enable packet repeater. Send all incoming packets to another \fIhost\fR and \fIport\fR.
\fIhost\fR is either a valid IPv4/IPv6 address, or a valid simbolic hostname, which resolves to \fIhost\fR is either a valid IPv4/IPv6 address, or a valid simbolic hostname, which resolves to
a IPv6 or IPv4 address. \fIport\fR may be omitted and defaults to port 6343. Note: Due to IPv4/IPv6 a IPv6 or IPv4 address. \fIport\fR may be omitted and defaults to port 6343. Note: Due to IPv4/IPv6
accepted addresses the port separator is '/'. accepted addresses the port separator is '/'. Up to 8 repeaters my be defined.
.TP 3 .TP 3
.B -I \fIIdentString ( capital letter i ) .B -I \fIIdentString ( capital letter i )
Specifies an ident string, which describes the source e.g. the Specifies an ident string, which describes the source e.g. the