#!/usr/bin/perl
# Cleo batch system
#
# client side (qsub/mpirun...)
#
#

# exit codes:
# 1 - invalid usage (or simply usage print)
# 2 - no action selected
# 3 - cannot create socket/connect
# 4 - bad options
# 5 - action failed

#use IO::Socket::INET;
use lib '/usr/lib/cleo';
use Cleo::Conn;
use Cwd;
use Sys::Syslog qw(:DEFAULT setlogsock);

use strict;

use vars qw($port $user $dir $answer $status $tmpfd
            %opts $mode $usage @answer $S $line $i);

$port = 5252;

######################################################################

sub getline( $ ){
    my $conn=shift;
    my ($readed,$str);

    for(;;){
        $conn->flush;
        $str=$conn->read;
        return undef if (!defined $str and ($readed eq ''));
        $readed.=$str;

        if($readed =~ s/^(.*?\n)//s){
            $conn->unread($readed);
            return $1;
        }
        select(undef,undef,undef,0.1);
    }
}


if (open(CONF,"</etc/cleo.conf"))
{
  my $section='server';
  while (<CONF>)
  {
    next if(/^\#/ || /^\s*$/);
    if (/^\s*\[(.+)\]/)
    {
      $section=$1;
      next;
    }
    if ($section eq 'server')
    {
      if (/^\s*port\s*\=\s*(\d+)/)
      {
        $port=$1;
        last;
      }
    }
  }
  close(CONF);
}
$port = $ENV{QS_PORT} if $ENV{QS_PORT};

GetOptsTillCan_hash(\%opts,
                    'v=',
                    's=',
                    'G=s',
                    '1=s',
                    '2=s',
                    '3=s',
                    'L=s',
                    'A=',
                    'b=s',
                    'V=s',
                    'u=s',
                    'B=s',
                    'U=s',
                    'M=s',
                    'd=',
                    'f=',
                    'C=s',
                    'n=s',
                    'c=s',
                    'w=s',
                    'p=s',
                    'm=s',
                    'q=s',
                    'o=s',
                    'E=',
                    'r=s',
                    't=s',
                    'P=s',
                    'l=s',
                    'h=',
                    'F=',
                    'R=',
                    'L=s',
                    'a=s',
                    'T=',
                    'O=s',
                    'y=s',
                    'Y=s',
                    'Z=s',
                    'z=s',
                    'S=s',
                    'k=s',
                    'g=s'
                   );

$usage="Usage: $0 [-c .. -n ..|-v|-d|-m|-C] <options>\nOptions: -h      This message\n".
"         -G comm Generic command. Send 'comm' to server.\n".
"                 Rest arguments are treated as pairs parameter/value.\n".
"         -v      View tasks\n".
"         -V fl   View tasks (new version)\n".
"                 flags:  f=foreign tasks, o=own tasks,\n".
"                 p=processors stat, P=global processors stat,\n".
"                 m=default timelimit, M=maximum timelimit,\n".
"                 O=other stats, s=running mode, B=blocked pe,\n".
"                 b=blocked tasks, u=u1;u2;... list of users,\n".
"                 for every task: c=used cpus, C=custom fields,\n".
"                 >=outfile, r=repfile, w=workdir, F=full task line\n".
"         -s      View statistics\n".
"         -R      Use recursion (for view, mode, and debug)\n".
"         -F      Use full mode (for view)\n".
"         -T      Use technical mode (for view)\n".
"         -L list Use this list in command (via comma)\n".
"         -b id   Block task <id>\n".
"         -u id   Unblock task <id>\n".
"         -a val  Autoblock tasks for users (see -L). Values 0-5\n".
"                 (unblock/block/unnonblock/nonblock/allblock/unallblock)\n".
"         -B p    Block processor <p>\n".
"         -U p    Unblock processor <p>\n".
"         -p port Use server port <port>\n".
"         -d id(s) Delete task(s) <id(s)>\n".
"         -f      Force deletion of task (dangerous!)\n".
"         -L prof Use profile 'prof'\n".
"         -c com  Add to queue task with command <com>\n".
"         -A      Add to queue task. Task command line is taken\n".
"                 from the rest of comand line\n".
"         -C id   Change parameters for task with id=id\n".
"         -q xxx  Add task to queue 'xxx'\n".
"         -1 file Use file as stdin for added task\n".
"         -2 file Use file as stdout for added task\n".
"         -3 file Use file as stderr for added task\n".
"         -n num  Use <num> processors\n".
"         -w dir  Use <dir> as workdir\n".
"         -N nice Use nice\n".
"         -E      Pass all the environment for new task\n".
"         -o out  Use output file out (template)\n".
"         -r rep  Use report file rep (template)\n".
"         -t tmp  Use temporary dir tmp (template)\n".
"         -l lim  Use time limit as lim (seconds)\n".
"         -P pri  Use priority pri for this task\n".
"         -M str  gives additional parameters for mode, del, block_task, block_cpu or view\n".
"         -O str  gives another parameters for block_task\n".
"         -S name operate as user 'name' (block_task)\n".
"         -y list wait for all tasks from list (via comma) to be runned\n".
"         -Y list wait for all tasks from list (via comma) to be successfully completed\n".
"         -Z list wait for all tasks from list (via comma) to be unsuccessfully completed\n".
"         -z logical operation for wait conditions \'a\'[nd] or \'o\'[r]\n".
"         -k str  choose the strategy of processor selection\n".
"         -m mod  Change server operating mode:\n".
"                   run    - allow to run new tasks\n".
"                   norun  - allow only to queue new tasks\n".
"                   sane   - activate autoreboot\n".
"                   nosane - cancel autoreboot\n".
"                   view   - DONT change mode, only view current mode\n".
"                   queue_disable - disable to queue new tasks\n".
"                   queue_enable  - enable to queue new tasks\n".
"                   conf_reload   - reload config file\n".
"                   update_pid    - update pid file\n".
"                   version       - show version info\n".
"                   newversion    - reload new version of server\n".
"         -g command Debug command (for experts only)\n".
" You can set default values by environment variables:\n".
" QS_PORT     : server port\n".
" QS_QUEUE    : queue name\n".
" QS_PRIORITY : default priority\n".
" QS_TEMP     : temp dir template\n".
" QS_OUT      : out file template\n".
" QS_REP      : report file\n".
" QS_TIMELIM  : default timelimit\n";

if ($opts{h})
{
  print $usage;
  exit(1);
}
unless(
       ($opts{'d'} ne '') or
       ($opts{'m'} ne '') or
       ($opts{'c'} ne '') or
       ($opts{'C'} ne '') or
       ($opts{'n'} ne '') or
       ($opts{'B'} ne '') or
       ($opts{'b'} ne '') or
       ($opts{'U'} ne '') or
       ($opts{'u'} ne '') or
       ($opts{'g'} ne '') or
       ($opts{'G'} ne '') or
       defined($opts{'a'}) or
       defined($opts{'s'}) or
       defined($opts{'v'}) or
       defined($opts{'V'})
      ){
  print "No valid action (v,V,d,m,s,c,C,n,B,b,U,u,a,g,G) specified!!!\n".join(';',keys(%opts))."\n"; #$usage";
  exit(2);
}

$port = $opts{'p'} if($opts{'p'});

$!=3;
#$S = IO::Socket::INET->new(
#        PeerAddr => 'localhost',
#        PeerPort => $port,
#        Proto    => 'tcp',
#    );

$S = new Cleo::Conn('localhost', $port);

unless(defined $S){
  print "Cannot connect to server: $!\n";
  exit(3);
}
#$S->autoflush(1);
#$S->blocking(0);

# try to connect 10 seconds
for($i=0;$i<100;++$i){
    $S->connect;
    last if($S->get_state eq 'ok');
    select(undef,undef,undef,0.1);
}
if($i==100){
  print "Cleo connection timed out...\n";
  exit(3);
}
#unless(socket(S, PF_INET(), SOCK_STREAM(), getprotobyname('tcp'))){
#  print "Cannot create socket: $!\n";
#  exit(3);
#}
#unless(connect(S,sockaddr_in($port, inet_aton('localhost')))){
#  print "Cannot connect to server: $!\n";
#  exit(3);
#}

$user=getpwuid($<);
#print "User: $user port=$port\n";
$dir = $opts{'w'} || cwd();

$opts{'q'} = $ENV{QS_QUEUE} unless $opts{'q'} ne '';
$opts{'q'} = 'main'         unless $opts{'q'} ne '';
$opts{'t'} = $ENV{QS_TEMP}  unless $opts{'t'} ne '';
$opts{'t'} = '/tmp'         unless $opts{'y'} ne '';
$opts{'o'} = $ENV{QS_OUT}   unless $opts{'o'} ne '';
$opts{'r'} = $ENV{QS_REP}   unless $opts{'r'} ne '';

$!=4;

eval{
  setlogsock('unix');
  openlog('cleo_client','nowait,pid','user');

  $status = 'Cannot connect to server...';
 MAIN:  {
    $SIG{'ALRM'}= sub {die "alarm\n";};
    alarm(30);

    #
    #  MODE
    #
    if ($opts{'m'} ne ''){
      $mode='m';
      unless ($opts{'m'} eq 'view' || $opts{'m'} eq 'version' || $opts{'m'} eq 'update_pid')
      {
        print "Changing server mode to $opts{'m'}\n";
      }
      $opts{'M'} ||= '1';
      $S->send("mode:$user:$$:+all\n");
      $S->send("queue: $opts{'q'}\n") if($opts{'q'});
      $S->send("recurs: 1\n") if($opts{'R'});
      $S->send("mode_$opts{'m'}: $opts{'M'}\nend\n");

    }
    #
    #  GENERIC COMMAND
    #
    elsif ($opts{'G'}){
      $mode='G';
      my ($val,$var);

      # eliminate hack attempts...
      $opts{'G'} =~ y/-a-zA-Z_0-9.//c;
      $S->send("$opts{'G'}:$user:$$:+all\n");
      $S->send("queue: $opts{'q'}\n") if($opts{'q'});
      for(;;){
        $var=shift @ARGV;
        $val=shift @ARGV;
        last if($var eq '');
        $var =~ y/-a-zA-Z_0-9.//c;
        $val =~ y/\n\r\0//;
        $S->send("$var: $val\n");
      }
      $S->send("end\n");
    }
    #
    #  VIEW (old style)
    #
    elsif ($opts{'v'}){
      $mode='v';
      print "Use recursion\n" if($opts{'R'});
      print "Use full mode\n" if($opts{'F'});

      $S->send("view:$user:$$:+all\n");
      $S->send("queue: $opts{'q'}\n") if($opts{'q'});
      $S->send("showsub: 1\n") if($opts{'R'});
      $S->send("tech: 1\n") if($opts{'T'});
      $S->send("full: 1\n") if($opts{'F'});
      $S->send("end\n");
    }
    #
    #  VIEW (new style)
    #
    elsif ($opts{'V'}){
      $mode='v';
      print "Use recursion\n" if($opts{'R'});
      print "Use full mode\n" if($opts{'F'});

      $S->send("view:$user:$$:+all\n");
      $S->send("queue: $opts{'q'}\n") if($opts{'q'});
      $S->send("showsub: 1\n") if($opts{'R'});
      $S->send("flags: $opts{'V'}\n");
      $S->send("end\n");
    }
    #
    #  AUTOBLOCK
    #
    elsif (defined $opts{'a'}){
      $mode='v';
      print "Use recursion\n" if($opts{'R'});
      $S->send("autoblock:$user:$$:+all\n");
      $S->send("queue: $opts{'q'}\n") if($opts{'q'});
      $S->send("recurse: 1\n") if($opts{'R'});
      $S->send("users: $opts{'L'}\n");
      $S->send("val: $opts{'a'}\n");
      $S->send("end\n");
    }
    #
    #  DEL TASK
    #
    elsif ($opts{'d'}){
      local $,=' ';
      $mode='d';
      print "Del @ARGV";
      print "for user(s) $opts{'L'}" if($opts{'L'});
      print "\n";
      if($#ARGV<0){
        $status= "No task id!";
        die "No task id!\n";
      }
      #      $S->send("del:$user:$$:+$opts{'d}\nid:$opts{'d}\nqueue:$opts{'q}\nend\n");
      $S->send("del:$user:$$:+\n");
      $S->send("queue: $opts{'q'}\n") if($opts{'q'});
      $S->send("recurs: 1\n") if($opts{'R'});
      $S->send("userlist: $opts{'L'}\n") if($opts{'L'});
      $S->send("forced: 1\n") if($opts{'f'});
      $S->send("id: @ARGV\n");
      if ($opts{'M'})
      {
        # First two characters - rmask. Others - mask
        my ($rmask,$mask);
        $rmask=substr($opts{'M'},0,2);
        $mask=substr($opts{'M'},2);
        $S->send("rmask: $rmask\n");
        $S->send("mask: $mask\n");
      }
      $S->send("end\n");
    }
    #
    #  DEBUG COMMAND
    #
    elsif ($opts{'g'})
    {
      $mode='C';
      print "Debug\n";
      $opts{'g'} =~ tr/\n\r\0//d;
      $S->send("debug:$user:$$:+$opts{'b'}\n");
      $S->send("command:$opts{'g'}\n");
      $S->send("queue:$opts{'q'}\n") if($opts{'q'});
      $S->send("recurse:$opts{'r'}\n") if($opts{'r'});
      $S->send("end\n");
    }
    #
    #  BLOCK TASKS
    #
    elsif ($opts{'b'}){
      $mode='C';
      print "Block $opts{'b'}\n";
      unless (($opts{'b'} =~ /^[0-9,]+$/) or ($opts{'b'} eq 'all'))
      {
        $status= "Illegal id!";
        die "Illegal id!\n";
      }
      $S->send("block:$user:$$:+$opts{'b'}\n");
      $S->send("id:$opts{'b'}\n");
      $S->send("queue:$opts{'q'}\n") if($opts{'q'});
      $S->send("userlist: $opts{'L'}\n") if($opts{'L'});
      $S->send("mask: $opts{'M'}\n") if($opts{'M'});
      $S->send("reason: $opts{'O'}\n") if($opts{'O'});
      $S->send("username: $opts{'S'}\n") if($opts{'S'});
      $S->send("val: 1\n");
      $S->send("end\n");
    }
    #
    #  UNBLOCK TASKS
    #
    elsif ($opts{'u'}){
      $mode='C';
      print "Unblock $opts{'u'}\n";
      unless (($opts{'u'} =~ /^[0-9,]+$/)  or ($opts{'u'} eq 'all'))
      {
        $status= "Illegal id!";
        die "Illegal id!\n";
      }
      $S->send("block:$user:$$:+$opts{'u'}\n");
      $S->send("id:$opts{'u'}\n");
      $S->send("queue:$opts{'q'}\n") if($opts{'q'});
      $S->send("userlist: $opts{'L'}\n") if($opts{'L'});
      $S->send("mask: $opts{'M'}\n") if($opts{'M'});
      $S->send("reason: $opts{'O'}\n") if($opts{'O'});
      $S->send("username: $opts{'S'}\n") if($opts{'S'});
      $S->send("val: 0\n");
      $S->send("end\n");
    }
    #
    #  BLOCK CPUS
    #
    elsif ($opts{'B'}){
      $mode='C';
      print "Block processor(s) $opts{'B'} [$opts{'O'}]\n";
      unless ( $opts{'B'} =~ /^\S+$/ ){
        $status= "Illegal processor name!";
        die $status;
      }
      $S->send("block_pe:$user:$$:+$opts{'B'}\n");
      $S->send("queue:$opts{'q'}\n") if defined $opts{'q'};
      $S->send("id:$opts{'B'}\nval:1\n");
      $S->send("recurs: $opts{'R'}\n") if defined $opts{'R'};
      $S->send("reason:$opts{'M'}\n") if defined $opts{'M'};
      $S->send("safe:$opts{'O'}\n") if defined $opts{'O'};
      $S->send("end\n");
    }
    #
    #  UNBLOCK CPUS
    #
    elsif ($opts{'U'}){
      $mode='C';
      print "Unblock processor(s) $opts{'U'}\n";
      unless ( $opts{'U'} =~ /^\S+$/ )
      {
        $status= "Illegal processor name!";
        die $status;
      }
      $S->send("block_pe:$user:$$:+$opts{'B'}\n");
      $S->send("queue:$opts{'q'}\n") if defined $opts{'q'};
      $S->send("id:$opts{'U'}\nval:0\n");
      $S->send("recurs: $opts{'R'}\n") if defined $opts{'R'};
      $S->send("reason:$opts{'M'}\n") if defined $opts{'M'};
      $S->send("safe:$opts{'O'}\n") if defined $opts{'O'};
      $S->send("end\n");
    }
    #
    #  CHANGE TASK ATTRIBUTES
    #
    elsif ($opts{'C'} ne '') {
      $mode='C';
      #
      #  Priority
      #
      if ($opts{'P'} ne ''){
        $S->send("priority:$user:$$:+$opts{'C'}\nid:$opts{'C'}\nqueue:$opts{'q'}\nval:$opts{'P'}\nend\n");
      }
      elsif ($opts{'l'} ne ''){
        $S->send("chattr:$user:$$:+$opts{'C'}\nid:$opts{'C'}\n");
        $S->send("queue:$opts{'q'}\n") if defined $opts{'q'};
        $S->send("attribute:timelimit\nval:$opts{'l'}\nend\n");
      } else {
        die "No attributes to change!";
      }
    }
    #
    #  ADD TASK
    #
    elsif (($opts{'c'} ne '' or $opts{'A'}) and $opts{'n'} ne ''){
      $mode='a';

      $opts{'P'} = $ENV{QS_PRIORITY} unless $opts{'P'} ne '';
      $opts{'l'} = $ENV{QS_TIMELIM} unless $opts{'l'} ne '';

      print "Using queue $opts{'q'}\n";

      $S->send("add:$user:$$:\+$opts{'n'}\n");
      $S->send("np:$opts{'n'}\n");
      $S->send("command:$opts{'c'}\n") if($opts{'c'} ne '');
      $S->send("dir:$dir\n");   #$opts{'w'}\n") if($opts{'w'});
      $S->send("path:".`pwd`);
      $S->send("queue:$opts{'q'}\n") if($opts{'q'});
      $S->send("timelimit:$opts{'l'}\n") if($opts{'l'});
      $S->send("outfile:$opts{'o'}\n") if($opts{'o'});
      $S->send("repfile:$opts{'r'}\n") if($opts{'r'});
      $S->send("tmpdir:$opts{'t'}\n") if($opts{'t'});
      $S->send("nice:$opts{'N'}\n") if($opts{'N'});
      $S->send("priority:$opts{'P'}\n") if($opts{'P'});
      $S->send("stdin:$opts{'1'}\n") if($opts{'1'});
      $S->send("stdout:$opts{'2'}\n") if($opts{'2'});
      $S->send("stderr:$opts{'3'}\n") if($opts{'3'});
      $S->send("profile:$opts{'L'}\n") if($opts{'L'});
      $S->send("wait_for_run:$opts{'y'}\n") if($opts{'y'});
      $S->send("wait_for_ok:$opts{'Y'}\n") if($opts{'Y'});
      $S->send("wait_for_fail:$opts{'Z'}\n") if($opts{'Z'});
      $S->send("wait_cond_type:$opts{'z'}\n") if($opts{'z'});
      $S->send("pe_select:$opts{'k'}\n") if($opts{'k'});
      my $index=0;
      while(@ARGV){
          $S->send("args${index}:$ARGV[0]\n");
          ++$index;
          shift @ARGV;
      }

      if ($opts{'E'}){
        my $e;
        my $env;
        foreach $e (keys(%ENV)){
          $env .= "$e =$ENV{$e}\0";
        }
        $e=pack("u",$env);
        $e =~ s/\n//g;
        $S->send("env: $e\n");
      }
      $S->send("end\n");
    }
    else {
      $status = "No action (v,V,d,m,s,c,C,n,B,b,U,u,a) specified\n$usage";
      die "\n";
    }

    $S->flush;
#    $tmpfd=select(S);$| = 1; select STDOUT;
    $answer=getline($S);
    if(!defined($answer)){
        print "Cleo server does not send the answer...\n";
        exit(4);
    }
    chomp $answer;

    #authorization fase
    if ($answer =~ /^\+auth:(.*)/)
    {
      last MAIN if($1 eq '');
      $0=$1;

      #confirm authorization is done
      $S->send("+ok\n");
      $S->flush;
      $answer=getline($S);
      chomp $answer;

#      select S; $|=0; select STDOUT;
      #is the answer correct?
      if ($answer =~ /^(\-|\+)(.+)/){
        undef $status;
        while(defined ($line=getline($S))){
            push @answer, $line;
        }
#        @answer=$S->getlines();
        if ($mode eq 'v' || $mode eq 's'){
          if ($1 eq '+'){
          } else {
            $status=$2;
#            my @x=<S>;
          }
        } elsif ($mode eq 'd' || $mode eq 'a' || $mode  eq 'C') {
          # del/add/change
          if ($1 eq '+') {
            print "$2\n";
#            @answer=<S>;
#            print join('',@answer);
          } else {
            $status=$2;
          }
        } elsif ($mode eq 'm'){
          #mode
          if ($1 eq '-'){
            $status=$2;
          }
#          @answer=<S>;#chomp @answer;
#          print join('',@answer);
        } elsif ($mode eq 'G'){
          #generic
          if ($1 eq '-'){
            $status=$2;
          }
          else{
              print "Command $opts{'G'} OK.\n";
          }
        } else {
          if ($1 eq '-'){
            $status=$2;
          } else {
            print "Succeed. But what command it was?\n";
          }
        }
        print join('',@answer,"\n") unless $status;
      } else {
        $status = "oops. Internal error: server answered '$answer'\n";
      }
    }
  }                             # ~MAIN
};

if($status){
  syslog('info','Error: %s (%m)',$status);
  closelog();
  print join('',"Error: $status\n",@answer,"\n");
  exit(5);
}

#
#  Gets opts like this: ('X=i',) (this means "option '-X 10' to variable $options{X}=10)
#  The scans command line for options till founds argument '--' or non-specified
#  option, or not '-' prefixed argument.
#  Specifications of options (what goes after 'X='):
#  i - integer
#  s - string
#  + - cumulative value (variable MUST be a list)
#  nothing - flag
#
sub GetOptsTillCan_hash{
  # \%hash,"arg1","arg2,...
  my $hash=shift @_;
  my @args=@_;
  my (%args,$arg,$a_key,$a_value,$a,$next,%types);

  foreach $arg (@args)
  {
    $arg =~ /^(\S+)(\=)(.*)/ or next;
    $a_key=$1;
    $a_value=$args{$arg};
    $types{$a_key} = $3;

    delete $args{$arg};
    $args{$a_key} = $a_value;
  }

  while ($next=shift @ARGV)
  {
    #    print ">>$next<[$ARGV[0]]\n";
    last if(substr($next,0,1) ne '-');
    last if($next eq '--');
    $a=substr($next,1);
    last unless(exists $args{$a});
    undef $next;
    if (($types{$a} eq 'i') || ($types{$a} eq 's'))
    {
      $hash->{$a}=shift @ARGV;
    } elsif ($types{$a} eq '')
    {
      $hash->{$a}=1;
    } elsif ($types{$a} eq '+')
    {
      push @{$hash->{$a}}, shift @ARGV;
    }
  }
  unshift @ARGV, $next if(defined $next);
}
