#!/usr/bin/perl

#
#  This is part of Cleo batch system project.
#  (C) Sergey Zhumatiy (serg@parallel.ru) 1999-2006
#
#
# You can redistribute and/or modify this program
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# See the GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#

#  Cleo monitor (works on the node)
#
#  Usage: cleo-mon [path/to/config-file]
#
#  Default config - /etc/cleo-mon.conf
#
use lib '/usr/lib/cleo';

use strict;
use Exporter;
use Fcntl;
use IO::Handle;
use IO::Select;
use IO::Socket;
use IO::File;
#use IO::Pipe;
use POSIX;
use Sys::Syslog;

use Cleo::Conn;

use vars qw($VERSION @ISA @EXPORT);

BEGIN {
  # Program version, fixed at compile time.
  $VERSION = 5.11;              #(a)

  # Abort compilation if $@ already carries an error at this point.
  if ($@) {
    die $@;
  }
}

sub qlog($);

#############################################
#
#  Task info encapsulation
#
# Methods:
#   new
#   kill
#   free
#   check_pid
#   add_pid
#   del_pid
#   check_deads
#   is_head
#   set_head
#   get_pids
#   mark_dead
#   is_dead
#   set_attr
#   get_attr
#
#############################################
{
  package Task;

  # Bookkeeping for one task: the set of pids that belong to it, the pid
  # of its head process, a 'dead' flag and a bag of named attributes.
  #
  # Relies on main::is_deprecated_pid, main::get_setting and main::qlog.

  # constructor
  #
  # ret: new task with no pids, attributes user='nobody' and
  #      temp_dir='/tmp', not marked dead
  sub new {
    my $self ={};
    $self->{pids} = {};
    $self->{attr}->{user} = 'nobody';
    $self->{attr}->{temp_dir} = '/tmp';
    $self->{dead}=0;

    bless($self);
    return $self;
  }

  # check if pid belongs to this task
  #
  # arg - testing pid
  # ret: 1 if pid belongs to task/ 0 - if not
  sub check_pid {
    my $self=shift;
    return exists($self->{pids}->{$_[0]}) ? 1 : 0;
  }

  # adds pid to task
  #
  # arg: pid (ignored if it is <=1 or main::is_deprecated_pid says so)
  sub add_pid {
    my $self=shift;
    if ($_[0]>1 and !main::is_deprecated_pid($_[0])) {
      $self->{pids}->{$_[0]}=1;
    }
  }

  # kill all task processes with signal
  #
  # arg (opt): signal (def=TERM)
  # ret: result of the builtin kill (number of processes signalled)
  sub kill {
    my $self=shift;
    my $signal = (defined($_[0]) and $_[0] ne '') ? $_[0] : 'TERM';

    # The newline is appended after the joined pid list, not joined into it.
    main::qlog("Kill pids $signal:".join(';',keys(%{$self->{pids}}))."\n")
      if(main::get_setting('log_kills')!=0);

    # CORE:: makes it explicit that this is the builtin, not this method.
    CORE::kill($signal, keys(%{$self->{pids}}));
  }

  # delete pid from this task
  #
  # arg: pid
  sub del_pid {
    my $self=shift;
    if ($_[0]>1) {
      delete $self->{pids}->{$_[0]};
    }
  }

  # check if all task pids are alive
  #
  # arg: autodel (if nonzero, automatically delete dead pids)
  # ret: list of 'dead' pids
  sub check_deads {
    my $self=shift;
    my $ad=shift;

    my @ret;
    foreach my $i (keys(%{$self->{pids}})) {
      # kill 0 only probes the pid.  A failure with EPERM means the process
      # exists but may not be signalled by us, so it is not dead.
      next if(CORE::kill(0, $i) or $!{EPERM});
      push @ret, $i;
      delete $self->{pids}->{$i} if($ad);
    }
    return @ret;
  }

  # remove all information about pids
  #
  sub free{
    my $self=shift;
    $self->{pids} = {};
  }

  # check, if pid is task head pid
  #
  # arg: pid
  # ret: 1 if pid is head task pid, 0 if not (also when no head is set)
  sub is_head {
    my $self=shift;
    my $pid=shift;

    return 0 unless defined $self->{head};
    return ($pid==$self->{head})?1:0;
  }

  # set pid as task head pid (the pid is also added to the task pids)
  #
  # arg: pid (ignored if <1)
  sub set_head {
    my $self=shift;
    my $pid=shift;

    return if($pid<1);
    $self->{head}=$pid;
    $self->{pids}->{$pid}=1;
  }

  # return list of all pids (their count in scalar context)
  #
  sub get_pids{
    my $self=shift;
    return keys(%{$self->{pids}});
  }

  # mark task as dead
  #
  sub mark_dead{
    my $self=shift;
    $self->{dead}=1;
  }

  # check if task is dead
  #
  sub is_dead{
    my $self=shift;
    return $self->{dead};
  }

  # set attribute
  #
  # args: attribute name, value
  sub set_attr{
    my $self=shift;
    my $attr=shift;
    my $val=shift;
    $self->{attr}->{$attr}=$val;
  }

  # get attribute
  #
  # arg: attribute name
  # ret: stored value (undef if never set)
  sub get_attr{
    my $self=shift;
    my $attr=shift;
    return $self->{attr}->{$attr};
  }

  # list available attributes
  #
  sub list_attrs{
    my $self=shift;
    return keys(%{$self->{attr}});
  }
}


{
  package XMLNODE;

  # Small tree node: a name, a value, an ordered list of child nodes and
  # a cursor ({index}) used to walk the children one by one.

  # Constructor.
  #
  # arg (opt): node name; when it is a non-empty string the name is stored
  #            and the child cursor is set to the first child.
  sub new(;$){
    my $node = {};
    my $name = $_[0];
    if($name ne ''){
      $node->{name}  = $name;
      $node->{index} = 0;
    }
    $node->{nodes} = [];

    return bless($node);
  }

  # ret: node name
  sub get_name{
    my ($node) = @_;
    return $node->{name};
  }

  # arg: new node name
  sub set_name{
    my ($node, $name) = @_;
    $node->{name} = $name;
  }

  # ret: node value
  sub get_val{
    my ($node) = @_;
    return $node->{val};
  }

  # arg: new node value
  sub set_val($){
    my ($node, $value) = @_;
    $node->{val} = $value;
  }

  # Append a child.
  #
  # arg: child node
  sub add_node($){
    my ($node, $child) = @_;
    push @{$node->{nodes}}, $child;
  }

  # Move the child cursor back to the first child.
  sub to_first_node(){
    my ($node) = @_;
    $node->{index} = 0;
  }

  # ret: child under the cursor (the cursor then advances);
  #      undef when the cursor was never initialised or ran off the end
  sub next_node(){
    my ($node) = @_;
    return undef unless exists($node->{index});

    my $pos = $node->{index}++;
    return $node->{nodes}->[$pos];
  }

  # ret: number of children
  sub count_nodes(){
    my ($node) = @_;
    my $children = $node->{nodes};
    return scalar @{$children};
  }
}




use vars qw(@for_server $SRV $LST $srv_select $my_name
            $init_conn_time $init_conn_timeout $port $server $pipe_error
            %ran $all_pids %child_pids %server_recievers
            %useruid %groupid @reaped @reapcode
            $attach_parent_mask $attach_exe_mask $attach_user @attach_collected
            %delayed_requests $smart_port $global_rsh_command @attach_requests
            @pre_reaped1 @pre_reaped2 @pre_reapcode1 @pre_reapcode2
            $reaping $reaping2 $new_reaped $SH $shell_conn
            $rsh_from $rsh_hash $rsh_num $rsh_start_time @fake_rshells
            $reaper_child %error_codes @serv_buffer %kill_pids %kill_em
            $STATUS %delayed_kills $last_ran_check @delayed_attaches
            %global_settings %new_global_settings %def_global_settings
            %opt_types @deprec_pids @deprec_uids
            %tasks %delayed_task_kills
            $attach_in_progress $attach_tmout $attach_owner $attach_id);

use vars qw(%_d_flush_nolog %_d_rcv_nolog $debug_level); # debug purpose
use vars qw($_d_alarm_log $_debug_log_head $_debug_yahoo); # debug purpose

# When true, do_reap does not soft-kill the remaining pids of a task whose
# head process died as long as more than one pid is still registered.
use vars qw($DO_NOT_FORCE);
$DO_NOT_FORCE = 1;              # debug!!!

# Forward declarations, so these subs can be called before their
# definitions appear in the file.
sub attach_handler_second_stage( $ );
sub update_childs;
sub load_config( $;$ );
sub load_state;
sub get_setting( $ );
sub pack_value( $ );
sub unpack_value( $$;$ );
sub save_state();

# Probe O_LARGEFILE: if calling it dies, define a replacement that
# returns 0 so it can still be OR-ed into open flags.
eval { &O_LARGEFILE(); };
if ($@) {
  eval "sub O_LARGEFILE(){return 0;}";
}

# Debug helper: log one line per known task with its pids joined by ';'.
sub _print_tasks{
    foreach my $i (keys(%tasks)){
        # The newline is appended after the joined list; joining it in
        # left a stray ';' before the line end.
        qlog("__TASK=$i: ".join(';',$tasks{$i}->get_pids())."\n");
    }
}

# Debug helper: log call frames 1..4 (file, line, sub name) on one line.
# Frame 0, the call to print_stack itself, is skipped.
sub print_stack{
    my $report;
    foreach my $depth (1 .. 4) {
        my (undef, $file, $lineno, $sub_name) = caller($depth);
        $report .= "$depth [${file}:${lineno} ${sub_name}]; ";
    }
    qlog("STACK: $report\n");
}

# Inclusive ranges consulted by is_deprecated_pid: pids inside a
# @deprec_pids range, or pids whose recorded uid is inside a @deprec_uids
# range, are reported as deprecated.
@deprec_pids = ( 1, 8 );        # format: min1,max1, min2,max2, ... minN,maxN
@deprec_uids = ( 0,100 );       # same format as @deprec_pids

# Current debug level; usr2_processor cycles it through 0..3.
$debug_level=0;

# Built-in default values of the global settings.
%def_global_settings = (
                        path_prepend            => '',
                        path_append             => '',
                        smart_port              => 8855,
                        global_rsh_command      => '/usr/bin/ssh',
                        mon_save                => '/var/log/cleo-mon.save',
                        init_conn_timeout       => 5,
                        port                    => 5588,
                        hard_kill_after_head    => 15,
                        suexec_gid              => 65535,
                        debug_pc                => 0,
                        hard_kill_delay         => 60,
                        last_ran_check_interval => 3600,
                        log_kills               => 0,
                        pids_update_interval    => 15 );

# Per-option description: [ type, safe, cumulative, sections ].
%opt_types = (
              #                  type:safe:cumulative:sections
              # types: n-umeric, t-ext, h-ash, l-ist (via space), L-ist (via comma)
              # sections: ''=all, 'q'=queues, 'g'=global, 'u'=users, 'p'=profiles,
              #           'U'=clusterusers, 'l'=local_user_file
              # sections are ignored here, but must be specified :)

              path_prepend         => [ 't', 'y', '', 'g' ],
              path_append          => [ 't', 'y', '', 'g' ],
              smart_port           => [ 'n', 'y', '', 'g' ],
              global_rsh_command   => [ 't', 'y', '', 'g' ],
              mon_save             => [ 't', 'y', '', 'g' ],
              init_conn_timeout    => [ 'n', 'y', '', 'g' ],
              hard_kill_after_head => [ 'n', 'y', '', 'g' ],
              hard_kill_delay      => [ 'n', 'y', '', 'g' ],
              suexec_gid           => [ 'n', 'y', '', 'g' ],
              debug_pc             => [ 'n', 'y', '', 'g' ],
              last_ran_check_interval => [ 'n', 'y', '', 'g' ],
              log_kills            => [ 'n', 'y', '', 'g' ],
              pids_update_interval => [ 'n', 'y', '', 'g' ],
              port                 => [ 'n', 'y', '', 'g' ] );

# Message types whose outgoing traffic is NOT logged (see answer_to_server).
%_d_flush_nolog = (

                   #                 'init' =>1,
                                    'run'  =>1,
                                    'kill' =>1,
                   #                 'stat' =>1,
                   #                 'exit' =>1,
                                    'init_attach' =>1,
                                    'attach' =>1,
                   'ping' => 1 );
# Message types whose incoming traffic is NOT logged (see rcv_from_server
# and get_parsed_block).
%_d_rcv_nolog = (

#                 'init' =>1,
                 'run'  =>1,
                 'kill' =>1,
#                 'stat' =>1,
#                 'exit' =>1,
                 'init_attach' =>1,
                 'attach' =>1,
                 'ping' => 1
                );

# Debug switch, off by default.
$_d_alarm_log = 0;

# Build %error_codes: errno value => 1 for every errno in 1..127 that
# belongs to the list below.
#
# The errnos are identified by symbolic name through %! instead of by the
# text of "$!", because the message text depends on locale and platform.
# Names unknown on this platform simply never match.
%error_codes = ();
{
  my @fatal_errnos = qw(
    EBADF EFBIG EPIPE ENONET ECOMM EPROTO ENETDOWN ENETUNREACH
    ENETRESET ECONNABORTED ECONNRESET ESHUTDOWN ETIMEDOUT
  );
  for my $i ( 1 .. 127 ) {
    $! = $i;
    $error_codes{$i} = 1 if grep { $!{$_} } @fatal_errnos;
  }
}

# Write one message to syslog (ident "cleo-mon", facility daemon,
# priority info).
#
# arg: message text
sub do_syslog( $ ) {
  openlog( "cleo-mon", 'pid', 'daemon' );

  # The message goes in as an argument of a fixed '%s' format: the second
  # argument of syslog is a printf-style format, so a '%' inside the
  # message itself would be misinterpreted.
  syslog( 'info', '%s', $_[0] );
  closelog();
}

# Append one message to the log handle $STATUS, prefixed with the local
# time and $my_name, then flush the handle.
#
# arg: message text (the caller supplies the trailing newline)
sub qlog( $ ) {
  my $stamp = localtime( time() );

  printf {$STATUS} "\[%s\] %-8s: %s", $stamp, $my_name, $_[0];
  $STATUS->flush();

  #  do_syslog( $_[0] );
}

#
#
#  Reopen log files
#
#  Closes $STATUS and opens it again on the file named by the 'log'
#  setting, falling back to /var/log/cleo-mon.log when the setting is
#  empty, then to /tmp/cleo-mon.log and finally /dev/null when opening
#  fails.
#
sub usr1_processor() {
  my ( @placeholders, $log_file );
  eval { $STATUS->close(); };
  eval {

    # trick to avoid reopen STDIN/OUT/ERR:
    # three /dev/null handles are held open while the log is reopened
    for ( 1 .. 3 ) {
      my $io = IO::File->new();
      $io->open('/dev/null');
      push @placeholders, $io;
    }
  };

  $log_file = get_setting('log');

  # Logical default.  The former '|=' was a bitwise string OR, which
  # mangled any configured path together with the default one.
  $log_file ||= "/var/log/cleo-mon.log";

  $STATUS = IO::File->new();
  unless (
          $STATUS->open(
            $log_file, O_LARGEFILE | O_WRONLY | O_APPEND | O_CREAT )
         ) {
    do_syslog("Cannot open '$log_file' ($!). Try /tmp");
    unless (
            $STATUS->open(
                "/tmp/cleo-mon.log",
                O_LARGEFILE | O_WRONLY | O_APPEND | O_CREAT )
           ) {
      do_syslog("Cannot open /tmp/cleo-mon.log ($!). Try /dev/null");
      $STATUS->open( "/dev/null", O_WRONLY );
    }
  }
  $STATUS->autoflush(1);

  eval {
    $_->close() for @placeholders;
  };
}


#
#
#  Change log level
#
#  Each call raises $debug_level by one; after 3 it wraps back to 0.
#  The new level is written to the log.
#
sub usr2_processor() {
    $debug_level = 0 if ++$debug_level > 3;
    qlog("Debug level=$debug_level\n");
}

# Child reaper (it re-installs itself as the CHLD handler below).
#
# Collects every exited child that waitpid reports without blocking and
# stores pid and status in a staging queue: @pre_reaped1/@pre_reapcode1
# while $reaping2 is set, @pre_reaped2/@pre_reapcode2 otherwise.
# If $reaping is set, only $new_reaped is raised and the staged entries
# are left for do_reap; otherwise the second staging queue is moved to
# @reaped/@reapcode right away.
sub REAPER {
  eval {
    # Non-blocking: stop as soon as no further exited child is available.
    while ( ( $reaper_child = waitpid( -1, &WNOHANG ) ) > 0 ) {
      qlog "SIGCHLD: $reaper_child ($?)\n";
      if ($reaping2) {
        push @pre_reaped1,   $reaper_child;
        push @pre_reapcode1, $?;
      } else {
        push @pre_reaped2,   $reaper_child;
        push @pre_reapcode2, $?;
      }
    }
    qlog "Signal processing done\n";
    if ($reaping) {
      # The main queue is in use: just flag that new entries are waiting.
      qlog "Reaping while reaper.\n";
      $new_reaped = 1;
    } else {
      push @reaped,   @pre_reaped2;
      push @reapcode, @pre_reapcode2;
      @pre_reaped2   = ();
      @pre_reapcode2 = ();
    }
    qlog "1st reaper done\n";
    $SIG{CHLD} = \&REAPER;      # still loathe sysV
  };
  if ($@) {
    qlog "Reaper exeption: $@\n";
  }
}                               # REAPER

# Find task processes that died without a SIGCHLD reaching us.
#
# Every pid that a task reports as dead (the task drops it at the same
# time) is queued in @reaped with exit code 255.
sub ran_check() {

  for my $task ( values(%tasks) ) {
    for my $dead_pid ( $task->check_deads(1) ) {

      # guard the queue while the entry is added
      $reaping = 1;
      push @reaped,   $dead_pid;
      push @reapcode, 255;
      $reaping = 0;
    }
  }
}

#
#  Do actual reaping tasks
#
#  Works through the (pid, exit code) pairs queued in @reaped/@reapcode:
#   - the pid is removed from the task that owns it;
#   - when it was the task head, or fewer than two pids remain, the event
#     is logged and (unless $DO_NOT_FORCE applies) the task gets TERM, is
#     scheduled in %delayed_task_kills and is marked dead.
#  Afterwards the entries staged by REAPER while we were busy are merged
#  into the main queue.
#
sub do_reap {
  my ( $child, $tmp, $code, $count );

  eval {
    $reaping = 1;
    if ( @reaped > 0 ) {
      update_pids();
      update_childs();
    }

    # do not reap while attaching
    unless($delayed_requests{init_attach}->{blocked}){

    MAIN_REAPER:
      while ( $child = shift @reaped ) {
        $code = shift @reapcode;
        qlog("Reaping: pid=$child, code=$code\n");
        foreach $tmp ( keys(%tasks) ) {

          # is dead pid one of my tasks?
          next unless $tasks{$tmp}->check_pid($child);

          $tasks{$tmp}->del_pid($child);
          next MAIN_REAPER if $tasks{$tmp}->is_dead;

          # is it task head or last pid?
          # (get_pids in scalar context gives the number of pids left)
          $count = scalar($tasks{$tmp}->get_pids());
          if ($tasks{$tmp}->is_head($child) or $count<2 ) {

            qlog('Task head died ('.$tasks{$tmp}->get_attr('id').'/'.
              $tasks{$tmp}->get_attr('owner').'/'.$tasks{$tmp}->get_attr('user').' '.
                join(',', sort($tasks{$tmp}->get_pids())).")\n");

            # probably do not force kill other task pids
            next MAIN_REAPER if $DO_NOT_FORCE and $count>1;

            qlog("Soft kill $tmp\n");
            $tasks{$tmp}->kill('TERM');
            $delayed_task_kills{$tmp}=
              time()+$tasks{$tmp}->get_attr('hard_kill_after_head');
            $tasks{$tmp}->mark_dead;

          }
          next MAIN_REAPER;
        }
        # pid not belongs to any task
        qlog("PID $child not belongs to any task. Skip.\n");
      }
    } # skipping if attach is in progress

    if ($new_reaped) {
      qlog("New reaped\n");

      # first staging queue (filled while $reaping2 is set)
      push @reaped,   @pre_reaped1;
      push @reapcode, @pre_reapcode1;
      @pre_reaped1   = ();
      @pre_reapcode1 = ();

      # second staging queue: $reaping2 redirects REAPER to the first
      # queue while this one is merged AND emptied.  Emptying the first
      # queue here again (as before) left these entries to be queued
      # repeatedly and dropped pids staged in the meantime.
      $reaping2      = 1;
      push @reaped,   @pre_reaped2;
      push @reapcode, @pre_reapcode2;
      @pre_reaped2   = ();
      @pre_reapcode2 = ();
      $reaping2      = 0;
      $new_reaped    = 0;
    }
    $reaping = 0;
  };

  if ($@) {
    qlog("Reaper2 exeption: $@\n");
  }
}                               # do_reap

#
# Launch the program after waiting the time interval.
# Returns immediately: the waiting and the exec happen in a detached
# grandchild process.
#
# The grandchild creates /tmp/q-launch.<id> and checks it once a second;
# if the file disappears before the time is over, the launch is cancelled.
#
# Args:
#      - time to wait (seconds)
#      - command line
#      - uniq id
#      - [opt] uid   (when true, privileges are dropped to it before exec)
#      - [opt] gid
#
#
#######################################################################
sub launch( $$$;$$ ) {

  # Time_interval, command_line, uniq_id

  my ( $time, $prog, $id, $uid, $gid ) = @_;
  my ( $i,    $p );

  $p = fork();
  return unless defined $p;     # FAIL 8(
  return if ($p);               # successful launch, parent returns

  # Created process

  $0 = "CLEO LAUNCH";
  $SIG{PIPE} = sub { $pipe_error = 1; };

  # NOTE(review): 'Ignore' is not the special 'IGNORE' disposition.  It is
  # left as it was: a real SIG_IGN would be inherited by the exec'ed
  # program -- confirm what was intended.
  $SIG{$_} = 'Ignore'
    for qw(CHLD USR1 USR2 HUP ABRT TERM QUIT BUS SEGV FPE INT ILL);

  # try to create child process (daemonize)
  for ( $i = 0; $i < 10; ++$i ) {
    select( undef, undef, undef, 0.1 );
    $p = fork();
    last if defined $p;
  }
  exit 0 unless defined $p;
  exit 0 if ( $p != 0 );

  my $lock = "/tmp/q-launch.$id";
  unlink $lock;                 # delete possible symlink

  # create 'lock-file'; O_EXCL refuses a file or symlink that reappeared
  # after the unlink instead of following/truncating it
  sysopen( my $lock_fh, $lock, O_WRONLY | O_CREAT | O_EXCL ) or exit(1);
  close $lock_fh;

  for ( ; $time > 0; --$time ) {
    sleep 1;

    # exit, if launch is not necessary
    exit(0) unless -f $lock;
  }
  unlink $lock;                 # delete 'lock-file'
  qlog("LAUNCHING($p) '$prog'\n");
  $SRV->disconnect;
  eval { $STATUS->close(); };
  $ENV{PATH} = '/bin:/usr/bin:/usr/local/bin';

  if ($uid) {

    # Groups must be changed while we are still privileged: once the uid
    # is dropped the gid change is refused.  "$gid $gid" also replaces
    # the supplementary group list.
    if ( defined $gid and $gid ne '' ) {
      $) = "$gid $gid";
      $( = $gid;
    }
    $> = $uid;
    $< = $uid;
  }
  setpriority( 0, $$, 0 );
  exec($prog);                  # THE CULMINATION!

  exit 1;                       # exec failed
}                               # launch

#
#   Makes this program a daemon
#
#   Forks (up to 10 attempts); the parent exits with status 0, the child
#   starts a new session and returns to the caller.
#
# arg: none
# ret - none
#
#####################################################################
sub daemonize() {
  my $pid;

  # fork returns undef on failure and never a negative value
  for ( 1 .. 10 ) {
    $pid = fork();
    last if defined $pid;
  }
  unless ( defined $pid ) {
    qlog("Cannot daemonize! So die...\n");
    exit(1);
  }
  exit(0) if $pid > 0;

  # As before, a setsid failure does not stop the monitor (the old
  # failure branch sat behind an unconditional return), but it is no
  # longer silent.
  if ( POSIX::setsid() == -1 ) {
    qlog("setsid failed ($!), continue without new session\n");
  }
  return;
}                               # daemonize

# Tell whether a pid must not be touched.
#
# arg: pid
# ret: 1 if the pid lies in one of the @deprec_pids ranges, or if
#      $all_pids has an entry for it whose uid lies in one of the
#      @deprec_uids ranges; 0 otherwise
sub is_deprecated_pid( $ ) {

  # pid ranges, stored as (min, max) pairs
  for ( my $k = 0; $k <= $#deprec_pids; $k += 2 ) {
    my ( $low, $high ) = @deprec_pids[ $k, $k + 1 ];
    return 1 if ( $_[0] >= $low and $_[0] <= $high );
  }

  # uid ranges, applied to the owner recorded for the pid
  for ( my $k = 0; $k <= $#deprec_uids; $k += 2 ) {
    my ( $low, $high ) = @deprec_uids[ $k, $k + 1 ];
    next unless exists( $all_pids->{ $_[0] } );

    my $owner = $all_pids->{ $_[0] }->{uid};
    return 1 if ( $owner >= $low and $owner <= $high );
  }
  return 0;
}

#
#   Kill pid
#
#   A single pid gets TERM (unless it is deprecated) and is entered in
#   %delayed_kills with the time now + 'hard_kill_delay'.
#
#   With argument 'all' kills all controlled pids (used for shutdown):
#   every task is killed, entered in %delayed_task_kills with the time
#   now + its 'hard_kill_after_head' attribute, and marked dead.
#
#####################################################################
sub kill_pid( $ ) {
  my ($target) = @_;

  if ( $target ne 'all' ) {
    kill( 'TERM', $target ) unless ( is_deprecated_pid($target) );
    $delayed_kills{$target} = time() + get_setting('hard_kill_delay');
  }
  else {

    # kill all tasks
    for my $name ( keys(%tasks) ) {
      my $task = $tasks{$name};
      $task->kill;
      $delayed_task_kills{$name} =
        time() + $task->get_attr('hard_kill_after_head');
      $task->mark_dead;
    }
  }
}                               # kill_pid

#
#   Returns a pid of task by owner and id
#
# arg: owner  - owner queue
#      id     - id of task in owner queue
# ret - list of tasks pids
#     - () if no task found
#
#####################################################################
#sub get_pids( $$ ) {
#  my ( $owner, $id ) = @_;
#
#  qlog "get_pids: $owner,$id\n";
#  if(exists($tasks{"$owner:$id"})){
#    return $tasks{"$owner:$id"}->get_pids();
#  }
#  return ();
#}                               # get_pids

{
  #####################################################################
  #
  # Gets the block from channel (ends with 'end\n')
  #
  # arg: f - the Cleo::Conn (anything with read/unread methods)
  #      s - accepted for compatibility; not used by this implementation
  # ret - list of lines of the first complete block; the terminating
  #       line is included, newlines are removed
  #     - empty list if no complete block is available yet (the data
  #       read is pushed back with unread)
  #     - undef if the channel read fails (a one-element list in list
  #       context)
  #####################################################################
  sub get_block($;$ ) {
    my ( $h, $statmode ) = @_;

    my $tmp = $h->read;

    if ( !defined $tmp ) {
      #error
      qlog("Channel is dead.[$!]\n");
      return undef;
    }

    # Cut the first complete message off the front.  The decision is
    # based on the result of the substitution itself: after a failed
    # match $1 still holds a capture from some earlier match and must
    # not be consulted.
    if ( $tmp =~ s{^(.*?end\n)}{}s ) {
      my $message = $1;

      # another message follows. save it.
      $h->unread($tmp) if ( $tmp ne '' );
      return split( /\n/, $message );
    }

    # not full message yet.
    $h->unread($tmp) if ( $tmp ne '' );
    return ();
  }

};

#####################################################################
#
# Send an answer to the server over $SRV and flush the connection.
# Nothing is sent when $SRV is undefined or 'to' is empty.
#
# args: to
#       hash
#       type
#       id
#       [parameters] - like 'param1',$p1,'param2',$abc ...
#
# 'status' ('done') and 'success' (1) are added when the parameters do
# not define them.  All values are encoded with pack_value.
#
#####################################################################
sub answer_to_server($$$$;@ ) {
  my ( $to, $h, $type, $id, %params ) = @_;

  # are we connected?
  return unless defined $SRV;

  if ( $to eq '' ) {
    qlog("EMPTY TO!\n");
    print_stack();
    return;
  }

  # shown in the log only; "[1]" marks the implicit default
  my $shown = defined( $params{'success'} ) ? $params{'success'} : "[1]";
  qlog(">> to=$to, type=$type, id=$id, hash=$h, success=$shown\n")
    unless ( $_d_flush_nolog{$type} );

  # header, then the id
  $SRV->send("\*$my_name:$to:$h\n$type\n");
  my $packed = pack_value($id);
  $SRV->send("id:$packed\n");

  # default values...
  unless ( defined( $params{'status'} ) ) {
    $packed = pack_value('done');
    $SRV->send("status:$packed\n");
  }
  unless ( defined( $params{'success'} ) ) {
    $packed = pack_value(1);
    $SRV->send("success:$packed\n");
  }

  # explicit parameters
  foreach my $key ( keys(%params) ) {
    $packed = pack_value( $params{$key} );
    qlog("Packed ($key) as '$packed'\n") if get_setting('debug_pc');
    qlog("SENDING: $key: $packed\n") unless ( $_d_flush_nolog{$type} );
    $SRV->send("$key:$packed\n");
  }

  $SRV->send("end\n");
  $SRV->flush;
}                               # answer_to_server

#####################################################################
#
# Actually send all messages to server
#
# args: NONE
#
#####################################################################
#sub flush_to_server() {
#  my ( $to, $type, $hash, $i, $n, $cur, $k, $v, $e );

#  return unless defined $SRV;

#  for $cur (@for_server) {
#    ( $to, $type, $hash ) = ( $cur->{to}, $cur->{type}, $cur->{hash} );

#    qlog "_SENDING to master($to) $type/$cur->{hash}/$cur->{success}\n"
#      unless ( $_d_flush_nolog{$type} );

#    delete $cur->{to};
#    delete $cur->{type};
#    delete $cur->{hash};

#    $SRV->send("\*$my_name:$to:$hash\n$type\n");
#    while ( ( $k, $v ) = each( %{$cur} ) ) {
#      $e = pack_value($v);
#      qlog "Packed ($k) as '$e'\n" if get_setting('debug_pc');
#      qlog "SENDING: $k: $e\n" unless ( $_d_flush_nolog{$type} );
#      $SRV->send("$k:$e\n");
#    }

#    $SRV->send("end\n");
#  }
#  flush_server_channel();
#}                               # flush_to_server

#sub flush_server_channel() {
#    return unless $SRV;

#    $SRV->flush;
#}

#####################################################################
#
# Register a procedure for receiving messages of given type.
# Handlers of one type are kept in registration order.
#
# args: type
#       procedure - code reference; rcv_from_server calls it as
#                   $handler->( $type, $hash, $from, \%args )
#
#####################################################################
sub register_mon_rcv( $$ ) {
  push @{ $server_recievers{ $_[0] } }, $_[1];
}                               # register_mon_rcv

#####################################################################
#
# Unregister a procedure for receiving messages of given type.
# Only the first matching registration is removed; an unknown type or
# handler is silently ignored.
#
# args: type
#       procedure
#
#####################################################################
sub unregister_mon_rcv( $$ ) {
  my ( $type, $handler ) = @_;

  my $list = $server_recievers{$type};
  return unless ref($list) eq 'ARRAY';  # nothing registered for this type

  for my $i ( 0 .. $#{$list} ) {
    next unless defined $list->[$i] and $list->[$i] eq $handler;

    # remove exactly this element (a length of 0 would remove nothing)
    splice( @{$list}, $i, 1 );
    last;
  }
}                               # unregister_mon_rcv

#####################################################################
#
# Receive messages from server and dispatch them (call handlers)
#
# Reads parsed blocks from $SRV until none is left or an error is
# reported, unpacks every argument except 'success' with unpack_value
# and calls each code reference registered for the message type as
#   $handler->( $type, $hash, $from, \%args )
# The same %args hash is reused (and emptied) for every message.
#
# args: NONE
# ret:  NONE
#
#####################################################################
sub rcv_from_server() {
  my ( $from, $type, $tmp, $hash, $i, %args, $packed, $unpacked );

    return if (!defined $SRV or ($SRV->get_state ne 'ok'));
    for ( ;; ) {

        # Read the message block
        %args = ();
        $hash = get_parsed_block( $SRV, \%args );
        last if ( $hash eq '-' or $hash eq '' );

        ( $from, $type ) = ( $args{_from}, $args{_type} );

        # the special keys are not message arguments
        delete @args{qw(_from _to _hash _type)};

        foreach $tmp ( keys(%args) ) {
          next if ( $tmp eq 'success' );

          # keep the packed form: the debug line shows both forms
          $packed = $args{$tmp};
          undef $unpacked;
          unpack_value( \$unpacked, $packed );
          $args{$tmp} = $unpacked;
          qlog("Unpacking: '$tmp' ($packed) as '$unpacked,'\n")
            if get_setting('debug_pc');
        }
        next unless ( ref( $server_recievers{$type} ) eq 'ARRAY' );

        # index loop: the list length is re-read on every step
        for (
             $i = 0;
             $i < scalar( @{ $server_recievers{$type} } );
             ++$i
            ) {
          qlog("checking $type / $i for code...\n")
            unless ( $_d_rcv_nolog{$type} );
          if ( ref( $server_recievers{$type}[$i] ) eq 'CODE' ) {
            qlog("Yes! call it! ($type,$hash,$from)\n")
              unless ( $_d_rcv_nolog{$type} );
            $server_recievers{$type}[$i]
              ->( $type, $hash, $from, \%args );
          } else {
            qlog("No. Its "
              . ref( $server_recievers{$type}[$i] ) . "\n");
          }
        }
    }                         # messages reading loop
}                               # rcv_from_server

#####################################################################
#
# Gets the block from handle and fills a hash with its arguments.
#  The block is: a header line '*from:to:hash', a type line, then
#  'key:value' lines.  In the result hash the keys '_from', '_to',
#  '_hash' and '_type' are special.
#
# args: Cleo::Conn
#       return_hash_ref   (\%ret)
# ret:  ''   - no more blocks
#       '-'  - an error occured
#       other- the hash of this block
#
#####################################################################
sub get_parsed_block( $$ ) {
  my ( $handle, $out ) = @_;

  my @lines = get_block($handle);
  return '' if ( !@lines );

  # a block of a single line cannot hold header and type
  if ( @lines == 1 ) {
    qlog("Warning! [$#lines] Strange end of message from master... Skipping.\n");
    qlog( join( ";;", @lines ) . "\n" );
    return '-';
  }
  chomp @lines;

  # Check it...
  my $header = $lines[0];
  qlog("HEADER: '$header'\n") if ($_debug_log_head);
  my ( $from, $to, $hash ) =
    ( $header =~ /^\*([^:]+)\s*:([^:]+)\s*:(\S+)$/ );
  if ( !( $from && $to ) ) {
    qlog("Warning! Strange message. No from or to. ($header) Skipping.\n");
    qlog( join( ";;", @lines ) . "\n" );
    return '-';
  }

  # Get the type: drop the header, the type line comes next
  shift @lines;
  my $type = shift @lines;
  qlog("GOT: $type;\n");
  if ( !$_d_rcv_nolog{$type} ) {
    my $content = join( '#', @lines );
    $content =~ s/\0/^/g;
    qlog("CONTENT: $content;\n");
  }

  # the remaining lines are 'key:value' pairs; other lines are skipped
  for my $line (@lines) {
    my ( $key, $value ) = ( $line =~ /^([^:]+)\s*:\s*(.*?)\s*$/ )
      or next;
    if ( $key eq '' ) {
      qlog("Warning! Bad line: '$line'\n");
      next;
    }
    $out->{$key} = $value;
  }

  @{$out}{qw(_from _to _hash _type)} = ( $from, $to, $hash, $type );
  return $hash;
}                               # get_parsed_block

#
#  returns uid by username/uid
#
#  arg: user name or numeric uid
#  ret: the argument itself if it consists of digits, else the uid
#       recorded in %useruid, else 65534 (nonprivileged user)
#
sub get_uid( $ ){
  my $who = $_[0];

  return $who if ( $who =~ /^\d+$/ );

  # unknown names map to the nonprivileged user
  return exists( $useruid{$who} ) ? $useruid{$who} : 65534;
}

#
#  returns group id by group/gid
#
#  arg: group name or numeric gid
#  ret: the argument itself if it consists of digits, else the gid
#       recorded in %groupid, else 65534 (nonprivileged group)
#
sub get_gid( $ ){
  my $group = $_[0];

  return $group if ( $group =~ /^\d+$/ );

  # unknown names map to the nonprivileged group
  return exists( $groupid{$group} ) ? $groupid{$group} : 65534;
}


#####################################################################
#
# Updates %useruid and %groupid.
#
# Both tables are emptied and filled again from the passwd and group
# databases (name => numeric id).
#
# args: NONE
# ret:  NONE
#
#####################################################################
sub get_users() {
  %useruid = ();
  %groupid = ();

  # passwd entries: field 0 is the name, field 2 the uid
  while ( my @pw_entry = getpwent() ) {
    $useruid{ $pw_entry[0] } = $pw_entry[2];
  }
  endpwent();

  # group entries: field 0 is the name, field 2 the gid
  while ( my @gr_entry = getgrent() ) {
    $groupid{ $gr_entry[0] } = $gr_entry[2];
  }
  endgrent();
}                               # get_users

# #####################################################################
# #
# # Returns a list of children pids of given pid
# #
# # args: pid
# # ret:  \@pid_list
# #
# #####################################################################
# sub get_children_pids( $ ) {
#   my $pid = $_[0];
#   my @ret = ();

#   my ( $i, $process );

#   while ( ( $i, $process ) = each(%all_pids) ) {
#     push @ret, $i if ( $process->{ppid} == $pid );
#   }
#   return @ret;
# }                               # get_children_pids

#####################################################################
#
# Processes the output from childs
#
# Currently a no-op: the whole implementation below is commented out.
# (It read stdout/stderr of every %ran entry byte by byte and logged
# whatever was read.)
#
# args: NONE
# ret:  NONE
#
#####################################################################
sub read_from_childs() {
#   my ( $i, $c, $str );

#   foreach $i ( values(%ran) ) {
#     if ( defined( $i->{stdout} ) ) {
#       undef $str;
#       while ( sysread( $i->{stdout}, $c, 1 ) == 1 ) {
#         $str .= $c;
#       }
#       if ( $str ne '' ) {
#         qlog "GOT FROM CHILD '$str'\n";
#       }
#     }
#     if ( defined( $i->{stderr} ) ) {
#       undef $str;
#       while ( sysread( $i->{stderr}, $c, 1 ) == 1 ) {
#         $str .= $c;
#       }
#       if ( $str ne '' ) {
#         qlog "GOT ERR FROM CHILD '$str'\n";
#       }
#     }
#   }
}

# #####################################################################
# #
# #  Get all children of given pid
# #
# # args: pid
# # ret:  list of child pids
# #
# #####################################################################

# sub get_childs_pids( $ ) {
#   my $pid = $_[0];

#   my ( @all_childs, @cur_childs );

#   if ( exists( $child_pids{$pid} ) ) {
#     @cur_childs = @{ $child_pids{$pid} };
#   } else {
#     return ();
#   }
#   while ( scalar(@cur_childs) ) {
#     shift @cur_childs;
#     if ( exists( $child_pids{$_} ) ) {
#       push @cur_childs, $child_pids{$_};
#       push @all_childs, $child_pids{$_};
#     }
#   }
#   return @all_childs;
# }

#####################################################################
#
#  Gets pids by exe_mask, parent_mask and user
#
#  A pid is selected when its command line matches exe_mask, its uid is
#  the uid of 'user' and - if any process matched parent_mask - its
#  parent is one of those processes.  Both masks are used as regular
#  expressions; a failure inside a matching loop is swallowed.
#  An empty exe_mask selects nothing.
#
# args: parent_mask
#       exe_mask
#       user
# ret:  list of found pids
#
#####################################################################

sub collect_pids( $$$ ) {
  my ( $parent_mask, $exe_mask, $user ) = @_;
  my ( %is_parent, @found );

  update_pids();
  my $wanted_uid = get_uid($user);

  # pass 1: remember every process that looks like a parent
  if ( $parent_mask ne '' ) {
    eval{
      for my $pid ( keys(%{$all_pids}) ) {
        next unless ( $all_pids->{$pid}->{cmdline} =~ /$parent_mask/ );
        $is_parent{$pid} = 1;
      }
    };
  }

  # pass 2: pick the matching processes of the user
  if ( $exe_mask ne '' ) {
    eval{
      for my $pid ( keys(%{$all_pids}) ) {
        my $info = $all_pids->{$pid};
        next unless ( $info->{cmdline} =~ /$exe_mask/ );

        # the parent filter applies only if some parent was found
        next unless ( !%is_parent or exists( $is_parent{ $info->{ppid} } ) );
        next unless ( $info->{uid} == $wanted_uid );
        push @found, $pid;
      }
    };
  }
  return @found;
}

#####################################################################
#
# Adds newly appeared descendants of task processes to their tasks.
#
# Every pid in $all_pids that is not registered in any task, but whose
# parent chain leads to a registered task pid, is added to that task
# with add_pid.  If anything was added, the state is saved.
#
# args: none
#
# NOTE: call update_pids before!!!
#
#####################################################################

sub update_childs( ) {
  my ( $p, $i, $j, %new_childs, %all_my_pids,
       %check_pids, $flag, $adding );

  # becomes 1 as soon as a pid is added to a task
  $flag=0;
  eval {
    # no-op __DIE__ handler for the duration of this eval
    local $SIG{__DIE__}=sub {;};

    # remember all tasks pids
    %all_my_pids=();
    foreach $i (keys(%tasks)){
#      qlog "111 ($i)\n";
      foreach $p ($tasks{$i}->get_pids()){
#        qlog "222 ($i/$p)\n";
        # value = task name!
        $all_my_pids{$p}=$i;
      }
    }

    # find direct childs of our tasks pids;
    # everything else becomes a candidate for the next step
    foreach $p (keys(%{$all_pids})){
      next if(exists($all_my_pids{$p}));
      if(exists($all_my_pids{$all_pids->{$p}->{ppid}})){
        # remember it with task name!
        $new_childs{$p}=$all_my_pids{$all_pids->{$p}->{ppid}};
      }
      else{
        $check_pids{$p}=1;
      }
    }

    # find other childs: repeat until a full pass over the candidates
    # adds nothing, so descendants of any depth are found
    $adding = 1;
    while ($adding) {
      $adding = 0;
      foreach $p (keys(%check_pids)){
        if(exists($new_childs{$all_pids->{$p}->{ppid}})){
          # remember it with task name!
          $new_childs{$p}=$new_childs{$all_pids->{$p}->{ppid}};
          delete $check_pids{$p};
          $adding=1;
        }
      }
    }

    # now %new_childs contains full list of new tasks pids
    # every value eq task name...

    # add them to tasks!
    foreach $p (keys(%new_childs)){
      if(exists $tasks{$new_childs{$p}}){
        $tasks{$new_childs{$p}}->add_pid($p);
        qlog "Added: $p to $new_childs{$p}\n";
        $flag=1;
      }
      else{
        qlog "ERROR! No such task: $new_childs{$p} (for pid $p)\n";
      }
    }
  };
  if ($@) {
    qlog "Update childs: '$@' (p=$p; new_childs: ".join(',',keys(%new_childs)).")\n";
  }
  # persist the state only if some task changed
  save_state() if $flag;
}# ~update_childs

#####################################################################
#
#  Updates a table with parent-child dependenses and pids info
#
# args: none
#
#####################################################################

sub update_pids() {
  # Rebuilds $all_pids from /proc: for every numeric entry the fields
  # uid/euid, gid/egid, ppid and name are read from /proc/PID/status and
  # cmdline from /proc/PID/cmdline (NUL separators turned into spaces).
  # Pids that were in the previous table but are gone now are appended to
  # @reaped (with code 127 in @reapcode) unless already listed there.
  # Returns early, leaving $all_pids untouched, if /proc cannot be opened.
  my $new_pids = {};

  # Lexical handles and 3-arg open: no global PROC/P handles are clobbered.
  opendir( my $proc_dir, '/proc' ) or return;
  foreach my $p ( readdir($proc_dir) ) {
    next if ( $p !~ /^\d+$/ );

    # The process may vanish between readdir and open - just skip it.
    open( my $status, '<', "/proc/$p/status" ) or next;

    # A named line variable keeps the caller's $_ intact.  The patterns
    # are anchored so that a process *name* containing e.g. "Uid: 0 0"
    # cannot be mistaken for the corresponding field.
    while ( my $line = <$status> ) {
      if ( $line =~ /^Uid:\s+(\d+)\s+(\d+)/ ) {
        $new_pids->{$p}->{uid}  = $1;
        $new_pids->{$p}->{euid} = $2;
      }
      elsif ( $line =~ /^Gid:\s+(\d+)\s+(\d+)/ ) {
        $new_pids->{$p}->{gid}  = $1;
        $new_pids->{$p}->{egid} = $2;
      }
      elsif ( $line =~ /^PPid:\s+(\d+)/ ) {
        $new_pids->{$p}->{ppid} = $1;
      }
      elsif ( $line =~ /^Name:\s+(.*?)\s*$/ ) {
        # Keep the whole name: it may contain spaces, and
        # attach_handler_second_stage matches it against /^CLEO LAUNCH/.
        $new_pids->{$p}->{name} = $1;
      }
    }
    close $status;

    open( my $cmdline, '<', "/proc/$p/cmdline" ) or next;
    my @lines = <$cmdline>;
    close $cmdline;
    $new_pids->{$p}->{cmdline} = join( ' ', @lines );
    $new_pids->{$p}->{cmdline} =~ s/\0/ /g;
  }
  closedir($proc_dir);

  # Everything known before but missing now is reported as dead.
  foreach my $p ( keys( %{$all_pids} ) ) {
    next if ( exists $new_pids->{$p} );
    next if ( grep { $_ == $p } @reaped );
    push @reaped,   $p;
    push @reapcode, 127;
    qlog "DEAD $p ($all_pids->{$p}->{cmdline})\n";
  }
  $all_pids = $new_pids;
}

#####################################################################
#
#  Substitute pseudo-varibles by actual values
#  (actually - only $node)
#
# args: text
#       struct
#
#####################################################################

sub subst_task_prop( $$ ) {
  # Replaces every "$name" pseudo-variable inside the string referenced
  # by the first argument, in place.
  #
  # args: text  - reference to the string to modify
  #       child - received but not used in this sub

  # The lexical $_node is what "$node" in the text resolves to (see the
  # substitution below); it holds the current node name.
  my ($_node) = $my_name;
  my ( $text, $child ) = @_;

  # /ee evaluates the replacement twice: the first pass builds the string
  # '$_' . NAME (e.g. '$_node'), the second pass evaluates that string as
  # Perl code, i.e. reads the variable $_NAME visible from here.
  # NOTE(review): any other $_xxx variable in scope is reachable the same
  # way; names that resolve to nothing presumably become '' - confirm.
  $$text =~ s/\$([\w\d_]+)/'$_'.$1/gee;
}

#################################################################################
##
##
##           LOCAL ACTIONS
##
#################################################################################

{
  # Times of the last periodic runs; private to local_checks().
  my $last_pids_update;
  my $last_users_update;

  # One pass of all periodic housekeeping; called from the main loop.
  #
  #  - every 'pids_update_interval' seconds: refresh process tables and
  #    execute overdue hard kills (single pids and whole tasks)
  #  - every hour: refresh the users list
  #  - execute kill_pid requests whose wait time has elapsed
  #  - expire a timed out init_attach and start the next delayed one
  #  - poll pending attach requests and answer those whose time is up
  #  - every 'last_ran_check_interval' seconds: ran_check()
  #  - read from children, flush connections, reap, save state if needed
  sub local_checks() {
    my $t = time;
    my $i;

    my $save_needed=0;

    if ( $t > $last_pids_update + get_setting('pids_update_interval') ) {
      update_pids();
      update_childs();

      #
      # hard pid kills
      #
      for $i ( keys(%delayed_kills) ) {
        if ( $delayed_kills{$i} <= $t ) {
          qlog "Delayed hard kill pid $i\n";
          if ( kill(0, $i) and !is_deprecated_pid($i) ) {
            kill 'KILL', -$i;
            kill 'KILL', $i;
          }
          delete $delayed_kills{$i};
        }
      }

#       #
#       # soft kill pids
#       #
#       for $i ( keys(%kill_em) ) {
#         if ( $kill_em{$i} <= $t ) {
#           qlog "Delayed kill of $i\n";
#           kill_pid($i);
#           delete $kill_em{$i};
#         }
#       }
      #
      # hard kill tasks
      #
      foreach $i ( keys(%delayed_task_kills) ) {
        next if ( $delayed_task_kills{$i} > $t );

        # The task may have been dropped meanwhile (int_info_handler
        # deletes empty tasks); calling a method on the missing entry
        # would kill the whole monitor.
        unless ( defined $tasks{$i} ) {
          qlog "Delayed task kill: $i is already gone\n";
          delete $delayed_task_kills{$i};
          next;
        }

        qlog "Delayed task kill: $i\n";
        $tasks{$i}->kill('KILL');

        # NOTE(review): the attribute is read as 'tmpdir' here while the
        # rest of this file sets 'temp_dir' - confirm which is intended.
        unless (
                deldir($tasks{$i}->get_attr('tmpdir'),
                       $tasks{$i}->get_attr('user'))
               ) {
          qlog 'Cannot delete temp dir '.
            $tasks{$i}->get_attr('tmpdir').' by '.
              $tasks{$i}->get_attr('user')."\n";
        }
        answer_to_server(
                         'main',     0,
                         'finished', $tasks{$i}->get_attr('id'),
                         'code',     127,
                         'com_line', $tasks{$i}->get_attr('com_line'),
                         'owner',    $tasks{$i}->get_attr('owner'),
                         'is_rsh',   $tasks{$i}->get_attr('is_rsh'));#,
                         #'pid',      $i );
        delete $delayed_task_kills{$i};
        delete $tasks{$i};
        $save_needed=1;
      }

      $last_pids_update = $t;
    }

    #
    # update users list every hour
    #
    if ( $t > $last_users_update + 3600 ) {
      get_users();
      $last_users_update = $t;
    }

    #
    # HARD kill pids
    #
    # {time} is "request time + wait_secs" (see kill_pid_handler), so the
    # kill is due once that moment has been reached - same test as for
    # %delayed_kills above.
    for $i ( keys(%kill_pids) ) {
      if ( $kill_pids{$i}->{time} <= $t ) {
        kill_pid_action($i);
        delete $kill_pids{$i};
      }
    }

    #
    #  Check for delayed init attaches
    #
    # {timeout} is an absolute deadline set by init_attach_real_handler
    # and reset to 0 when the attach completes; it has expired only when
    # it is set and already in the past.
    my $init_attach = ( $delayed_requests{init_attach} ||= {} );
    if ( $init_attach->{timeout} and $init_attach->{timeout} < $t ) {

      # request timed out

      $init_attach->{blocked} = 0;
      $init_attach->{timeout} = 0;
      qlog
        "INIT_ATTACH timed out. No attach request was sent. Forget, try next.\n";
    }

    if ( @delayed_attaches
         and ( $delayed_requests{init_attach}->{blocked} == 0 ) ) {
      my $args = shift @delayed_attaches;
      qlog "Delayed init_attach ($args->[0])\n";
      init_attach_real_handler(@$args);
    }

    #
    # Do attach actions
    #
    if (@attach_requests) {
      # $i advances only when the current request stays in the list
      for ( $i = 0; $i < @attach_requests; ) {
        attach_handler_second_stage( $attach_requests[$i] );
        if ( $attach_requests[$i]->{tmout} < $t ) {
          answer_to_server(
                           $attach_requests[$i]->{from},
                           $attach_requests[$i]->{hash},
                           'attach',
                           $attach_requests[$i]->{id},
                           'success',
                           '1' );
          splice( @attach_requests, $i, 1 );
          $delayed_requests{init_attach}->{blocked} = 0;
          $delayed_requests{init_attach}->{timeout} = 0;
        } else {
          ++$i;
        }
      }
    }

    #
    #  Check childs, who didn't send sigchild
    #
    if ( $t > $last_ran_check + get_setting('last_ran_check_interval') ) {
      ran_check();
      $last_ran_check = $t;
    }

    #
    #  read, send, check deads
    #
    read_from_childs();
    #flush_server_channel();
    Cleo::Conn::allflush;
    do_reap();

    save_state() if $save_needed;
    #select( undef, undef, undef, 0.2 );
  }
}
;

##############################################################################
##############################################################################
##############################################################################
##
##
##           THE MAIN PROGRAMM
##
##############################################################################
##############################################################################
##############################################################################

# Daemon start-up and main loop.  Everything runs inside one eval so that
# a fatal error is logged by the code following the eval before exiting.
eval {
  my ($parent, $s,      @s,      $pid,       $i, $foreground,
      $f,      %answer, $config, $conf_load, $log_file );

  $SIG{CHLD}    = \&REAPER;
  $SIG{USR1}    = \&usr1_processor;
  $SIG{USR2}    = \&usr2_processor;
  $SIG{TERM}    = sub { qlog "GOT SIGTERM\n"; exit_handler(0,0,0,0);};
  $SIG{QUIT}    = sub { qlog "Error! QUIT...\n"; };
  $SIG{BUS}     = sub { qlog "Error! BUS...\n"; };
  $SIG{SEGV}    = sub { qlog "Error! SEVG...\n"; };
  $SIG{FPE}     = sub { qlog "Error! FPE...\n"; };
  $SIG{ILL}     = sub { qlog "Error! ILL...\n"; };
  $SIG{PIPE}    = sub { qlog "Error! PIPE...\n"; };
  $SIG{XCPU}    = sub { qlog "Error! XCPU...\n"; };
  $SIG{ALRM}    = sub { qlog "Error! Unhadled alarm\n"; die "alarm\n"; };
  $SIG{__DIE__} = sub { qlog "A!!!!!!!!!! I'm dying: '$_[0]'\n"; print_stack;};

  $foreground=0;

  # Command line: "-f" keeps the monitor in foreground, any argument
  # that does not start with "-" is the config file path.
  $config = '/etc/cleo-mon.conf';
  while ( $ARGV[0] ) {
    if($ARGV[0] =~ /^-(\w)/){
      if($1 eq 'f'){
        $foreground=1;
      }
    }
    else{
      $config = $ARGV[0];
    }
    shift;
  }

  unless($foreground){
    open( STDIN,  '</dev/null' );
    open( STDOUT, '>/dev/null' );
    open( STDERR, '>/dev/null' );

    #  close STDIN;
    #  close STDOUT;
    #  close STDERR;
  }

  $server         = 'localhost';
  $last_ran_check = 0;

  # Defaults first, then whatever the config file overrides.
  %global_settings = %def_global_settings;
  if ( !load_config($config) ) {
    $conf_load = 1;

    #qlog "Loaded config from $config\n";
    foreach $i ( keys(%new_global_settings) ) {
      $global_settings{$i} = $new_global_settings{$i};
    }
  }

  if($foreground){
    $STATUS=*STDOUT;
  }
  else{
    usr1_processor();
  }
  $my_name = `uname -n`;
  chomp $my_name;

  setpriority( 0, $$, -19 );

  qlog "Started $my_name\n";

  qlog "Failed load conf file '$config'. Use defaults\n" unless $conf_load;

  load_state();

  qlog("Checking old ran tasks...\n");
  ran_check();
  qlog("Done checking old ran tasks...\n");

  $port = get_setting('port');
  $LST = Cleo::Conn->new_listen($port,16);
  unless(defined $LST){
    qlog "Cannot create listening socket. Exit.\n";
    exit(1);
  }

  # NOTE(review): a true return value of listen() is treated as failure
  # here - confirm against Cleo::Conn.
  if($LST->listen){
    qlog "Cannot create listening socket on $port. Exit.\n";
    exit(1);
  }

  daemonize() unless $foreground;

  if ( $my_name eq '' or $my_name eq '(none)' ) {

    # empty name!
    $0 = 'cleo-mon WAITING FOR HOSTNAME';

    # Poll until the node gets a real hostname.  The name has to be
    # re-read on every pass, and '(none)' counts as "not set" just like
    # in the test above - otherwise this loop either never ends or is
    # skipped with a useless name.
    while ( $my_name eq '' or $my_name eq '(none)' ) {
      sleep 10;
      $my_name = `uname -n`;
      chomp $my_name;
    }
  }

  $0 = 'cleo-mon';

#  $port = get_setting('smart_port');
#  $SH   = make_listen_socket($port)
#    or die "Cannot make listen socket on smart port '$port'\n";

  register_mon_rcv( 'init',          \&init_handler );
  register_mon_rcv( 'ping',          \&ping_handler );
  register_mon_rcv( 'run',           \&run_handler );
  register_mon_rcv( 'run_first',     \&run_first_handler );
  register_mon_rcv( 'kill',          \&kill_handler );
  register_mon_rcv( 'kill_pid',      \&kill_pid_handler );
  register_mon_rcv( 'stat',          \&stat_handler );
  register_mon_rcv( 'exit',          \&exit_handler );
  register_mon_rcv( 'init_attach',   \&init_attach_handler );
  register_mon_rcv( 'attach',        \&attach_handler );
  register_mon_rcv( 'internal_info', \&int_info_handler );
  register_mon_rcv( 'signal',        \&signal_handler );

  # Write our pid; fall back to /tmp when /var/run is not writable and
  # do not print to an unopened handle when both attempts fail.
  if (    open( PID, ">/var/run/qmon.pid" )
       or open( PID, ">/tmp/qmon.pid" ) ) {
    print PID $$;
    close PID;
  }
  else {
    qlog "Cannot write pid file: $!\n";
  }

  #####################################################################
  #
  # The main loop
  #
  #####################################################################
  for ( ;; ) {

    #
    #  Check new connections from server, if needed
    #

    if(!defined $SRV or ($SRV->get_state ne 'ok')){
      if(defined $SRV){
        qlog "Lost server connection\n";
        $SRV->disconnect;
        undef $SRV;
      }
      if($LST->get_state eq 'dead'){
        $LST->listen;
      }
      if($LST->get_state eq 'listen'){
        $SRV=$LST->accept;
        if(defined $SRV){
          qlog "Connection from server ".$SRV->get_peer()."!\n";
          $init_conn_time = time;
        }
      }
    }
    #
    #  Check new messages from server
    #
    rcv_from_server();

    #
    #  Check new shells
    #
#    $shell_conn = $SH->accept();
#    if ($shell_conn) {
#      update_pids();
#      update_childs();
#      eval {
#        qlog "New rshell!\n";
#        $SIG{ALRM} = sub { die "rsh"; };
#        alarm 2;
#        @s = <$shell_conn>;
#        close $shell_conn;
#        $s = shift @s;
#        if ( $s =~ /(\d+)/ ) {
#          $pid = $1;
#          qlog "PID=$pid !\n";
#          push @fake_rshells, $pid;
#          $parent = 0;
#        RSH_LOOP:
#          foreach $i ( keys(%tasks) ) {
#            if($tasks{$i}->check_pid($pid)){
#              qlog "Belongs to task $i\n";
#              last RSH_LOOP;
#            }
#          }
#        }
#
#        $s = shift(@s);
#        qlog "Got line: '$s'\n";
#
#        unless ($parent) {
#
#          alarm 0;
#          die "FAKE RSH - no parent!!!($s)\n";
#        }
#
#        if ( $s eq '' ) {
#          qlog "empty rsh answer!\n";
#          alarm 0;
#          return;
#        }
#        $s .= "\n" if ( substr( $s, -1, 1 ) ne "\n" );
#
#        %answer             = ();
#        $answer{user}       = $tasks{$i}->get_attr('user');
#        $answer{id}         = $tasks{$i}->get_attr('id');
#        $answer{owner}      = $tasks{$i}->get_attr('owner');
#        $answer{stdout}     = $tasks{$i}->get_attr('stdout');
#        $answer{stderr}     = $tasks{$i}->get_attr('stderr');
#        $answer{stdin}      = $tasks{$i}->get_attr('stdin');
#        $answer{outfile}    = $tasks{$i}->get_attr('outfile');
#        $answer{rsh_string} = $s;
#
#        $answer{pid} = $pid;
#
#        qlog "Success!\n";
#        answer_to_server( 'main', 0, 'run_sh', 1, %answer );
#      };
#      alarm 0;
#      qlog "Ops: $@\n" if $@;
#    }

    #
    #  Make all local checks
    #
    local_checks();

    #
    #  Send answers to server
    #
    #flush_to_server();
    Cleo::Conn::allflush;

    # 0.2 s pause between iterations
    select(undef,undef,undef,0.2);
  }
};                              # ~eval

# Reached only when the eval above died.
qlog "Monitor has die. Reason:$@\n";
print_stack;
close STATUS;
die "Ooops... I cant belive it! ($@) [$ENV{PATH}]\n";

#####################################################################
#####################################################################
#####################################################################
#####################################################################
#####################################################################

#####################################################################
#
#  Handlers
#
#####################################################################

#
#   INIT         HANDLER
#
#####################################################################
sub init_handler( $$$$ ) {
  # 'init' message from the server: copies every received parameter
  # except 'port' and 'auth' into %global_settings and clears the
  # initial-connection timer.
  my ( $type, $hash, $from, $args ) = @_;

  qlog "HANDLER: INIT\n";

  for my $key ( grep { $_ ne 'port' and $_ ne 'auth' } keys(%$args) ) {
    $global_settings{$key} = $args->{$key};
    qlog "Setted parameter '$key' to '$args->{$key}'\n";
  }

  # (the old code opened a separate socket back to the server here;
  #  answers now go through the accepted connection)

  # Reset timeout
  $init_conn_time = 0;
}

#
#   PING         HANDLER
#
#####################################################################
sub ping_handler( $$$$ ) {
  # 'ping' message: answers the sender with a positive 'ping' reply.
  # The request is logged only when $debug_level is above 1.
  my ( $type, $hash, $from, $args ) = @_;

  if ( $debug_level > 1 ) {
    qlog "HANDLER: PING\n";
  }
  answer_to_server( $from, $hash, 'ping', 1 );
}

#
#   RUN_FIRST          HANDLER
#
#####################################################################
sub run_first_handler( $$$$ ) {
  # 'run_first' message: starts the task described by $args.
  #
  # With a file_mask: the files matching the mask are listed before the
  # start and 3 seconds after it; the first file that appeared is read,
  # uuencoded (newlines removed) and returned in the 'file' field.
  # Without a file_mask: the task is started and the result answered,
  # then the process tables are refreshed.
  my ( $type, $hash, $from, $args ) = @_;

  my ( %answer, $pid );

  qlog "HANDLER: RUN_FIRST (id=$args->{id}, owner=$args->{owner})\n";

  $answer{id}    = $args->{id};
  $answer{owner} = $args->{owner};

  if ( $args->{file_mask} ne '' ) {
    my ( @files, @files2 );

    statfiles( $args->{file_mask}, \@files );
    $pid = execute_task( $args, \%answer );

    # Same failure test as every other execute_task caller in this file:
    # anything below 1 is not a usable pid.
    if ( $pid < 1 ) {
      $answer{reason} = 'cannot execute';
      answer_to_server( $from, $hash, 'run_first', 0, %answer );
      return;
    }
    sleep 3;    # give the task time to create its file
    statfiles( $args->{file_mask}, \@files2 );

    # Scan the new listing from its end and take the first name that has
    # no counterpart in the old listing.  Counting occurrences keeps the
    # one-for-one pairing of the old splice-based search, and iterating
    # (instead of testing the popped value for truth) also copes with a
    # file called "0".
    my %unmatched;
    $unmatched{$_}++ for @files;
    my $found_file = '';
    foreach my $candidate ( reverse @files2 ) {
      if ( $unmatched{$candidate} ) {
        $unmatched{$candidate}--;
        next;
      }
      $found_file = $candidate;
      last;
    }

    # 3-arg open with a lexical handle: the file name can never be taken
    # for an open mode, and no global handle is left behind.
    if ( $found_file ne '' and open( my $fh, '<', $found_file ) ) {
      my $content = do { local $/; <$fh> };
      close $fh;
      $content = '' unless defined $content;

      my $packed = pack( 'u', $content );
      $packed =~ s/\n//g;
      $answer{file} = $packed;
      answer_to_server( $from, $hash, 'run_first', 1, %answer );
      return;
    }
    $answer{reason} = 'no file by mask';
    answer_to_server( $from, $hash, 'run_first', 0, %answer );
  } else {

    #  use 'smart' rsh replacements
    eval {
      $pid = execute_task( $args, \%answer );
      if ( $pid < 1 ) {
        qlog "Exec failed!\n";
        alarm 0;
        $answer{reason} = 'exec failed';
        answer_to_server( $from, $hash, 'run_first', 0, %answer );
      }
      else {
        answer_to_server( $from, $hash, 'run_first', 1, %answer );
      }
    };
    qlog "RUN_FIRST failed: $@\n" if $@;
    alarm 0;
    update_pids();
    update_childs();
  }
}

#
#   RUN          HANDLER
#
#####################################################################
sub run_handler( $$$$ ) {
  # 'run' message: starts the task described by $args on this node and
  # answers with its pid.
  #
  # When $args->{second_run} names this node, the task must already be
  # registered in %tasks, otherwise the request is refused.
  # Exactly one answer is sent per request.
  my ( $type, $hash, $from, $args ) = @_;

  my %answer;

  qlog "HANDLER: RUN (id=$args->{id}, owner=$args->{owner})\n";

  $answer{id}    = $args->{id};
  $answer{owner} = $args->{owner};

  my $t          = "$args->{owner}:$args->{id}";
  $args->{node}  = $my_name;
  my $second_run = ( $args->{second_run} eq $my_name );

  if ( $second_run and !exists( $tasks{$t} ) ) {
    $answer{reason} = 'no such task!';
    answer_to_server( $from, $hash, 'run', 0, %answer );
    qlog
      "NO SUCH TASK! ($args->{id},$args->{owner},rsh_pid=$args->{rsh_pid})\n";
    return;
  }

  my $pid = execute_task( $args, \%answer );
  if ( $pid < 1 ) {
    $answer{reason} = 'cannot execute';
    answer_to_server( $from, $hash, 'run', 0, %answer );

    # Stop here: falling through used to register a task with a bogus
    # head pid and send a second, positive answer for the same request.
    return;
  }

  # execute_task has already registered the task with its full set of
  # attributes (owner, com_line, temp_dir, ...).  Keep that record and
  # only refresh the fields below; replacing it with a blank Task lost
  # the attributes that the 'finished' report reads later.
  $tasks{$t} = Task::new() unless defined $tasks{$t};
  $tasks{$t}->set_head($pid);
  $tasks{$t}->set_attr('rsh_pid',$args->{rsh_pid});
  $tasks{$t}->set_attr('user',$args->{user});
  $tasks{$t}->set_attr('id',$args->{id});
  $tasks{$t}->set_attr('hard_kill_after_head',get_setting('hard_kill_after_head'));

  $answer{pid} = $pid;
  if ($second_run) {
    # rsh_pid comes from the request (the log used to read the legacy
    # %ran table here, creating an empty entry in it as a side effect)
    qlog
      "New TASK RUNNED!($args->{id})[$pid/$args->{rsh_pid}]\n";
  } else {
    qlog "TASK RUNNED! [$pid]($args->{rsh_pid}/$args->{id})\n";
  }

  # short pause (0.1 s) before the process tables are refreshed
  select( undef, undef, undef, 0.1 );
  update_pids();
  update_childs();
  answer_to_server( $from, $hash, 'run', 1, %answer );
}                               # run_handler

#
#   KILL         HANDLER
#
#   kills by id and owner.
#
#####################################################################
sub kill_handler( $$$$ ) {
  # 'kill' message: soft-kills the task "owner:id".
  #
  # A task that is not marked dead yet gets TERM, is scheduled for a hard
  # kill after its 'hard_kill_after_head' seconds and is marked dead.
  # The answer is positive in every case; an unknown task only adds a
  # 'reason' field.  State is saved afterwards.
  my ( $type, $hash, $from, $args ) = @_;

  qlog "HANDLER: KILL (id=$args->{id}, owner=$args->{owner}, ".
       "user=$args->{user}, task=$args->{task})\n";
  update_pids();
  update_childs();

  my $name = "$args->{owner}:$args->{id}";

  if ( !exists( $tasks{$name} ) ) {
    answer_to_server( $from, $hash, 'kill', $args->{id}, 'success', 1,
                      'reason', 'No such task!' );
  }
  else {
    my $task = $tasks{$name};

    # do soft kill, once per task
    if ( !$task->is_dead ) {
      $task->kill('TERM');
      $delayed_task_kills{$name} =
        time() + $task->get_attr('hard_kill_after_head');
      $task->mark_dead;
    }
    answer_to_server( $from, $hash, 'kill', $args->{id}, 'success', 1 );
  }

  save_state();
  qlog "SOFT KILL finished\n";
} # kill_handler

#
#   KILL_PID     HANDLER
#
#   kills by pid
#
#####################################################################
sub kill_pid_handler( $$$$ ) {
  # 'kill_pid' message: schedules the kill of one pid.
  #
  # With a pid the request is stored in %kill_pids (due time, hash, from
  # and id); the answer is then sent by kill_pid_action().  Without a pid
  # a positive answer is sent right away.  State is saved in both cases.
  my ( $type, $hash, $from, $args ) = @_;

  qlog "HANDLER: KILL_PID (id=$args->{id}, pid=$args->{pid}, ".
       "owner=$args->{owner}, user=$args->{user}, task=$args->{task})\n";

  if ( !$args->{pid} ) {
    answer_to_server( $from, $hash, 'kill_pid', $args->{id}, 'success',
                      1 );
  }
  else {
    $args->{wait_secs} = 0 unless defined $args->{wait_secs};

    my $request = ( $kill_pids{ $args->{pid} } ||= {} );
    $request->{time} = time() + $args->{wait_secs};
    $request->{hash} = $hash;
    $request->{from} = $from;
    $request->{id}   = $args->{id};
  }
  save_state();
} # kill_pid_handler

sub kill_pid_action( $ ) {
  # Executes a stored kill_pid request: kills the pid, then answers the
  # requester recorded in %kill_pids for that pid.
  my ($pid) = @_;

  kill_pid($pid);

  my $request = ( $kill_pids{$pid} ||= {} );
  answer_to_server(
                   $request->{from},
                   $request->{hash},
                   'kill_pid', $request->{id},
                   'success', 1 );
}

#
#   EXIT         HANDLER
#
#   exits the monitor.
#
#####################################################################
sub exit_handler( $$$$ ) {
  # 'exit' message (also called from the TERM signal handler): saves the
  # state, calls kill_pid('all') and terminates the monitor with code 0.
  # The arguments are not used.
  qlog("Exiting...\n");

  save_state();
  kill_pid('all');

  qlog("Shutdown.\n");
  exit(0);
}                               # exit_handler

#
#   STAT         HANDLER
#
#####################################################################
sub stat_handler( $$$$ ) {
  # 'stat' message: not supported, always answered negatively.
  my ( $type, $hash, $from, $args ) = @_;

  my $reason = 'Not implemented yet';
  answer_to_server( $from, $hash, 'stat', 0, 'reason', $reason );
}                               # stat_handler

#
#   INTERNAL_INFO         HANDLER
#
#####################################################################
sub int_info_handler( $$$$ ) {
  # 'internal_info' message: reports all known tasks and the monitor
  # version.
  #
  # The answer field 'val' is a '#'-joined list with one
  # "id:.. owner:.. is_rsh:.. pid:.." entry per task plus "ver:VERSION".
  # Side effects: the node name is re-read, tasks without pids are
  # removed, and empty 'owner'/'id' attributes are set to '_none_'/0.
  my ( $type, $hash, $from, $args ) = @_;
  my @val;

  # update node name...
  chomp( $my_name = `uname -n` );

  for my $name ( keys(%tasks) ) {
    my $task = $tasks{$name};

    # the first listed pid represents the task; 0 stands for "no pids"
    my @pids = $task->get_pids();
    my $pid  = @pids ? $pids[0] : 0;

    # empty task?
    if ( $pid == 0 ) {
      qlog "Empty task found ($name)! detele info...\n";
      delete $tasks{$name};
      next;
    }

    my $is_rsh = $task->get_attr('is_rsh');
    $is_rsh = 0 if ( $is_rsh eq '' );

    my $owner = $task->get_attr('owner');
    if ( $owner eq '' ) {
      $owner = '_none_';
      $task->set_attr('owner','_none_');
    }

    my $id = $task->get_attr('id');
    if ( $id eq '' ) {
      $id = '0';
      $task->set_attr('id',0);
    }

    push @val, "id:$id owner:$owner is_rsh:$is_rsh pid:$pid";
  }
  push @val, "ver:$VERSION";

  answer_to_server( $from, $hash, 'internal_info', 1, 'val',
                    join( '#', @val ) );
}                               # int_info_handler

#
#  PART of INIT_ATTACH HANDLER, which does actual work
#
#
####################################################################
sub init_attach_real_handler($$$$$$) {
  # Working part of INIT_ATTACH: substitutes pseudo-variables in both
  # masks, stores the currently matching pids in @attach_collected,
  # answers the server and blocks further init_attach requests until
  # "now + tmout - 1".
  #
  # args: hash, from, parent mask, exe mask, user, timeout (seconds)
  my ( $hash, $from, $attach_parent_mask, $attach_exe_mask, $attach_user,
       $tmout ) = @_;

  qlog
    "REAL init_attach  ($attach_parent_mask/$attach_exe_mask/$attach_user)\n";

  my %props = ( 'node' => $my_name );
  subst_task_prop( \$attach_exe_mask,    \%props );
  subst_task_prop( \$attach_parent_mask, \%props );

  # snapshot of the matching pids before the attached program starts
  @attach_collected =
    collect_pids( $attach_parent_mask, $attach_exe_mask, $attach_user );

  my $count = scalar(@attach_collected);
  qlog
    "init_attach successfull ($attach_parent_mask/$attach_exe_mask/$attach_user)"
      . $count . "\n";
  qlog "PIDS: " . join( ';', @attach_collected ) . "\n";

  answer_to_server( $from, $hash, 'init_attach', 0, 'success', '1' );

  my $state = ( $delayed_requests{init_attach} ||= {} );
  $state->{timeout} = time() + $tmout - 1;
  $state->{blocked} = 1;
}

#
#   INIT_ATTACH         HANDLER
#
#####################################################################
sub init_attach_handler( $$$$ ) {
  # 'init_attach' message: records the attach parameters in the
  # file-level $attach_* variables.  If another attach is in progress the
  # request is queued in @delayed_attaches; otherwise a fresh task
  # "owner:id" is registered and the real handler is run at once.
  my ( $type, $hash, $from, $args ) = @_;

  ( $attach_parent_mask, $attach_exe_mask, $attach_user ) =
    ( $args->{parent_mask}, $args->{exe_mask}, $args->{user} );
  ( $attach_id, $attach_owner ) = ( $args->{id}, $args->{owner} );

  qlog "HANDLER: INIT_ATTACH1 (id=$args->{id}, ".
       "owner=$args->{owner}, user=$args->{user})\n";

  my @real_args = (
                   $hash,            $from,        $attach_parent_mask,
                   $attach_exe_mask, $attach_user, $args->{tmout} );

  if ( $delayed_requests{init_attach}->{blocked} ) {

    # some attach is in progress... wait.
    qlog "Another attach in progress. Switch!!!\n";

    push @delayed_attaches, [@real_args];
    return;
  }

  my $name = "$args->{owner}:$args->{id}";
  $tasks{$name} = Task::new();

  init_attach_real_handler( $real_args[0], $real_args[1], $real_args[2],
                            $real_args[3], $real_args[4], $real_args[5] );
  return;
}                               # init_attach_handler

#
#   ATTACH         HANDLER
#
#####################################################################
sub attach_handler( $$$$ ) {
  # 'attach' message: builds an attach request from a copy of $args
  # (plus the sender, the pids collected by init_attach and an absolute
  # deadline), appends it to @attach_requests and runs the second stage
  # once after a one second pause.
  my ( $type, $hash, $from, $args ) = @_;

  qlog "HANDLER: ATTACH (id=$args->{id}, owner=$args->{owner}, ".
       "user=$args->{user}, tmout=$args->{tmout}, all=$args->{all})\n";

  my %request = %$args;

  # private copy of the pid list collected by init_attach
  $request{collected} = [ @attach_collected ];
  $request{from}      = $from;
  $request{hash}      = $hash;
  $request{id}        = $args->{id};
  $request{owner}     = $args->{owner};
  $request{user}      = $args->{user};
  if ( $request{user} eq '' ) {
    $request{user} = $attach_user;
  }

  # a timeout below 1 means 60 seconds; stored as an absolute time
  if ( $args->{tmout} < 1 ) {
    $request{tmout} = 60;
  }
  $request{tmout} += time();

  push @attach_requests, \%request;

  sleep 1;
  attach_handler_second_stage( \%request );
}                               # attach_handler

#
#   ATTACH         HANDLER (SECOND STAGE)
#
#####################################################################
sub attach_handler_second_stage( $ ) {
  # Second stage of ATTACH; called once by attach_handler and then on
  # every local_checks() pass while the request is pending.
  #
  # Makes sure the task "owner:id" exists, refreshes its attributes,
  # re-collects the pids matching the attach masks and adds the ones
  # that are new since the previous collection to the task: all of them
  # when $args->{all} is true, otherwise only the first one.
  #
  # arg: request hash reference built by attach_handler; its 'collected'
  #      entry is replaced by the new collection.
  my $args = $_[0];
  my ( @new_coll, @new_attached, $collected, $p, $t );

  $collected = $args->{collected};

  #  return if ( scalar(@$attached) > 0 and $args->{all} == 0 );


  $t="$args->{owner}:$args->{id}";

  qlog "ATT2: '$t' ($args->{user})\n";

  if(!defined $tasks{$t}){
      qlog "Warning! Task '$t' does not exists yet (init_attach failed?)\n";
      $tasks{$t}=Task::new();
  }
#  $tasks{$t}->set_head($new_pid);
  $tasks{$t}->set_attr('rsh_pid',$args->{rsh_pid});
  $tasks{$t}->set_attr('id',$args->{id});
  $tasks{$t}->set_attr('user',$args->{user});
  $tasks{$t}->set_attr('owner',$args->{owner});
  $tasks{$t}->set_attr('hard_kill_after_head',get_setting('hard_kill_after_head'));

  # NOTE(review): temp_dir is interpolated into a command string and
  # this runs on every call - confirm that launch() copes with both.
  if($args->{temp_dir} ne ''){
    launch(0,"mkdir $args->{temp_dir}", "mkdir-$t", $args->{user});
    launch(0,"chmod 0700 $args->{temp_dir}", "chmod-$t", $args->{user});
  }

  update_pids();
  update_childs();
  @new_coll =
    collect_pids( $attach_parent_mask, $attach_exe_mask, $attach_user );

  # only look for new pids when the collection has grown
  if ( @new_coll > @$collected ) {
    qlog "Collected($attach_parent_mask,$attach_exe_mask,$attach_user): "
      . scalar(@new_coll) . "\n";
    qlog "Was collected: " . scalar(@$collected) . "\n";
  ATTACH_LOOP:
    foreach $p (@new_coll) {

      # skip cleo launching processes (independent of the task being
      # checked, so tested once per pid; %tasks always holds '$t' here)
      next ATTACH_LOOP if($all_pids->{$p}->{name} =~ /^CLEO LAUNCH/);

      # skip pids which already belong to some task
      foreach my $task (values(%tasks)){
        next ATTACH_LOOP if($task->check_pid($p));
      }

      #skip already counted
      next if grep { $_ eq $p } @$collected;

      # take in account
      push @new_attached, $p;
      qlog "Attach to $args->{owner}:$args->{id} successfull - $p".
           " ($all_pids->{$p}->{cmdline})\n";

      unless ( $args->{all} ) {
        qlog "Exit attaching!\n";
        last;
      }
    }
  }
  $args->{collected} = \@new_coll;

  # nothing new this time
  return unless (@new_attached);

  # add pids to the task
  foreach $p (@new_attached) {
    $tasks{$t}->add_pid($p);

    # Single-pid mode stops after the first pid.  Leave the loop with
    # 'last', not 'return', so that the changed task is saved below like
    # in the 'all' case.
    last unless ( $args->{all} );
  }
  save_state();

  #  answer_to_server($from,$hash,'attach',$args->{id},'success','1');
} # attach_handler_second_stage

#
#   SIGNAL (FREEZE)      HANDLER
#
#####################################################################
sub signal_handler( $$$$ ) {
  # 'signal' message: sends the signal $args->{val} to the task
  # "owner:id" unless the task is already marked dead.
  #
  # The answer is positive in every case; an unknown task only adds a
  # 'reason' field.  State is saved afterwards.
  my ( $type, $hash, $from, $args ) = @_;

  qlog "HANDLER: SIGNAL (id=$args->{id}, owner=$args->{owner}, ".
       "user=$args->{user}, task=$args->{task}, val=$args->{val})\n";
  update_pids();
  update_childs();

  my $name = "$args->{owner}:$args->{id}";

  if ( !exists( $tasks{$name} ) ) {
    answer_to_server( $from, $hash, 'signal', $args->{id}, 'success', 1,
                      'reason', 'No such task!' );
  }
  else {
    my $task = $tasks{$name};

    # deliver the signal to a live task only
    if ( !$task->is_dead ) {
      qlog "DO signal '$args->{val}' on task '$name'\n";
      $task->kill( $args->{val} );
    }
    answer_to_server( $from, $hash, 'signal', $args->{id}, 'success', 1 );
  }

  save_state();
  qlog "SIGNAL finished\n";
} # signal_handler


# Walks %delayed_requests and removes every entry whose {req} list is
# empty.  Entries that are not removed are only tested for {blocked};
# nothing else is done with them here (the body looks unfinished).
#
# NOTE(review): entries without a {req} key (such as the 'init_attach'
# entry used elsewhere in this file) are dereferenced as arrays here -
# confirm how that behaves under 'use strict' before calling this.
sub check_delayed_requests() {
  my $i;

  foreach $i ( keys(%delayed_requests) ) {
    # drop request types with nothing queued
    if ( scalar( @{ $delayed_requests{$i}->{req} } ) < 1 ) {
      delete $delayed_requests{$i};
      next;
    }
    # blocked request types are left alone
    next if $delayed_requests{$i}->{blocked};

    #    $

  }

}

#
#  Starts a task: runs optional pre-exec commands, forks, and in the
#  child switches uid, prepares the environment, redirects
#  stdin/stdout/stderr and exec()s the command line.
#
#  Args: 1 - hash ref with the task description.  Keys read here:
#            suexec_gid, dir, user, group, pre_exec, just_exec,
#            owner, id, rsh_pid, com_line, temp_dir, env,
#            stdin, stdout, stderr, outfile
#        2 - hash ref for the answer; its 'error' field receives the
#            failure reason and is undefined on success
#
#  Ret:  in the parent - pid of the forked child, or -1 on failure
#        (chdir or fork failed).  The child never returns: it either
#        exec()s the command or exits with code 1.
#
#  On success the parent registers the task in %tasks under the key
#  "owner:id" and calls save_state().
#
sub execute_task($$ ) {
  my ( $args, $answer ) = @_;

  # NOTE(review): $pipe_stdin, $pipe_stdout, $pipe_stderr and $v are
  # declared but not used anywhere in this function.
  my ($the_suexec_gid, $user,        $group,
      $pipe_stdin,     $pipe_stdout, $pipe_stderr, $pid,
      $e,              $v,           $g, $t );

  # The bare block below is left with 'last RUN_H_CRADDLE' on any
  # failure; control then falls to the error report at the bottom.
 RUN_H_CRADDLE:
  {                             # ^
    # Take the numeric prefix of the requested gid, or the configured
    # default.
    # NOTE(review): $the_suexec_gid is not referenced again in this
    # function, and $1 is stale if the pattern does not match.
    if ( defined $args->{suexec_gid} ) {
      $args->{suexec_gid} =~ /^(\d+)/;
      $the_suexec_gid = $1;
    } else {
      $the_suexec_gid = get_setting("suexec_gid");
    }

    # The error text is set in advance; it stays in place if chdir fails.
    $answer->{error} = "Cannot chdir to $args->{dir}";
    chdir $args->{dir} or last RUN_H_CRADDLE;
    $user=get_uid($args->{user});
    # NOTE(review): $group is assembled here but never used below -
    # the child changes only its uid.  Confirm whether gids should be
    # applied as well.
    foreach $g ( split( /\s+/, $args->{group} ) ) {
      $group .= get_gid($g).' ';
    }
    # Optional pre-exec command, run through the shell by the monitor
    # itself with a 10 second alarm guard.  A timeout is swallowed by
    # the eval; the exit status of the command is not examined, so
    # "Succeed" is logged whenever system() returns.
    if ( $args->{pre_exec} =~ /\S/ ) {
      eval {
        local $SIG{ALRM} = sub { die "q_pre timeout\n"; };
        alarm 10;
        qlog "alarm 10\n" if ($_d_alarm_log);
        system("$args->{pre_exec}");
        qlog "pre-exec ($args->{pre_exec}) Succeed\n";
        alarm 0;
        qlog "alarm 0\n" if ($_d_alarm_log);
      };
    }
    # Optional extra command, handed to launch() (defined elsewhere).
    if ( $args->{just_exec} =~ /\S/ ) {
      launch( 10, $args->{just_exec}, "$args->{owner}.$args->{id}" );
    }

    qlog "forking!\n";
    $pid = fork();
    unless ( defined $pid ) {
      $answer->{error} = "Cannot fork.";
      last RUN_H_CRADDLE;
    }
    if ( $pid < 0 ) {
      $answer->{error} = "Cannot fork.";
      last RUN_H_CRADDLE;
    }

    if ( $pid > 0 ) {           # parent
      qlog "My child->$pid ($args->{owner}/$args->{id})\n";

      # Register the new task under "owner:id" with the child as its
      # head process and copy the task description into its attributes.
      $t="$args->{owner}:$args->{id}";
      $tasks{$t}=Task::new();
      $tasks{$t}->set_head($pid);
      $tasks{$t}->set_attr('rsh_pid',$args->{rsh_pid});
      $tasks{$t}->set_attr('id',$args->{id});
      $tasks{$t}->set_attr('user',$args->{user});
      $tasks{$t}->set_attr('owner',$args->{owner});
      $tasks{$t}->set_attr('hard_kill_after_head',get_setting('hard_kill_after_head'));
      $tasks{$t}->set_attr('group',$args->{group});
      $tasks{$t}->set_attr('com_line',$args->{com_line});
      $tasks{$t}->set_attr('temp_dir',$args->{temp_dir});
      $tasks{$t}->set_attr('env',$args->{env});
      $tasks{$t}->set_attr('dir',$args->{dir});

      # Success: clear the pre-set error text and persist the state.
      undef $answer->{error};
      save_state();
      return $pid;
    } else {                    # child

      $my_name .= "[ch]";
      qlog "Child ($$)!\n";

      # Change process group
      #
      #      setpgrp($$,$$);

      # Switch real and effective uid to the task user.
      # NOTE(review): the result of the assignment is not verified and
      # the gids are left unchanged.
      $< = $> = get_uid($args->{user});

      # Override temp dir
      # An already existing path is refused and replaced by /tmp.
      # NOTE(review): after that replacement the mkdir below is tried
      # on /tmp itself, so a second warning is logged as well.
      if ( -e $args->{temp_dir} ) {
        qlog
          "Warning! Temp dir '$args->{temp_dir}' already exists! Reset to /tmp\n";
        $args->{temp_dir} = "/tmp";
      }
      unless ( mkdir( $args->{temp_dir}, 0700 ) ) {
        qlog
          "Warning! Cannot create temp dir '$args->{temp_dir}'! Reset to /tmp\n";
        $args->{temp_dir} = "/tmp";
      }
      $ENV{TEMP_DIR} = $args->{temp_dir};

      # Change path...
      # The configured prefix and suffix are wrapped around the
      # inherited PATH.
      $ENV{PATH} =
        get_setting('path_prepend') . ':'
          . $ENV{PATH} . ':'
            . get_setting('path_append');
      $ENV{P4_RSHCOMMAND} =
        get_setting('global_rsh_command'); #'/home/root/q4/rsh';

      #qlog "PATH=$ENV{PATH};\n";

      qlog "Use uid=$<,$>;gid=$(,$)\n";

      # chdir again, now with the task user's rights; fall back to /tmp.
      chdir( $args->{dir} ) or chdir('/tmp');

      # Export the task environment; entries must look like NAME=value.
      foreach $e ( @{ $args->{env} } ) {
        unless ( $e =~ /^(\S+)\s*=\s*(.*)$/ ) {
          qlog "ERROR! Bad env: '$e'\n";
          next;
        }
        $ENV{$1} = $2;
        qlog "ENV '$1'=$2.\n";
      }

      # Execute!
      qlog "Executable: '$args->{com_line}'\n";

#      eval { close $LST; };
#      eval { close $SH; };
#      eval { close $From_server; };
#      eval { close $To_server; };
        # Drop the child's copy of the server connection.
        $SRV->disconnect;

      #      close $STATUS;
      #      $STATUS=new IO::File(">/tmp/qqq");

      # --- stdin: a named file, or /dev/null when none is given ---
      qlog "Opening stdin ($args->{stdin})\n";
      if ( ( $args->{stdin} =~ /\S/ ) && ( $args->{stdin} ne '-' ) ) {
        qlog "Opening stdin ($args->{stdin})!!!!\n";
        # NOTE(review): without the /d modifier this tr/// only counts
        # the listed characters and removes nothing; the same applies
        # to the stderr and stdout names below.  Confirm the intent.
        $args->{stdin} =~ tr/\|\`\&\#\$\@\<\>//;

        #sysopen(STDIN,"$args->{stdin}",O_RDONLY)
        #  or qlog "Cannot open input file ($args->{stdin})\n";
        # NOTE(review): the descriptor returned by POSIX::open is not
        # checked before dup2 (here and in every redirection below),
        # and O_CREAT is passed without an explicit mode.
        my $fd = POSIX::open( "$args->{stdin}", O_RDONLY | O_CREAT );
        POSIX::dup2( $fd, 0 );
        qlog "opened stdin ($args->{stdin})\n";

        #        usr1_processor();
      } else {

        #qlog "redirect stdin to pipe ".(0+$pipe_stdin->fileno)."/".(0+fileno(STDIN))."\n";
        qlog "redirect stdin to null\n";

        # redirect to /dev/null
        my $fd = POSIX::open( '/dev/null', O_RDONLY );
        POSIX::dup2( $fd, 0 );

        #        open(STDIN,'</dev/null');
        #          if(!defined (POSIX::dup2(INULL->fileno,0))){
        #            qlog "Cannot redirect stdin to pipe [$!]\n";
        #          }
      }
      qlog "Use3\n";

      # --- stderr: own file, else the common outfile, else /dev/null ---
      if ( ( $args->{stderr} =~ /\S/ ) && ( $args->{stderr} ne '-' ) ) {
        $args->{stderr} =~ tr/\|\`\&\#\$\@\<\>//;

        #        sysopen(STDERR,"$args->{stderr}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
        #          or die "Cannot open error file ($args->{stderr})\n";
        my $fd =
          POSIX::open( "$args->{stderr}",
                       O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE );
        POSIX::dup2( $fd, 2 );
        qlog "Opened ($args->{stderr}) for stderr\n";
      } else {
        if ( $args->{outfile} =~ /\S/ ) {

          #          sysopen(STDERR,"$args->{outfile}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
          #            or qlog "Cannot open errors file ($args->{outfile})\n";
          my $fd =
            POSIX::open( "$args->{outfile}",
                         O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE );
          POSIX::dup2( $fd, 2 );
          qlog "Opened errors file ($args->{outfile})\n";
        } else {

          # no file was requested: stderr goes to /dev/null
          #          open(STDERR,'>/dev/null');
          my $fd =
            POSIX::open( '/dev/null',
                         O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE );
          POSIX::dup2( $fd, 2 );
          qlog "stderr -> /dev/null\n";

        }
      }

      #      qlog  "Use2\n";

      # --- stdout: own file, else the common outfile, else /dev/null ---
      if ( ( $args->{stdout} =~ /\S/ ) && ( $args->{stdout} ne '-' ) ) {
        $args->{stdout} =~ tr/\|\`\&\#\$\@\<\>//;

        #        sysopen(STDOUT,"$args->{stdout}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
        #          or die "Cannot open stdout file ($args->{stdout})\n";
        my $fd =
          POSIX::open( "$args->{stdout}",
                       O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE );
        POSIX::dup2( $fd, 1 );
        qlog "Opened ($args->{stdout}) for stdout\n";
      } else {
        if ( $args->{outfile} =~ /\S/ ) {

          #          sysopen(STDOUT,"$args->{outfile}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
          #            or qlog "Cannot open output file ($args->{outfile}) [$!]\n";
          my $fd =
            POSIX::open( "$args->{outfile}",
                         O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE );
          POSIX::dup2( $fd, 1 );
          qlog "Opened ($args->{outfile}) for outfile\n";
        } else {

          # no file was requested: stdout goes to /dev/null
          # (the log messages still mention a pipe)
          qlog "Redirect output to null\n";

          #          open(STDOUT,'>/dev/null');
          my $fd =
            POSIX::open( '/dev/null',
                         O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE );
          POSIX::dup2( $fd, 1 );

          #          POSIX::dup2(ONULL->fileno,1);
          #          POSIX::dup2($pipe_stdout->fileno(),fileno(STDOUT));
          #          fcntl(STDOUT,F_SETFL,fcntl(STDOUT,F_GETFL,0)|O_WRONLY);
          qlog "redirect out to pipe\n";
        }
      }

      no strict;
      qlog "Execute...\n";
      eval { close $STATUS; };
      # Reset the priority of this process to 0 before the exec.
      setpriority( 0, $$, 0 );
      exec( $args->{com_line} );
      # Reached only when exec() itself failed.
      # NOTE(review): the message text starts with a stray 'print "'.
      qlog "print \"Cannot execute '$args->{com_line}' on $my_name\n";
      exit(1);
    }
  }
  # Failure path of the parent (chdir or fork failed).
  qlog "Error: $answer->{error}\n";
  return -1;
}

#
#  Appends to a list the names of plain files matching a mask.
#
#  Args: 1 - mask of the form '/dir/.../REGEXP': everything up to the
#            last slash is the directory, the rest is a regular
#            expression which must match the whole file name
#        2 - reference to an array; matching names (without the
#            directory part) are pushed onto it
#
#  Nothing is appended if the mask has no directory part, the
#  directory cannot be entered or read, or the expression is invalid.
#
#  Side effect: the current directory is changed to the directory
#  part of the mask.
#
sub statfiles($$ ) {
  my ( $mask, $list ) = @_;

  # Without a successful split there is no directory to scan
  # (and $1/$2 would hold leftovers of some earlier match).
  return unless $mask =~ m{(/.*)/([^/]+)};
  my ( $path, $filemask ) = ( $1, $2 );

  chdir($path) or return;
  opendir( my $dir, "." ) or return;

  # Compile the mask as a pattern only - it is never run as Perl
  # code.  An invalid expression gives undef and matches nothing.
  my $pattern = eval { qr/^($filemask)$/ };

  if ( defined $pattern ) {
    foreach my $entry ( readdir($dir) ) {
      next unless -f $entry;
      push @$list, $entry if $entry =~ $pattern;
    }
  }
  closedir($dir);
  return;
}

#
#  Writes one named value to the SAV file handle as an XML fragment.
#
#  Args: 1 - element name
#        2 - the value: a plain scalar, or a reference to an array
#            or to a hash.  REF, CODE and GLOB references are
#            skipped silently.
#
#  Output inside <name>...</name>:
#    array  - <array><el>v1</el><el>v2</el></array>
#    hash   - <hash><key1>v1</key1><key2>v2</key2></hash>
#    scalar with NUL, CR or LF - <packed>uuencoded text</packed>
#    other scalar              - <scalar>value</scalar>
#
#  Values are written verbatim: no XML escaping is done.
#
#############################################################
sub save_xml($$ ) {
  my ( $name, $sav ) = @_;
  my $type = ref($sav);

  return if ( $type eq 'REF' or $type eq 'CODE' or $type eq 'GLOB' );

  print SAV " <$name>\n";
  if ( $type eq 'ARRAY' ) {
    if ( scalar( @{$sav} ) > 0 ) {
      print SAV " <array><el>" .
        join( '</el><el>', @{$sav} ) .
          '</el></array>';
    }
    else {
      print SAV ' <array></array>';
    }
  } elsif ( $type eq 'HASH' ) {
    # One <key>value</key> element per pair.  The string must be
    # double-quoted and go through the reference, otherwise the
    # template text itself gets written instead of the data.
    print SAV ' <hash>' .
      join( '', map { "<$_>$sav->{$_}</$_>" } keys( %{$sav} ) ) .
        '</hash>';
  } elsif ( $sav =~ y/\0\n\r// ) {
    # uuencode values which would not survive the line-based format
    my $p = pack( 'u', $sav );
    $p =~ y/\n\r//d;
    print SAV "<packed>$p</packed>";
  } else {
    print SAV "<scalar>$sav</scalar>";
  }
  print SAV "</$name>\n";
}                               #~save_xml

#
#  Decodes the value part of a line of the old state file.
#  Args: the rest of the line (only the value!)
#  Ret:  depends on the two-character marker at the start:
#          "\0A" - ref to an array (rest is split by NUL)
#          "\0H" - ref to a hash   (rest is a NUL separated
#                  key/value list)
#          "\0U" - uudecoded scalar
#          anything else - the argument itself
#
###########################################################
sub load_line( $ ) {
  my $marker = substr( $_[0], 0, 2 );

  if ( $marker eq "\0A" ) {
    my @items = split( "\0", substr( $_[0], 2 ) );
    qlog "Loaded array: " . join( ';', @items ) . "\n";
    return \@items;
  }

  if ( $marker eq "\0H" ) {
    my %pairs = split( "\0", substr( $_[0], 2 ) );
    qlog "Loaded hash: " . join( ';', %pairs ) . "\n";
    return \%pairs;
  }

  if ( $marker eq "\0U" ) {
    my $decoded = unpack( 'u', substr( $_[0], 2 ) );
    qlog "Unpacked: $decoded\n";
    return $decoded;
  }

  # no marker: a plain scalar, returned as a copy
  my $plain = $_[0];
  return $plain;
}                               #~load_line

#
#  Saves the monitor state.
#
#  The file named by the 'mon_save' setting is rewritten with one
#  <task> element per entry of %tasks, holding the task id, its pids
#  and every attribute reported by list_attrs().  The previous file,
#  if any, is first renamed to '<file>.bak'.  Entries with an empty
#  key are deleted from %tasks on the way.
#
#  Failures are only logged; nothing is returned to the caller.
#
#########################################################
sub save_state() {

  my $file = get_setting('mon_save');

  # There is nothing to back up on the very first save.
  if ( -e $file and !rename( $file, "$file.bak" ) ) {
    qlog "Cannot to create backup of '$file'\n";
  }
  # Three-argument open: the configured name is taken literally.
  # The SAV bareword handle is kept because save_xml() prints to it.
  if ( !open( SAV, '>', $file ) ) {
    qlog "Cannot save status to '$file'\n";
    return;
  }
  # log the line and the sub of the caller's caller
  qlog "Save status to '$file' ". ( caller(1) )[2] . "  " . ( caller(1) )[3] . "\n";

  print SAV "<xml>\n";
  foreach my $name ( keys(%tasks) ) {
    if ( $name eq '' ) {
      qlog "Warning! Empty task found. Deleted\n";
      delete $tasks{$name};
      next;
    }
    qlog "save $name\n";
    print SAV "<task>\n <task_id>$name</task_id>\n";
    my @pids = $tasks{$name}->get_pids();
    save_xml( 'pids', \@pids );
    foreach my $attr ( $tasks{$name}->list_attrs() ) {
      save_xml( $attr, $tasks{$name}->get_attr($attr) );
    }
    print SAV "</task>\n";
  }
  print SAV "</xml>\n";

  # Buffered write errors (full disk etc.) show up only at close.
  unless ( close(SAV) ) {
    qlog "Cannot finish saving status to '$file': $!\n";
    return;
  }
  qlog "Saving done\n";
}

#
#  Loads the monitor state (old line-based format).
#
#  The file named by the 'mon_save' setting is read: the first line
#  is skipped, a line starting with '#' closes the current entry and
#  every other line must look like 'key: value', the value being
#  decoded by load_line().  Entries with a positive 'pid' are stored
#  in %ran (which is cleared first); others are logged and dropped.
#  Finally update_pids() and update_childs() are called.
#
#########################################################
sub old_load_state() {

  my %e;

  my $file = get_setting('mon_save');
  my $fh;
  unless ( open( $fh, '<', $file ) ) {
    qlog "Cannot LOAD status from '$file'\n";
    return;
  }

  my $header = <$fh>;           # the first line carries no data
  %e   = ();
  %ran = ();
  qlog "Loading state ($file)\n";
  while ( my $line = <$fh> ) {
    chomp $line;
    if ( $line =~ /^\#/ ) {     # new entry
      if ( $e{pid} > 0 ) {
        %{ $ran{ $e{pid} } } = %e;
        qlog "$e{pid} loaded\n";
      } else {
        qlog "Warning! Bad pid! (" . join( ';', %e ) . "\n";
      }
      %e = ();
      next;
    }

    # A line that does not match must be skipped: otherwise the
    # captures of some earlier match would be used for it.
    unless ( $line =~ m/^(\S+):\s(.*)/ ) {
      qlog "Warning! Bad line in saved state: '$line'\n";
      next;
    }
    my ( $key, $val ) = ( $1, $2 );
    $e{$key} = load_line($val);
  }

  # the last entry has no closing '#' line
  if ( $e{pid} > 0 ) {
    %{ $ran{ $e{pid} } } = %e;
    qlog "$e{pid} loaded\n";
  } else {
    qlog "Warning! No pid! (" . join( ';', %e ) . "\n";
  }

  close $fh;
  qlog "Loading done\n";
  update_pids();
  update_childs();
}

#
#  Collects the values of all child nodes of a node.
#  Args: node object (must provide to_first_node/next_node;
#        children must provide get_val)
#  Ret:  ref to an array of the children's values, in node order
#
sub load_array( $ ){
    my ($container) = @_;
    my @items;

    $container->to_first_node();
    while (1) {
        my $child = $container->next_node();
        last unless defined($child);
        push @items, $child->get_val();
    }
    return \@items;
}

#
#  Builds a hash from the child nodes of a node.
#  Args: node object (must provide to_first_node/next_node;
#        children must provide get_name and get_val)
#  Ret:  ref to a hash: child name => child value
#
sub load_hash( $ ){
    my ($container) = @_;
    my %pairs;

    $container->to_first_node();
    for ( my $child = $container->next_node();
          defined($child);
          $child = $container->next_node() ) {
        $pairs{ $child->get_name() } = $child->get_val();
    }
    return \%pairs;
}

#
#  Loads the monitor state written by save_state().
#
#  The file named by the 'mon_save' setting is parsed into a tree of
#  XMLNODE objects and every <task> element below the top <xml> node
#  is turned into a Task stored in %tasks under its <task_id>.
#  Attribute values may be plain text or one of the wrapped forms
#  <array>, <hash>, <packed> (uuencoded) and <scalar>.
#
#  Special attributes: 'task_id' (key in %tasks), 'pids' (added with
#  add_pid) and 'is_dead' (marks the task dead if positive); all
#  others go through set_attr.  Tasks without an id, owner or user
#  are logged and ignored.
#
#  Finally update_pids() and update_childs() are called.
#
sub load_state() {

  my ( $state, $name, $top, @stack, @lines,
       $val, $topnode, $task, $attr, $attr_content,
       $info, $task_id, $value );

  $val = get_setting('mon_save');
  my $fh;
  unless ( open( $fh, '<', $val ) ) {
    qlog "Cannot LOAD status from '$val'\n";
    return;
  }

  @lines = <$fh>;
  $state = join( ' ', @lines );
  undef @lines;
  close $fh;

  # --- pass 1: text -> tree.  $stack[$top] is the node being filled.
  $stack[0] = XMLNODE::new('root');
  $top = 0;
  while (1) {
#    qlog "DEBUG: parsing '$state'\n";
    if ( $state =~ s/^\s*<([\w:.\/-]+)>// ) {
      $name = $1;

      # closing?
      if ( $name =~ s/^\/// ) {
        if ( $name eq $stack[$top]->get_name() ) {
          undef $stack[$top]; # it is saved in upper node info
          --$top;
        }
        else {
          # error
          qlog "Error closing: '$name' found, but '".
            $stack[$top]->get_name()."' expected.\n";
        }
        next;
      }

      # new element

      my $node = XMLNODE::new($name);
      $stack[$top]->add_node($node);
      $stack[++$top] = $node;
    }
    # not <> => data
    elsif ( $state =~ s/^\s*([^<]+)</</ ) {
      $stack[$top]->set_val($1);
    }
    # end of text or ERROR
    else {
      if ( $state !~ /^\s*$/ ) {
        qlog "Error: cannot parse '$state'\n";
      }
      last;
    }
  }

  # --- pass 2: tree -> tasks

  $top = 0;
  $stack[0]->to_first_node();
  $topnode = $stack[0]->next_node();
  if ( !defined $topnode ) {
    qlog "Warning! Cannot load saved state! Ignore...\n";
    return;
  }
  if ( $topnode->get_name ne 'xml' ) {
    qlog "ERROR! Bad saved state: no <xml> top node\n";
    return;
  }

  # foreach all tasks...
  $topnode->to_first_node();
  while ( defined( $task = $topnode->next_node() ) ) {
    if ( $task->get_name() eq 'task' ) {

      $task->to_first_node();
      $info = Task::new();

      $task_id = '';
      while ( defined( $attr = $task->next_node() ) ) {

        $value = $attr->get_val();
        qlog "attr: ".$attr->get_name()."; ".$attr->get_val()."\n";

        # is it complicated?  (wrapped in a type element)
        if ( $attr->count_nodes() > 0 ) {
          $attr->to_first_node();
          $attr_content = $attr->next_node();
          if ( $attr_content->get_name eq 'array' ) {
            $value = load_array($attr_content);
          }
          elsif ( $attr_content->get_name eq 'hash' ) {
            $value = load_hash($attr_content);
          }
          elsif ( $attr_content->get_name eq 'packed' ) {
            # The uuencoded text is the value of the <packed> node
            # itself (the parser stores plain text on the enclosing
            # node), so there is no child node to descend into.
            $value = unpack( 'u', $attr_content->get_val() );
          }
          else {
            # <scalar> and any unknown wrapper: take the text as is
            $value = $attr_content->get_val();
          }
        }

        if ( $attr->get_name eq 'task_id' ) {
          $task_id = $value;
        }
        elsif ( $attr->get_name eq 'pids' ) {
          # a damaged file may give plain text instead of a list
          if ( ref($value) eq 'ARRAY' ) {
            foreach my $p (@$value) {
              $info->add_pid($p);
            }
          }
          else {
            qlog "Warning! Bad pids list in saved state. Ignored.\n";
          }
        }
        elsif ( $attr->get_name eq 'is_dead' ) {
          if ( $value > 0 ) {
            $info->mark_dead();
          }
        }
        else {
          $info->set_attr( $attr->get_name(), $value );
        }
      }
      if ( $task_id eq '' ) {
        qlog "ERROR: Loaded task with empty id! Ignored.\n";
        next;
      }
      if ( $info->get_attr('owner') eq '' or
           $info->get_attr('id') eq '' or
           $info->get_attr('user') eq ''
         ) {
        qlog "ERROR: Bad task loaded: owner=".
          $info->get_attr('owner').
            ' id='.$info->get_attr('id').
              ' user='.$info->get_attr('user').". Ignored.\n";
        next;
      }

      # add info to tasks
      $tasks{$task_id} = $info;
    }
    # if task name is NOT 'task'...
    else {
      qlog 'Warning! Non-task info on top level ('.
        $task->get_name()."). Ignored.\n";
    }
  }
  # all tasks loaded...

  qlog "Loading done\n";
  update_pids();
  update_childs();
}


#
#  Loads config file.
#
#  Every line which is not empty and not a comment ('#') must look
#  like 'name = value'; the pair is passed to assign_new_value()
#  for %new_global_settings with section 'g'.  Bad lines and
#  rejected options are logged and skipped.
#
#  args: 1 - file name
#        2 - (opt) safety (1 - unsafe load, 0 - safe)
#
#  ret:  0 if success, 1 if fail to open file
#
################################################################
sub load_config( $;$ ) {
  my ( $file, $unsafe ) = @_;

  # Three-argument open: the name is never interpreted as a mode or
  # a command.  A lexical handle and a lexical line variable keep
  # the caller's $_ and file handles untouched.
  open( my $in, '<', $file ) or return 1;

  while ( my $line = <$in> ) {
    chomp $line;
    next if ( $line =~ /^\s*\#/ );
    next if ( $line =~ /^\s*$/ );

    # The name ends at the FIRST '=' (non-greedy), so values may
    # contain '=' themselves.
    unless ( $line =~ m/^\s*(\S+?)\s*\=\s*(.*)$/ ) {
      qlog "Bad string in config file: $line\n";
      next;
    }
    my ( $var, $val ) = ( $1, $2 );
    $val =~ s/^\s+//;
    $val =~ s/\s+$//;
    if (assign_new_value(
                         \%new_global_settings,
                         $var, $val, $unsafe, 'g' )
       ) {
      qlog "Bad option in config file: '$line'\n";
    }
  }
  close $in;
  return 0;
}

#
#  Assigns new value to hash element according %opt_types
#
#  %opt_types maps an option name to [ type, safe, cumul, section ]:
#    type    - 't' text, 'n' numeric (leading digits are taken),
#              'h' hash ('key value'), 'l' list separated by spaces,
#              'L' list separated by commas, semicolons or
#              whitespace, '@' list separated by commas
#    safe    - 'y' if the option may be set without the unsafe flag
#    cumul   - true if new text/list values are appended
#    section - letters of the sections where the option is allowed
#
#  args: 1 hash reference
#        2 hash key
#        3 value to be assigned
#        4 unsafe flag (0 - safe assign, 1 - forced)
#        5 (opt) section name (a letter)
#
#  ret:  returns 0 if succeed, nonzero if not (unknown option,
#        option not allowed here, or malformed value)
#
###########################################################
sub assign_new_value( $$$$;$ ) {
  my ( $hash, $key, $val, $unsafe, $s ) = @_;

  return 1 unless ( exists $opt_types{$key} );

  my ( $type, $safe, $cumul, $section ) = @{ $opt_types{$key} };
  if ( !$unsafe && ( $safe ne 'y' ) ) {
    return 1;
  }

  # the section letter is matched literally, not as a pattern
  if ( ( $s ne '' ) and ( $section ne '' ) and ( $section !~ m/\Q$s\E/ ) ) {
    return 1;
  }

  if ( $type eq 't' ) {       # text
    if ($cumul) {
      $hash->{$key} .= $val;
    } else {
      $hash->{$key} = $val;
    }
  } elsif ( $type eq 'n' ) {  # numeric
    if ( $val !~ /^(\d+)/ ) {
      qlog
        "Warning! Bad value for $key (must be numeric, but '$val' found)\n";
      return 1;
    }
    $hash->{$key} = $1;
  } elsif ( $type eq 'h' ) {  # hash
    if ( $val !~ /^(\S+)\s+(.*)$/ ) {
      qlog
        "Warning! Bad value for $key (must be hash, but '$val' found)\n";
      return 1;
    }
    $hash->{$key}->{$1} = $2;
  } elsif ( $type eq 'l' ) {  # list via space
    if ($cumul) {
      push @{ $hash->{$key} }, split( /\ +/, $val );
    } else {
      @{ $hash->{$key} } = split( /\ +/, $val );
    }
  } elsif ( $type eq 'L' ) {  # list via coma, semicolon or space
    # A run of separators ("a, b") is one separator, and no empty
    # elements are stored.
    my @items = grep { length } split( /[\s\;\,]+/, $val );
    if ($cumul) {
      push @{ $hash->{$key} }, @items;
    } else {
      @{ $hash->{$key} } = @items;
    }
    qlog "LIST '$key' :" . join( ';', @{ $hash->{$key} } ) . ";\n";
  } elsif ( $type eq "\@" ) { # list via coma
    if ($cumul) {
      push @{ $hash->{$key} }, split( /\,/, $val );
    } else {
      @{ $hash->{$key} } = split( /\,/, $val );
    }
  }
  # an unknown type letter assigns nothing and still reports success
  return 0;
}

#
#  Returns the value of a global setting.
#  Args: setting name
#  Ret:  the value stored in %global_settings, or undef if there is
#        no such key
#
sub get_setting( $ ) {
  my $sname = shift;

  return exists $global_settings{$sname}
    ? $global_settings{$sname}
    : undef;
}

#
#  Deletes a directory recursively with '/bin/rm -rf', optionally
#  on behalf of a user.
#
#  Args: 1 - directory; a leading '/' is always prepended, trailing
#            slashes, shell metacharacters and '/../' are removed
#        2 - (opt) user name; if given, its uid/gid are passed
#            to launch()
#
#  Ret:  1 - the directory does not exist after the removal,
#            or the argument was empty
#        0 - the directory still exists after the removal
#        2 - there was no such directory
#        3 - refused: the path is a protected system directory
#
#  NOTE(review): several failure codes are true values, although
#  the function was once described as "true if succeed, false if
#  not" - check the callers before relying on the result.
#
sub deldir($;$ ) {              #delete directory recursively
  my ( $arg, $u ) = @_;
  my $file;

  $arg =~ s!/+$!!;              # remove trailing slash(es)

  $arg =~ tr{\;\*\&\\~\?\<\>\|\'\"\`}{}d; # remove unliked symbols
  $arg =~ s!/\.\./!!g;          # remove updirs

  qlog "DELDIR '$arg'\n";
  unless ($arg) {
    qlog "Invalid arg for deldir: '$arg'\n";
    return 1;
  }

  # The path really used below is "/$arg", so the protection must
  # look at that path (with repeated slashes collapsed): both 'usr'
  # and '//usr' mean /usr.
  ( my $real = "/$arg" ) =~ s!/+!/!g;
  if ( grep { $real eq $_ } qw(/ /usr /bin /sbin /var /etc) ) {
    qlog "DANGER!!! try to remove $arg\n";
    return 3;
  }
  unless ( -d "/$arg" ) {
    qlog "No such dir '$arg'\n";
    return 2;
  }

  $file = "/bin/rm -rf '/$arg'";

  {
    local $ENV{PATH} = '/bin:/usr/bin:/usr/local/bin';
    if ($u) {
      # the lookups are function calls and need parentheses
      launch( 0, $file, '', get_uid($u), get_gid($u) );
    } else {
      launch( 0, $file, '' );
    }
  }
  return 0 if ( -e "/$arg" );
  return 1;
}                               # deldir

#
#  Encodes a value into a string for unpack_value.
#  Args: a scalar, or a reference to an array or a hash
#        (nested structures are encoded recursively)
#  Ret:  the encoded string:
#          array  - "\0A" elements "\0E"
#          hash   - "\0H" key value key value ... "\0E"
#          scalar with NUL, CR or LF - "\0U" uuencoded text "\0E"
#          other scalar              - "\0S" text "\0E"
#        undef for REF, CODE and GLOB references
#
sub pack_value( $ ) {
  my $kind = ref( $_[0] );

  if ( $kind eq 'ARRAY' ) {
    my $packed = "\0A";
    foreach my $item ( @{ $_[0] } ) {
      $packed .= pack_value($item);
    }
    return $packed . "\0E";
  }

  if ( $kind eq 'HASH' ) {
    my $packed = "\0H";
    foreach my $key ( keys( %{ $_[0] } ) ) {
      $packed .= "$key" . pack_value( $_[0]->{$key} );
    }
    return $packed . "\0E";
  }

  # these cannot be encoded
  return undef
    if ( $kind eq 'REF' or $kind eq 'CODE' or $kind eq 'GLOB' );

  # text with NUL or line breaks travels uuencoded, on one line
  if ( $_[0] =~ y/\0\n\r// ) {
    ( my $uu = pack( 'u', $_[0] ) ) =~ s/\n//g;
    return "\0U${uu}\0E";
  }

  return "\0S" . $_[0] . "\0E";
}

#
#  Unpacks string encoded by pack_value
#  Args:
#         1 - reference to result variable (must be scalar!)
#         2 - string to decode
#         3 - (optional) index to start from
#  Ret:
#         new index in source string (just after final \0[E])
#
#  The result variable receives a scalar, or a reference to an
#  array or a hash; it is undef for an end mark or malformed input.
#  Truncated or garbled input is logged and ends the decoding of
#  the enclosing array/hash instead of looping forever.
#
#######################################################
sub unpack_value( $$;$ ) {
  my ( $res, $val, $index ) = @_;
  my ( $tmp, $i2, $my_res );
  my $end = length($val);

  $index ||= 0;
  undef $$res;
  $tmp = substr( $val, $index, 1 );
  qlog "UPCK: '" . substr( $val, $index ) . "'\n"
    if get_setting('debug_pc');
  if ( $tmp eq "\0" ) {         # complex type
    ++$index;
    $tmp = substr( $val, $index, 1 );
    ++$index;
    if ( $tmp eq 'E' ) {        #end mark
      return $index;
    } elsif ( $tmp eq "A" ) {   #array
      @$my_res = ();
      for ( ;; ) {
        if ( $index >= $end ) { #input ended before the end mark
          qlog "Warning! Unterminated array in packed value\n";
          last;
        }
        if ( substr( $val, $index, 2 ) eq "\0E" ) { #end
          $index += 2;
          last;
        }
        my $next = unpack_value( \$tmp, $val, $index );
        if ( $next <= $index ) {
          # The element could not be decoded and nothing was
          # consumed: going on would never terminate.
          qlog "Warning! Garbage in packed array at pos $index\n";
          $index = $end;
          last;
        }
        $index = $next;
        push @$my_res, $tmp;
      }
      qlog "Unpacked array\n" if get_setting('debug_pc');
    } elsif ( $tmp eq "H" ) {   #hash
      my $key;
      %$my_res = ();
      for ( ;; ) {
        if ( $index >= $end ) { #input ended before the end mark
          qlog "Warning! Unterminated hash in packed value\n";
          last;
        }
        if ( substr( $val, $index, 2 ) eq "\0E" ) { #end
          $index += 2;
          last;
        }
        $i2 = index( $val, "\0", $index );
        if ( $i2 < 0 ) {        #Ooops! A key without any value
          qlog "Warning! Truncated packed hash at pos $index\n";
          $index = $end;
          last;
        }
        $key = substr( $val, $index, $i2 - $index );

        # $i2 points to a "\0", so this call always moves forward
        $index = unpack_value( \$tmp, $val, $i2 );
        $my_res->{$key} = $tmp;
      }
      qlog "Unpacked hash " . join( ';', keys(%$my_res) ) . "\n"
        if get_setting('debug_pc');
    } elsif ( $tmp eq "U" ) {   #uuencode
      $i2 = index( $val, "\0E", $index );
      $i2 = $end if ( $i2 < 0 ); #Ooops! Not found terminator
      $my_res = unpack( 'u', substr( $val, $index, $i2 - $index ) );
      $index = $i2 + 2;
      qlog "Unpacked uu '$my_res'\n" if get_setting('debug_pc');
    } elsif ( $tmp eq "S" ) {   #simple scalar
      $i2 = index( $val, "\0E", $index );
      $i2 = $end if ( $i2 < 0 ); #Ooops! Not found terminator
      $my_res = substr( $val, $index, $i2 - $index );
      $index = $i2 + 2;
      qlog "Unpacked '$my_res'\n" if get_setting('debug_pc');
    } else {
      qlog "Warning! error while decoding in pos $index '$val'\n";
      $my_res = '';
    }
  } else {                      #no type mark at all
    # the index is returned unchanged in this case
    qlog "Malformed scalar! (" . substr( $val, $index, -1 ) . "\n";
  }
  $$res = $my_res;
  return $index;
}

