#! /usr/bin/perl
#
# Events collector for Vargus
######################
#    Copyright (C) 2010-2012 Michael A. Kangin <mak@complife.ru>
#
#    Copyright: Vargus is under GNU GPL, the GNU General Public License.
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; version 2 dated June, 1991.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    http://www.gnu.org/licenses/gpl-2.0.html
#

use strict;
use warnings;

use Getopt::Long;
use IO::Socket;
use IO::Socket::INET;
use IO::Select;
use IO::Handle;
use Module::Load;
use Proc::Daemon;
use Privileges::Drop;
use Encode qw(encode decode is_utf8);
use DBI;
use Sys::Syslog qw(LOG_DAEMON LOG_ERR LOG_WARNING LOG_NOTICE LOG_INFO LOG_DEBUG);
use Time::HiRes qw(sleep gettimeofday);
use File::Temp qw(tempfile);

use Vargus::Common;
use Vargus::Objects;

use utf8;
binmode(STDOUT, ':utf8');
binmode(STDIN, ':utf8');


our $conf_dir = "/etc/vargus";
our $obj_name;
our $obj_id;
our %vargus_objects;
our %now_events_by_file;
our %preprocessor_state;
our %preprocessor_handlers;
our %db_handlers;
our %logger_handlers;
our $max_lines = 10;
our %fd;
our $informer_port = 9165;

my $stat_dir = "/var/cache/vargus";
my $config_file = $conf_dir . "/events.cfg";

my $vargus_user = 'vargus';
my @events_listeners = ("0.0.0.0:7713/udp", "0.0.0.0:7713/tcp");
my $alerts_port = 7714;
my $bind_address = '0.0.0.0';
my $events_dir = $stat_dir;
my $daemon_pid_file = '/var/run/vargus/_events.pid';
my $daemon_mode = 1;
my %known_agents;
my %sql_access;
my ($dbh, $sth);
my $ext_modules;
my $cam_object;
my $events_type;
my $sock;
my @buffer;
my $proc_title = 'vargus-events';
my $flusher;
my $vargus_server = 'localhost';

my $send2all_sock;
my $send2all_host = 'localhost';
my $send2all_collect_port = "7716/udp";
my $send2all_distribute_port = 7715;
my $send2all_collect = 1;
my $send2all_distribute = 1;




sub close_all_fd {
	foreach my $event_file (keys %fd) {
		close($fd{$event_file}) if $fd{$event_file}->opened();
	}
}

sub prepare_to_die {
	$SIG{INT} = $SIG{TERM} = $SIG{PIPE} = $SIG{CHLD} = $SIG{ALRM} = 'IGNORE';
	$sth->finish if $sth;
	$dbh->disconnect if $dbh;
	close($send2all_sock) if $send2all_sock && $send2all_sock->opened();
	close_all_fd;
	if (-e $daemon_pid_file) {
		open(PIDFILE, $daemon_pid_file) or die;
		my ($test_pid) = <PIDFILE>;
		close(PIDFILE);

		if ($test_pid == $$) {
			kill("TERM", 0);
			unlink $daemon_pid_file;
		}
	}
	die;
}



sub configure {
	my @cfg_body = ();
	my @main_section = ();
	my $is_option;

	open(CFG, $config_file) or log_n_die("Error reading config file $config_file");
	chomp(@cfg_body = <CFG>);
	close(CFG);

	@main_section = expand_macroses(get_cfg_section("main", @cfg_body));

	$sql_access{host} = get_option("sql-host", @main_section);
	$sql_access{db} = get_option("sql-db", @main_section) 
		or log_n_die("No SQL database specified");
	$sql_access{user} = get_option("sql-user", @main_section) 
		or log_n_die("No SQL user specified");
	$sql_access{password} = get_option("sql-password", @main_section) 
		or log_n_die("No SQL password specified");

	$sql_access{dsn} = "DBI:mysql:$sql_access{db}";

	my $events_listeners = get_option("events-listeners", @main_section);
	if ($events_listeners) {
		@events_listeners = split(/[,\s]+/, $events_listeners) or log_n_die("Bad events-listeners format $events_listeners");
		foreach (@events_listeners) {
			s|/|:|g;
			my ($addr, $port, $proto) = split(/:/, $_, 3);
			log_n_die("Bad listener format ($addr, $port, $proto)  $_") unless $addr && $port && $proto;
			log_n_die("Bad port format $port in listener $_") unless $port =~ /^\d+$/ && $port < 65536;
			log_n_die("Bad address format $addr in listener $_") unless $addr =~ /^(\d{1,3}\.){3}\d{1,3}$/;
			log_n_die("Proto must be tcp or udp in listener $_") unless $proto eq 'tcp' || $proto eq 'udp';
		}
	}


	$is_option = get_option("bind-address", @main_section) and 
		$bind_address = $is_option;
	$is_option = get_option("max-lines", @main_section) and 
		$max_lines = $is_option;

	$ext_modules = get_option("extensions", @main_section);



	my @send2all_section = expand_macroses(get_cfg_section("distribute", @cfg_body));

	$is_option = get_option("collect", @send2all_section);
	$send2all_collect = $is_option if defined($is_option);

	$is_option = get_option("distribute", @send2all_section);
	$send2all_distribute = $is_option if defined($is_option);
 
	$is_option = get_option("collect-host", @send2all_section) and
		$send2all_host = $is_option;
 
	$is_option = get_option("collect-port", @send2all_section) and
		$send2all_collect_port = $is_option;

	$is_option = get_option("distribute-port", @send2all_section) and
		$send2all_distribute_port = $is_option;
}


sub keep_eventfile_opened {
	my $event_file = shift;

	unless (-e $event_file) {
		close($fd{$event_file}) if $fd{$event_file} && $fd{$event_file}->opened();
	}

	unless ($fd{$event_file} && $fd{$event_file}->opened()) {
		if (-e $event_file) {
			open($fd{$event_file}, $event_file) or log_n_die("Can't open event file $event_file");
			binmode($fd{$event_file}, ':utf8');
			my @now_events = readline($fd{$event_file});
			$now_events_by_file{$event_file} = [@now_events];
			close($fd{$event_file});
		}
		open($fd{$event_file}, ">> $event_file") or log_n_die("Can't open event file $event_file");
		binmode($fd{$event_file}, ':utf8');
		$fd{$event_file}->autoflush();
	}
}

sub send_to_all {
	my $camera = shift;
	my @now_events = @_;

	my $events = "$camera:" . join("", @now_events);
	my $events_length = sprintf "%08d", (length(Encode::encode_utf8($events)) + 9);

	print $send2all_sock $events_length . "\n" . $events;
}

sub get_port_proto {
	my $string_port = shift;
	my ($port, $proto) = split('/', $string_port);
	unless ($port =~ /\d+/) {
		log_n_die("Wrong port $port ($string_port)");
	}

	if ($proto) {
		unless (grep(/$proto/, ('tcp', 'udp'))) {
			log_n_die("Wrong proto $proto for port $string_port (supported: tcp, udp)");
		}
	} else {
		$proto = 'udp';
	}

	return ($port, $proto);
}


sub distribute_events {
	my %all_events;
	my @events_buf;
	my %known_handles;

	my $c_port;
	my $c_proto;

	sub handle_events {
		my $events = shift;
		my $all_events = shift;
		my $events_buf = shift;

		push(@{$events_buf}, split(/\n/, $events));

		while (@{$events_buf} and not ${$events_buf}[0] =~ /^\d{8}$/) {
			shift @{$events_buf};
		}

		while (@{$events_buf}) {
			my @events_by_cam = ();
			my $cam;

			while (@{$events_buf}) {
				push(@events_by_cam, shift(@{$events_buf}));
				last if @{$events_buf} and ${$events_buf}[0] =~ /^\d{8}$/;
			}
			
			my $length = $events_by_cam[0];
			($cam = $events_by_cam[1]) =~ s/:.*// if $events_by_cam[1];
			my $events = join("\n", @events_by_cam) . "\n";
			my $real_length = length($events);

			if ($length == $real_length) {
				${$all_events}{$cam}{events} = $events;
				${$all_events}{$cam}{is_new} = 1;
			} else {
				unless (@{$events_buf}) {
					push(@{$events_buf}, @events_by_cam);
					last;
				} else {
					s_log(LOG_NOTICE, "Possibly malformed event buffer, some events for $cam lost.");
				}
			}
		}
	}

	sub write_handle {
		my $handle = shift;
		my $msg = shift;
		my $d_sel_ref = shift;
		my $known_handles_ref = shift;

		my $writed = syswrite($handle, $msg, length($msg));
		unless ($writed and $writed == length($msg)) {
			s_log(LOG_ERR, "Unable to send data to " . $handle->peerhost);
			s_log(LOG_DEBUG, "Close distribution connection from " . $handle->peerhost);
			${$d_sel_ref}->remove($handle);
			delete(${$known_handles_ref}{$handle}) if exists ${$known_handles_ref}{$handle};
			close($handle);
		}
	}

	($c_port, $c_proto) = get_port_proto($send2all_collect_port);

	my $pid;
        unless (defined ($pid = fork())) {
                s_log(LOG_WARNING, "Can't fork a new child for events distribute: $!");
        }

        if ($pid) {
                return;
        }

        $0 = $proc_title . ": events distributor";

	$SIG{INT} = $SIG{TERM} = sub {
		prepare_to_die;
	};

	$SIG{PIPE} = 'IGNORE';

	my $d_sock = IO::Socket::INET->new(
		LocalAddr => '0.0.0.0', 
		LocalPort => $send2all_distribute_port,
		Listen => 5,
		Proto => 'tcp',
		ReuseAddr => 1
	) or log_n_die "Can't open events distribute socket ($!)";

	my %c_sock_params = (LocalAddr => '0.0.0.0', LocalPort => $c_port, Proto => $c_proto);
	if ($c_proto eq 'tcp') {
		$c_sock_params{Listen} = 5;
		$c_sock_params{ReuseAddr} = 1;
	} else {
		$c_sock_params{Blocking} = 0;
	}

	my $c_sock = IO::Socket::INET->new(%c_sock_params) or log_n_die "Can't open events collect socket ($!)";

	my $d_sel = IO::Select->new($d_sock) or log_n_die "Can't open (select) events distribute socket ($!)";
	my $c_sel = IO::Select->new($c_sock) or log_n_die "Can't open (select) events collect socket ($!)";

	while (1) {
		if ($c_proto eq 'tcp') {
			for my $handle ($c_sel->can_read(0.01)) {
				if ($handle eq $c_sock) {
					my $connection = $c_sock->accept();
					$c_sel->add($connection);
					s_log(LOG_DEBUG, "Get collect connection from " . $connection->peerhost);
				} else {
					my $events;
					sysread $handle, $events, 16384; 

					if ($events) {
						handle_events($events, \%all_events, \@events_buf);
					} else {
						s_log(LOG_DEBUG, "Close collect connection from " . $handle->peerhost);
						$c_sel->remove($handle);
						close($handle);
					}
				}
			}
		} else {
			my $events;
			sleep(0.01);
			$c_sock->recv($events, 16384);
			s_log(LOG_DEBUG, "Get collect data from " . $c_sock->peerhost) if $c_sock->peerhost;
			handle_events($events, \%all_events, \@events_buf) if $events;
		}

		for my $handle ($d_sel->can_read(0.01)) {
			if ($handle eq $d_sock) {
				my $connection = $d_sock->accept();
				$d_sel->add($connection);
				s_log(LOG_DEBUG, "Get distribution connection from " . $connection->peerhost);
			} else {
				my $fake_read;
				sysread $handle, $fake_read, 16384;
				unless ($fake_read) {
					s_log(LOG_DEBUG, "Close distribution connection from " . $handle->peerhost);
					$d_sel->remove($handle);
					delete($known_handles{$handle}) if exists $known_handles{$handle};
					close($handle);
				}
			}
		}


		for my $handle ($d_sel->can_write(0.01)) {
			if ($known_handles{$handle}) {
				foreach my $cam (keys %all_events) {
					write_handle($handle, $all_events{$cam}{events}, \$d_sel, \%known_handles) 
						if $all_events{$cam}{is_new};
				}
			} else {
				$known_handles{$handle} = 1;
				foreach my $cam (keys %all_events) {
					write_handle($handle, $all_events{$cam}{events}, \$d_sel, \%known_handles);
				}
			}
		}

		
		foreach my $cam (keys %all_events) {
			$all_events{$cam}{is_new} = 0;
		}
	}
}


sub common_preprocessor {
}

sub common_db_handler {
}


sub common_event_logger {
	my $event_file = shift;
	my $event_structure = shift;
	my $time = ${$event_structure}{now};
	my $event = ${$event_structure}{event};
	my $camera = ${$event_structure}{camera};

	keep_eventfile_opened($event_file);

	my @now_events = ();
	if ($now_events_by_file{$event_file}) {
		@now_events = @{$now_events_by_file{$event_file}};
	}

	my $timelabel = sprintf("%02d:%02d:%02d", 
		sub {($_[2], $_[1], $_[0])}->(localtime($time)));

	if ($#now_events + 1 >= $max_lines) {
		shift(@now_events) while ($#now_events + 1 >= $max_lines);
		truncate($fd{$event_file}, 0);
		seek($fd{$event_file}, 0, 0);
		foreach(@now_events) {
			print({$fd{$event_file}} $_);
		}
	}

	push(@now_events, "$timelabel $event\n");
	$now_events_by_file{$event_file} = [@now_events];
	print({$fd{$event_file}} "$timelabel $event\n");
}

sub write_events_db {
	my $last_id;

	return unless $dbh;

	foreach (@buffer) {
		my $now = ${$_}{now};
		my $nowms = ${$_}{nowms};
		my $type = ${$_}{type};
		my $camera = ${$_}{camera};
		my $event = $dbh->quote(${$_}{event});


		my $sql_string = "INSERT INTO events (timestamp, time_ms, camera, event) 
			values (FROM_UNIXTIME($now), $nowms, '$camera', $event);";

		if ($dbh->do($sql_string)) {
			if ($last_id = $dbh->last_insert_id(undef, undef, 'events', 'id')) {
				$db_handlers{$type}->($dbh, $last_id, $_);
			} else {
				s_log(LOG_WARNING, "Error getting last event ID, db_handler $type will not be called " . $dbh->errstr);
			}
		} else {
			s_log(LOG_WARNING, "Error insert event into DB" . $dbh->errstr);
		}
	}
}



sub write_alerts_db {
	return unless $dbh;
	foreach (@buffer) {
		my $id = ${$_}{id};
		my $camera = $dbh->quote(${$_}{camera});
		my $start = ${$_}{start};
		my $end = ${$_}{end};
		my $message = $dbh->quote(${$_}{message});
		
		my $query = 'insert into alerts (id,camera,start_time,end_time,message) values ';
		$query .= "('$id', $camera, FROM_UNIXTIME($start), FROM_UNIXTIME($end), $message)";
		s_log(LOG_DEBUG, "Query: $query");
		$dbh->do($query) or s_log(LOG_WARNING, "Error insert alert into DB" . $dbh->errstr);
	}
}


sub write_alerts_to_file {
	my ($alerts_sav, $alerts_sav_name) = tempfile("$stat_dir/alerts.d/alerts.XXXXXX");
	binmode($alerts_sav, ':utf8');
	foreach (@buffer) {
		my $id = ${$_}{id};
		my $camera = ${$_}{camera};
		my $start = ${$_}{start};
		my $end = ${$_}{end};
		my $message = ${$_}{message};
		print $alerts_sav "$id;$camera;$start;$end;$message\n";
	}
	close($alerts_sav);
	s_log(LOG_WARNING, "Unsaved alerts flush to temporary file $alerts_sav_name");
}

sub write_events_to_file {
	my ($events_sav, $events_sav_name) = tempfile("$stat_dir/events.d/events.XXXXXX");
	binmode($events_sav, ':utf8');
	foreach (@buffer) {
		my $now = ${$_}{now};
		my $nowms = ${$_}{nowms};
		my $event = ${$_}{event};
		my $service_info = ${$_}{service_info};
		my $camera = ${$_}{camera};
		my $type = ${$_}{type};
		print $events_sav "$now;$nowms;$camera;$type;$service_info;$event\n";
	}
	close($events_sav);
	s_log(LOG_WARNING, "Unsaved events flush to temporary file $events_sav_name");
}

sub flush_to_db {
	my ($what, $db_handler, $file_handler) = @_;
	s_log(LOG_DEBUG, "Flush $what to DB " . ($cam_object ? "for $cam_object, " : "") . "buffer size: " . $#buffer);
	my $pid;
	$SIG{ALRM} = 'IGNORE';
	unless (defined ($pid = fork())) {
		s_log(LOG_WARNING, "Can't fork a new child for flush $what to DB: $!");
		next;
	}

	if ($pid) {
		@buffer = ();
		return;
	}

	$0 = $proc_title . ": DB $what flusher";

	my $retry_count = 3;
	$SIG{INT} = $SIG{TERM} = $SIG{PIPE} = sub {
		my $signal = shift;
		log_n_die("Child DB flusher ($what $cam_object) closed due SIG$signal received");
	};
	
	if ($sock) {
		$sock->close or s_log(LOG_WARNING, "Error close socket from DB child");
	}
	close_all_fd;

	until ($dbh = DBI->connect($sql_access{dsn}, $sql_access{user}, $sql_access{password})) {
		$cam_object = "" unless $cam_object;
		my $action = --$retry_count > 0 ? "still try... ($cam_object / $retry_count)" : "given up. ($cam_object)";
		my $pid;
		s_log(LOG_WARNING, "Can't connect to mysql base, $action");
		sleep(10) if $retry_count > 0;

		unless ($retry_count > 0) {
			if ($stat_dir) {
				-d "$stat_dir/$what.d" or mkdir "$stat_dir/$what.d";
				$file_handler->();
			}
			log_n_die("Some $what may lost due DB unaccessibility");
		}
	}
	
	$db_handler->();

	$dbh->disconnect;
	exit(0);
}


sub net_daemon {

	die "Pid file already exist" if -e $daemon_pid_file;

	Proc::Daemon::Init;

	open(PIDFILE, "> $daemon_pid_file") or s_log(LOG_WARNING, "Can't create pid file $daemon_pid_file");
	print PIDFILE $$;
	close(PIDFILE);
}


sub sanity_check {
	my $save_alrm_handler = $SIG{ALRM};
	my @events_raw;
	my @alerts_raw;
	my %event_structure;
	my %alert_structure;
	my $type;
	my $pid;

	$SIG{ALRM} = 'IGNORE';

	unless (defined ($pid = fork())) {
		s_log(LOG_WARNING, "Can't fork a new child for sanity check: $!");
		return;
	}

	if ($pid) {
		waitpid($pid, 0);
		$SIG{ALRM} = $save_alrm_handler;
		return;
	}

	sub grace_exit {
		$sth->finish if $sth;
		$dbh->disconnect if $dbh;
		exit;
	}
	
	s_log(LOG_DEBUG, "Sanity check...");
	$0 = $proc_title . ": Sanity checker";

	my @sav_events = glob("$stat_dir/events.d/*");
	my @sav_alerts = glob("$stat_dir/alerts.d/*");
	grace_exit unless @sav_events || @sav_alerts;

	unless ($dbh = DBI->connect($sql_access{dsn}, $sql_access{user}, $sql_access{password})) {
		s_log(LOG_WARNING, "Still have problems to connect to DB");
		grace_exit;
	}

	foreach my $event_file (@sav_events) {
		@buffer = ();
		open SAV_EVENTS, $event_file or do {
			s_log(LOG_WARNING, "Error opening save events file $event_file");
			next;
		};
		binmode(SAV_EVENTS, ':utf8');
		@events_raw = ();
		chomp(@events_raw = <SAV_EVENTS>);
		close SAV_EVENTS;

		foreach (@events_raw) {
			next unless $_;
			my ($now, $nowms, $camera, $type, $service_info, $event) = split(';', $_, 6);
			$event_structure{now} = $now;
			$event_structure{nowms} = $nowms;
			$event_structure{camera} = $camera;
			$event_structure{service_info} = $service_info;
			$event_structure{type} = $type;
			$event_structure{event} = $event;
			
			push(@buffer, {%event_structure});

			unless ($db_handlers{$type}) {
				s_log(LOG_WARNING, "Can't find DB handler for type $type");
				next;
			}
		}
		
		write_events_db;

		unlink $event_file;
	}
	

	foreach my $alert_file (@sav_alerts) {
		@buffer = ();
		open SAV_ALERTS, $alert_file or do {
			s_log(LOG_WARNING, "Error opening save alerts file $alert_file");
			next;
		};
		binmode(SAV_ALERTS, ':utf8');
		@alerts_raw = ();
		chomp(@alerts_raw = <SAV_ALERTS>);
		close SAV_ALERTS;


		my $alerts_index = 0;
		foreach (@alerts_raw) {
			next unless $_;
			my ($id, $camera, $start, $end, $message) = split(';', $_, 5);
			if ($id =~ /\D/ || $start =~ /\D/ || $end =~ /\D/) {
				s_log(LOG_WARNING, "Wrong record $_, skip");
				next;
			}
			unless ($camera && $start && $end) {
				s_log(LOG_WARNING, "Absence mandatory field in record $_, skip");
				next;
			}
			$alert_structure{id} = $id;
			$alert_structure{camera} = $camera;
			$alert_structure{start} = $start;
			$alert_structure{end} = $end;
			$alert_structure{message} = $message;
			push(@buffer, {%alert_structure});
		}

		write_alerts_db;
		unlink $alert_file;
	}
	
	

	@sav_events = glob("$stat_dir/events.d/*");
	unless (@sav_events) {
		rmdir "$stat_dir/events.d" if -d "$stat_dir/events.d";
	}

	@sav_alerts = glob("$stat_dir/alerts.d/*");
	unless (@sav_alerts) {
		rmdir "$stat_dir/alerts.d" if -d "$stat_dir/alerts.d";
	}

	grace_exit;
}


sub connect_to_sendall {
	close($send2all_sock) if $send2all_sock && $send2all_sock->opened();
	
	my $iaddr = inet_aton($send2all_host);
	my ($c_port, $c_proto) = get_port_proto($send2all_collect_port);
	my $paddr = sockaddr_in($c_port, $iaddr);
	my $c_type = $c_proto eq 'tcp' ? SOCK_STREAM : SOCK_DGRAM;

	socket($send2all_sock, PF_INET, $c_type, getprotobyname($c_proto));
	$send2all_sock->autoflush(1);

	for (1..10) {
		last if connect($send2all_sock, $paddr);
		sleep (0.02);
	}

	binmode($send2all_sock, ':utf8');
}



sub events_udp_listener {
	my ($bind_address, $port) = @_;
	my $what = 'events UDP';
	defined (my $pid = fork()) or log_n_die "Can't fork new child for $what listener: $!";
	return if $pid;

	$SIG{ALRM} = 'IGNORE';
	$SIG{INT} = $SIG{TERM} = sub {
		my $signal = shift;
		alarm 0;
		$flusher->() if $flusher;
		wait;
		log_n_die("$what processor child closed due SIG$signal received");
	};

	$SIG{PIPE} = sub {
		connect_to_sendall;
	};



	$0 = $proc_title . ": $port/udp $what listener and processor";

	my $lsocket = IO::Socket::INET -> new(
		LocalPort => $port,
		Proto => 'udp',
		Bind => $bind_address,
	) or log_n_die "Can't create socket for $what listener";

	@buffer = ();
	$flusher = sub { flush_to_db("events", \&write_events_db, \&write_events_to_file); };

	while (1) {
		my $recv_data;
		my %agent_structure = ();

		$lsocket->recv($recv_data, 1024);
		next unless $recv_data;
		
		%agent_structure = eval {
			recognize_event_agent($lsocket->peerhost());
		};
		unless (%agent_structure) {
			s_log(LOG_WARNING, "Can't recognize agent: $@");
			next;
		}

		$cam_object = $agent_structure{camera};
		event_line_processor(\%agent_structure, $recv_data);

	}
	
	# How I could find yourself here??! 8( )
	alarm 0;
	$flusher->();
	wait;
	$lsocket->close();
	close_all_fd;
	s_log(LOG_DEBUG, "$what listener: closed");
	exit 0;
}


sub bind_to_tcp_port {
	my ($what, $bind_address, $port, $handler) = @_;
	defined (my $pid = fork()) or log_n_die "Can't fork new child for $what listener: $!";
	return if $pid;
	$SIG{ALRM} = 'IGNORE';

	$0 = $proc_title . ": $port/tcp $what listener";
	my $lsocket = IO::Socket::INET -> new(
		LocalPort => $port,
		Listen => 5,
		Proto => 'tcp',
		Bind => $bind_address,
		ReuseAddr => 1,
		Timeout => 5,
	) or log_n_die "Can't create socket for $what listener";



	while (1) {
		next unless $sock = $lsocket->accept;
		defined (my $pid = fork()) or log_n_die "Can't fork new child for $what processor: $!";
		unless ($pid) {
			$SIG{INT} = $SIG{TERM} = sub {
				my $signal = shift;
				alarm 0;
				$flusher->();
				wait;
				log_n_die("$what processor child closed due SIG$signal received");
			};

			$SIG{PIPE} = sub {
				connect_to_sendall;
			};

			$SIG{ALRM} = 'IGNORE';

			$0 = $proc_title . ": TCP $what processor for " . $sock->peerhost;

			$lsocket->close;
			$handler->();
			s_log(LOG_DEBUG, "$what listener: Connection from " . $sock->peerhost . " lost.");
			alarm 0;
			$flusher->();
			wait;
			s_log(LOG_DEBUG, "$what listener: closed");
			exit 0;
		}
		$sock->close;
	}
}

sub alerts_processor {
	my %alert_structure;
	my $when_alarm;

	s_log(LOG_DEBUG, "Alerts agent connected from: " . $sock->peerhost);
	$0 = $proc_title . ": alerts processor for";

	$flusher = sub { flush_to_db("alerts", \&write_alerts_db, \&write_alerts_to_file); };

	while (<$sock>) {
		chomp;
		s/\r//;
		s/\0//;
		s/\s+$//;
		utf8::decode($_);
		next unless $_;
		s_log(LOG_DEBUG, "Get alert line: $_") unless $daemon_mode;
		unless (/(.*?;){2}/) {
			s_log(LOG_WARNING, "Bad format for alert line $_, skip");
			next;
		}
		%alert_structure = ();
		my ($now) = gettimeofday();
		$when_alarm = 0 unless @buffer;
		my ($id, $camera, $message) = split(';', $_, 3);

		unless ($camera) {
			s_log(LOG_WARNING, "Empty camera field, drop this record");
			next;
		}
		unless ($known_agents{$camera}{enable}) {
			s_log(LOG_WARNING, "Camera $camera not found or hasn't alerts enabled");
			next;
		}
		if ($id =~ /\D/) {
			s_log(LOG_WARNING, "Wrong symbols in ID field, drop this record");
			next;
		}

		$alert_structure{id} = $id;
		$alert_structure{camera} = $camera;
		$alert_structure{message} = $message;
		$alert_structure{start} = $known_agents{$camera}{before} ? $now - $known_agents{$camera}{before} : $now;
		$alert_structure{end} = $known_agents{$camera}{after} ? $now + $known_agents{$camera}{after} : $now;
		push(@buffer, {%alert_structure});

		$SIG{ALRM} = $flusher;

		if ($alert_structure{end} == $now || $#buffer > 30) {
			alarm 0;
			kill(14, $$);
		} elsif ($alert_structure{end} < $when_alarm || ! $when_alarm) {
			$when_alarm = $alert_structure{end};
			alarm $known_agents{$camera}{after};
		}

	}
}

sub recognize_event_agent {
	my %agent_structure;
	my $type = "common";
	my $peerhost = shift;

	$peerhost or die("No peer address given");
	s_log(LOG_DEBUG, "Events agent connected from: $peerhost") unless $daemon_mode;

	my $camera = $known_agents{$peerhost}{name} or die("Connection from unknown agent IP: $peerhost");
	s_log(LOG_DEBUG, "Agent detected: $camera") unless $daemon_mode;

	$type = $known_agents{$peerhost}{type} if $known_agents{$peerhost}{type};
	$logger_handlers{$type} or die("Error: can't find a logger handler for agent type \"$type\"");
	$db_handlers{$type} or die("Error: can't find a DB handler for agent type \"$type\"");

	$agent_structure{event_file} = "$events_dir/${camera}_events.txt";
	$agent_structure{encoding} = $known_agents{$peerhost}{encoding} if $known_agents{$peerhost}{encoding};
	$agent_structure{type} = $type;
	$agent_structure{camera} = $camera;

	return %agent_structure;
}


sub event_line_processor {
	my %event_structure;
	my $agent_structure_ref = shift;
	my %agent_structure = %{$agent_structure_ref};
	my $_ = shift;

	chomp;
	s/\r//;
	s/\0//;
	s/\s+$//;

	if ($agent_structure{encoding}) {
		$_ = decode($agent_structure{encoding}, $_);
	} else {
		utf8::decode($_);
	}

	$_ or return; 
	s_log(LOG_DEBUG, "Get events line: $_") unless $daemon_mode;

	my ($now, $nowms) = gettimeofday();

	$event_structure{now} = $now;
	$event_structure{nowms} = $nowms;
	$event_structure{event} = $_;
	$event_structure{service_info} = "";
	$event_structure{camera} = $agent_structure{camera};
	$event_structure{type} = $agent_structure{type};

	$preprocessor_handlers{$agent_structure{type}}->(\%event_structure);

	s_log(LOG_DEBUG, "After preprocess: " . $event_structure{event}) 
		if $event_structure{event} ne $_ && ! $daemon_mode;
	
	return unless $event_structure{event};

	push(@buffer, {%event_structure});
	$logger_handlers{$agent_structure{type}}->($agent_structure{event_file}, \%event_structure);
	send_to_all($agent_structure{camera}, @{$now_events_by_file{$agent_structure{event_file}}}) if $send2all_collect;

	$SIG{ALRM} = $flusher;
	alarm 5;

	if ($#buffer > 30) {
		s_log(LOG_DEBUG, "flush due buffer length: " . $#buffer);
		alarm 0;
		kill(14, $$);
	}
}




sub tcp_events_processor {
	my %agent_structure;

	@buffer = ();

	%agent_structure = eval { 
		recognize_event_agent($sock->peerhost);
	} or log_n_die($@);

	$cam_object = $agent_structure{camera};
	$0 = $proc_title . ": events processor for $cam_object";

	$flusher = sub { flush_to_db("events", \&write_events_db, \&write_events_to_file); };

	while (<$sock>) {
		event_line_processor(\%agent_structure, $_);
	}
	close_all_fd;
}


sub start_udp_listener {
	my ($what, $addr, $port) = @_;

	defined (my $pid = fork()) or log_n_die "Can't fork new child for UDP $what listener: $!";
	return if $pid;
	$SIG{ALRM} = 'IGNORE';
	$SIG{INT} = $SIG{TERM} = $SIG{PIPE} = sub {
		exit;
	};

	my $udp_listener;
	$udp_listener = \&events_udp_listener if $what eq 'events';

	until ($udp_listener) {
		log_n_die("Unknown action $what for UDP listener");
	}

	$0 = $proc_title . ": UDP $what watchdog";

	while (1) {
		$udp_listener->($addr, $port);
		wait;
		s_log(LOG_WARNING, "$what UDP listener crash detected, will restart within 1 sec");
		sleep(1);
	}
}





#############################################################################
# Begin here
# 

my ($new_uid, $new_gid, @new_gids) = drop_privileges($vargus_user) or log_n_die("Could not drop privileges") if not $>;


GetOptions(
	'daemon!' => \$daemon_mode
);

net_daemon if $daemon_mode;

$SIG{INT} = $SIG{TERM} = $SIG{PIPE} = sub {
	my $signal = shift;
	$SIG{INT} = $SIG{TERM} = $SIG{PIPE} = 'IGNORE';
	if (-e $daemon_pid_file) {
		open(PIDFILE, $daemon_pid_file) or die;
		my ($test_pid) = <PIDFILE>;
		close(PIDFILE);
		unlink $daemon_pid_file if $test_pid == $$;
	}
	kill("TERM", 0);
	log_n_die("Exit due SIG$signal received");
};

$SIG{CHLD} = 'IGNORE';

configure;
init_objects;



my $camera_quantity = do_remote_query($vargus_server, 'query camera;quantity');
unless ($camera_quantity) {
	log_n_die("Can't get cameras information from $vargus_server");
}

my $cam_query = 'query camera;' . join(',', 1..$camera_quantity) . ';name,' .
		'events:agent,events:encoding,events:type,' . 
		'alerts:enable,alerts:encoding,alerts:before,alerts:after';

my @cams_info = split(/\n/, do_remote_query($vargus_server, $cam_query));


foreach (@cams_info) {
	my ($name, 
		$agent, $e_encoding, $type, 
		$a_enable, $a_encoding, $before, $after
	) = split(';');

	$known_agents{$agent}{name} = $name if $agent;
	$known_agents{$agent}{encoding} = $e_encoding if $e_encoding;
	$known_agents{$agent}{type} = $type if $type;
	$known_agents{$name}{enable} = 1 if $a_enable;
	$known_agents{$name}{encoding} = $a_encoding if $a_encoding;
	$known_agents{$name}{before} = $before if $before;
	$known_agents{$name}{after} = $after if $after;
	s_log(LOG_DEBUG, "cam $known_agents{$agent}{name} type $known_agents{$agent}{type}")  if $agent;
}

$preprocessor_handlers{"common"} = \&common_preprocessor;
$db_handlers{"common"} = \&common_db_handler;
$logger_handlers{"common"} = \&common_event_logger;

if ($ext_modules) {
	foreach my $module (split(',', $ext_modules)) {
		load "Vargus::Events::$module";
	} 
}

distribute_events if $send2all_distribute;
connect_to_sendall if $send2all_collect;

$SIG{ALRM} = sub { sanity_check; alarm 300; };
alarm 5;
$0 = $proc_title;

bind_to_tcp_port("alerts", $bind_address, $alerts_port, \&alerts_processor);

foreach (@events_listeners) {
	s|/|:|g;
	my ($addr, $port, $proto) = split(/:/);
	bind_to_tcp_port("events", $addr, $port, \&tcp_events_processor) if $proto eq 'tcp';
	start_udp_listener("events", $addr, $port) if $proto eq 'udp';
}


while (1) {
	sleep(10);
}


