#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2012-2017  Vít Suchomel, Miloš Jakubíček
from __future__ import unicode_literals
from __future__ import print_function

import sys
import os
import getopt
import subprocess
from threading import Thread
from time import sleep
from collections import deque

# Prefer the underlying binary streams where the text streams expose one
# (Python 3); otherwise fall back to the stream objects themselves.
binary_stdin = getattr(sys.stdin, 'buffer', sys.stdin)
binary_stdout = getattr(sys.stdout, 'buffer', sys.stdout)

import re
def read_big_structures(fp, structure_tag, buffer_size=10000000):
    """
    Returns an iterator over structures in a vertical file, represented as
    a binary file-like object. Each structure is yielded as a bytes object
    that starts at a line beginning with "<TAG " or "<TAG>" and extends up
    to (not including) the next such line, or to the end of the input.

    fp -- file-like object whose read(n) returns bytes
    structure_tag -- name of the structure tag (text), matched literally
    buffer_size -- number of bytes requested from fp per read

    Data preceding the first structure start is discarded. If the input
    contains no structure start at all, the whole input is yielded as
    a single item.
    """
    # TODO: take ending tags into account
    # The tag is escaped so that names containing regex metacharacters
    # (e.g. "." in "a.b") cannot match unrelated tags.
    structure_start_re = re.compile(
        ('^<%s[ >]' % re.escape(structure_tag)).encode("utf-8"), re.M)
    buffer = b""
    while True:
        new_data = fp.read(buffer_size)
        if not new_data:
            break
        buffer += new_data
        # The whole buffer is searched again so that a start tag split
        # across two reads is still found.
        starting_positions = [m.start() for m in structure_start_re.finditer(buffer)]
        if not starting_positions:
            continue
        # Every structure followed by another start is complete.
        for start, end in zip(starting_positions, starting_positions[1:]):
            yield buffer[start:end]
        # The last structure may continue in the next read; keep it.
        buffer = buffer[starting_positions[-1]:]
    if buffer:
        yield buffer


# Placeholders recognised in the subprogram's command line
STDIN_MARKER = '@IN'    # replaced by the path of a named pipe carrying the input
STDOUT_MARKER = '@OUT'  # replaced by the path of a named pipe carrying the output
JOB_MARKER = '@JOBID'   # replaced by the subprocess' job number

# Defaults of the command line options
DEFAULT_MAX_DOCS_PER_SUBPROCESS = 0  # 0 = unlimited
DEFAULT_STRUCT_TAG = 'doc'
DEFAULT_STRUCT_BUF_SIZE = 16 * 1024 ** 2  # 16 MiB

# Common prefix of the named pipes created by this process
FIFO_PATH = '/tmp/run_parallel-{}'.format(os.getpid())

# Polling intervals (seconds) and the input queue limit
IO_WAIT_DELAY = 0.2
MAX_STRUCTS_IN_QUEUE = 1000
QUEUE_WAIT_DELAY = 1.0
SUBPROC_CHECK_DELAY = 1.0

def usage():
    """Write the command line help to stderr."""
    help_template = """Usage: %(progname)s [OPTIONS] MULTI_FACTOR SUBPROGRAM < XML_INPUT

Runs multiple instances of a xml (vertical) processing program concurrently.
The input is chunked into pieces marked by the structure tag and sent to 
the subprograms. The subprograms' output is gathered and printed out.
Optionally use '%(stdin_marker)s' and '%(stdout_marker)s' to explicitly set 
the input file and the output file (respectively) for the subprograms. 
Does not keep the order of the input structures.
You can use the '%(job_marker)s' in the command for getting the job ID
starting with 0.

  MULTI_FACTOR number of concurrent programs to run
  SUBPROGRAM   the full execution line of the subprogram including parameters
  XML_INPUT    the input xml (vertical) file
  OPTIONS:
    -t STRING  input structure tag
                 default: %(default_struct_tag)s
    -T STRING  subprogram's output structure tag
                 default: %(default_struct_tag)s
    -m INT     send at most INT structures to a subprocess
    -b INT     structure buffer size in bytes
                 default: %(default_struct_buf_size)s
    -l DIR     directory for subprocess' stderr output
                 default: none
    -v         print some info to stderr
    -h, --help display this help
    
Simple usage example:
%(progname)s 12 ./unitok.py -l english -e utf-8 -s < in.vert > out.vert
Example with explicit input and output files:
%(progname)s 7 tt/bin/tree-tagger -token -lemma -sgml tt/lib/english-utf8.par %(stdin_marker)s %(stdout_marker)s < in.vert > out.vert
    """
    # Values substituted for the named placeholders of the template
    substitutions = {
        'progname': sys.argv[0],
        'stdin_marker': STDIN_MARKER,
        'stdout_marker': STDOUT_MARKER,
        'job_marker': JOB_MARKER,
        'default_struct_tag': DEFAULT_STRUCT_TAG,
        'default_struct_buf_size': DEFAULT_STRUCT_BUF_SIZE,
    }
    print(help_template % substitutions, file=sys.stderr)

class SubprocessHandler(object):
    """
    Runs one subprocess and moves structures to and from it.

    A writer thread pops structures from the shared input queue and sends
    them to the subprocess; a reader thread splits the subprocess' output
    into structures and appends them to the shared output queue. The data
    goes through the subprocess' stdin/stdout, or through named pipes when
    the command line contains STDIN_MARKER / STDOUT_MARKER.
    """
    def __init__(self, sub_id, sub_args, in_queue, out_queue, max_docs_per_subprocess, struct_out_tag, struct_buf_size, verbose, logdir):
        """
        Starts the subprocess and its writer and reader threads.

        sub_id -- job number; substituted for JOB_MARKER in the command and
                  used in the names of the named pipes and the stderr log
        sub_args -- command line of the subprocess, a sequence of strings
        in_queue -- deque of structures to be sent to the subprocess
        out_queue -- deque receiving the structures read from the subprocess
        max_docs_per_subprocess -- stop sending after this many structures,
                  0 means no limit
        struct_out_tag -- structure tag used to split the subprocess' output
        struct_buf_size -- buffer size in bytes (pipe buffering, read size)
        verbose -- write progress messages to stderr
        logdir -- directory for the "<sub_id>.err" stderr log of the
                  subprocess; a false value leaves stderr untouched
        """
        self._id = sub_id
        self._stop = False
        self._max_docs_per_subprocess = max_docs_per_subprocess
        self._struct_out_tag = struct_out_tag
        self._struct_buf_size = struct_buf_size
        self._verbose = verbose
        self._struct_count = 0

        #prepare the subprocess's I/O
        #Real lists are kept throughout: a map() iterator would be exhausted
        #by the verbose log line below and Popen would get no arguments.
        sub_args = list(sub_args)
        args = ' '.join(sub_args)
        if JOB_MARKER in args:
            sub_args = [arg.replace(JOB_MARKER, str(sub_id)) for arg in sub_args]
        if STDIN_MARKER in args:
            #use a named pipe for sending data to the subprocess
            self._use_stdin = False
            #prepare the named pipe
            self._fifo_w_path = '%s-%d-W' % (FIFO_PATH, self._id)
            os.mkfifo(self._fifo_w_path)
            #prepare the I/O related subprocess arguments
            sub_args = [arg.replace(STDIN_MARKER, self._fifo_w_path) for arg in sub_args]
            sub_in = None
        else:
            #use the subprocess's stdin for sending data
            self._use_stdin = True
            sub_in = subprocess.PIPE
        if STDOUT_MARKER in args:
            #use a named pipe for receiving data from the subprocess
            self._use_stdout = False
            #prepare the named pipe
            self._fifo_r_path = '%s-%d-R' % (FIFO_PATH, self._id)
            os.mkfifo(self._fifo_r_path)
            #prepare the I/O related subprocess arguments
            sub_args = [arg.replace(STDOUT_MARKER, self._fifo_r_path) for arg in sub_args]
            sub_out = None
        else:
            #use the subprocess's stdout for receiving data
            self._use_stdout = True
            sub_out = subprocess.PIPE

        #start the subprocess
        if self._verbose:
            sys.stderr.write('Executing %s\n' % ' '.join(sub_args))
        if logdir:
            errlog = open("%s/%s.err" % (logdir, sub_id), "w")
        else:
            errlog = None
        try:
            self._subproc = subprocess.Popen(sub_args, bufsize=self._struct_buf_size, stdin=sub_in, stdout=sub_out, stderr=errlog)
        finally:
            #the subprocess holds its own copy of the log descriptor
            if errlog is not None:
                errlog.close()

        #start writing to and reading from the subprocess
        self._thread_w = Thread(target=self._write, name='proc-%d-W' % self._id, args=(in_queue,))
        self._thread_w.start()
        self._thread_r = Thread(target=self._read, name='proc-%d-R' % self._id, args=(out_queue,))
        self._thread_r.start()

    def sub_poll(self):
        """Returns the subprocess' exit code, or None if it is still running."""
        return self._subproc.poll()

    def stop_writing(self):
        """Tells the writer to finish once the input queue is empty and waits for it."""
        self._stop = True
        self._thread_w.join()

    def stop_reading(self):
        """Waits for the subprocess to exit and for the reader thread to finish."""
        self._subproc.wait()
        self._thread_r.join()

    #send input data to the subprocess
    def _write(self, queue):
        """
        Writer thread body: sends structures popped from queue to the
        subprocess until the per-subprocess limit is reached or, after
        stop_writing() was called, the queue is found empty. The stream is
        closed (and the named pipe removed) even if writing fails.
        """
        if self._use_stdin:
            fd = self._subproc.stdin
        else:
            fd = open(self._fifo_w_path, 'wb')
        try:
            while self._max_docs_per_subprocess > self._struct_count or 0 == self._max_docs_per_subprocess:
                try:
                    fd.write(queue.popleft())
                    self._struct_count += 1
                except IndexError: #empty queue
                    if self._stop:
                        break
                    sleep(IO_WAIT_DELAY)
        finally:
            try:
                fd.close()
            finally:
                if not self._use_stdin:
                    os.remove(self._fifo_w_path)

    #receive output data from the subprocess
    def _read(self, queue):
        """
        Reader thread body: once at least one structure has been sent to
        the subprocess, reads its output, splits it by the output structure
        tag and appends the pieces to queue. Nothing is read if writing was
        stopped before any structure was sent. A named pipe is closed and
        removed even if reading fails.
        """
        if self._use_stdout:
            fd = self._subproc.stdout
        else:
            fd = open(self._fifo_r_path, 'rb')
        try:
            #wait until some data is sent to the subprocess
            while 1 > self._struct_count and not self._stop:
                sleep(IO_WAIT_DELAY)
            #read the subprocess' output
            if 0 < self._struct_count:
                for struct in read_big_structures(fd, structure_tag=self._struct_out_tag, buffer_size=self._struct_buf_size):
                    queue.append(struct)
        finally:
            if not self._use_stdout:
                fd.close()
                os.remove(self._fifo_r_path)
        if self._verbose:
            sys.stderr.write('Subprocess stopped, %d structures written\n' % self._struct_count)

def out_writer(queue, stop):
    """
    Prints out the output gathered from all subprocesses.

    Pops items from queue and writes them to the binary stdout. While the
    queue is empty it polls every IO_WAIT_DELAY seconds; it returns once the
    queue is found empty and stop[0] is true.
    """
    while True:
        try:
            chunk = queue.popleft()
            binary_stdout.write(chunk)
        except IndexError:
            #nothing queued right now
            if stop[0]:
                return
            sleep(IO_WAIT_DELAY)

def subproc_starter(subproc_handlers, multi_factor, subcall_args, in_queue, out_queue, max_docs_per_subprocess, struct_out_tag, struct_buf_size, stop, verbose, logdir):
    """
    Starts subprocesses if needed.

    For each of the multi_factor slots, a new SubprocessHandler is created
    (and stored in subproc_handlers under the slot number) when the slot is
    empty or its subprocess has exited, provided in_queue is not empty.
    The slots are checked every SUBPROC_CHECK_DELAY seconds; the function
    returns after the first pass that finds stop[0] true.
    """
    while True:
        for slot in range(multi_factor):
            if slot in subproc_handlers:
                exit_code = subproc_handlers[slot].sub_poll()
                if exit_code is None:
                    #still running, nothing to do for this slot
                    continue
                if exit_code != 0:
                    sys.stderr.write('The subprocess returned a non-zero value!\n')
            if not in_queue:
                #no pending input, a new subprocess is not needed
                continue
            if verbose:
                sys.stderr.write('Starting a new subprocess\n')
            subproc_handlers[slot] = SubprocessHandler(
                slot, subcall_args, in_queue, out_queue,
                max_docs_per_subprocess, struct_out_tag, struct_buf_size,
                verbose, logdir)
        if stop[0]:
            return
        sleep(SUBPROC_CHECK_DELAY)

def vertfork(
        multi_factor,
        subcall_args,
        max_docs_per_subprocess=DEFAULT_MAX_DOCS_PER_SUBPROCESS,
        struct_in_tag=DEFAULT_STRUCT_TAG,
        struct_out_tag=DEFAULT_STRUCT_TAG,
        struct_buf_size=DEFAULT_STRUCT_BUF_SIZE,
        verbose=False,
        logdir=None):
    """
    Splits the binary stdin into structures, distributes them among up to
    multi_factor concurrently running subprocesses and writes the gathered
    output to the binary stdout. The order of structures is not kept.

    multi_factor -- number of subprocess slots, at least 1
    subcall_args -- command line of the subprogram, a sequence of strings
    max_docs_per_subprocess -- send at most this many structures to one
            subprocess, 0 means no limit
    struct_in_tag -- structure tag used to split the input
    struct_out_tag -- structure tag used to split the subprocess' output
    struct_buf_size -- buffer size in bytes
    verbose -- write progress messages to stderr
    logdir -- directory for the subprocess' stderr logs, None for no logs

    Raises ValueError if multi_factor is less than 1: no subprocess would
    ever be started and the input queue would never drain.
    """
    if multi_factor < 1:
        raise ValueError('multi_factor must be at least 1, got %s' % multi_factor)
    #init
    in_queue = deque()
    out_queue = deque()
    subproc_handlers = {}
    #start the out_writer thread
    stop_out = [False]
    out_thread = Thread(target=out_writer, name='out', args=(out_queue, stop_out))
    out_thread.start()
    #start the thread that (re)starts subprocesses
    stop_substart = [False]
    substart_thread = Thread(target=subproc_starter, name='substart', args=(subproc_handlers, multi_factor, subcall_args, in_queue, out_queue, max_docs_per_subprocess, struct_out_tag, struct_buf_size, stop_substart, verbose, logdir))
    substart_thread.start()
    #read structures from the input file into in_queue
    for struct in read_big_structures(binary_stdin, structure_tag=struct_in_tag, buffer_size=struct_buf_size):
        #prevent loading all input data in memory in case of slow subprocesses
        while MAX_STRUCTS_IN_QUEUE < len(in_queue):
            sleep(QUEUE_WAIT_DELAY)
        #add the read struct to the queue
        in_queue.append(struct)
    #wait for the subprocesses to read all structures from in_queue
    while in_queue:
        sleep(QUEUE_WAIT_DELAY)
    #stop all subprocesses and I/O threads
    stop_substart[0] = True
    substart_thread.join()
    #all writers are stopped first so that every subprocess sees the end of
    #its input before any of them is waited for
    for sh in subproc_handlers.values():
        sh.stop_writing()
    for sh in subproc_handlers.values():
        sh.stop_reading()
    stop_out[0] = True
    out_thread.join()

def main():
    """
    Command line entry point: parses the options and positional arguments
    from sys.argv and runs vertfork() with them.

    Exits with status 2 (after printing the usage) on an invalid command
    line, and with status 0 after printing the usage for -h / --help.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'm:t:T:b:l:vh', ['help'])
    except getopt.GetoptError as err:
        #the newline keeps the message separate from the usage text
        sys.stderr.write('%s\n' % err)
        usage()
        sys.exit(2)
    #options taking an integer / a string, mapped to vertfork() keywords
    int_options = {'-m': 'max_docs_per_subprocess', '-b': 'struct_buf_size'}
    str_options = {'-t': 'struct_in_tag', '-T': 'struct_out_tag', '-l': 'logdir'}
    opt_kw_args = {}
    for o, a in opts:
        if o in int_options:
            try:
                opt_kw_args[int_options[o]] = int(a)
            except ValueError:
                sys.stderr.write('Error: option %s requires an integer, got "%s".\n' % (o, a))
                usage()
                sys.exit(2)
        elif o in str_options:
            opt_kw_args[str_options[o]] = a
        elif '-v' == o:
            opt_kw_args['verbose'] = True
        elif '-h' == o or '--help' == o:
            usage()
            sys.exit()
        else:
            raise Exception('Error: unhandled option')
    if 2 > len(args):
        sys.stderr.write('Error: MULTI_FACTOR and SUBPROGRAM must be specified.\n')
        usage()
        sys.exit(2)
    try:
        multi_factor = int(args[0])
    except ValueError:
        multi_factor = 0
    #with less than one slot no subprocess would ever be started
    if 1 > multi_factor:
        sys.stderr.write('Error: MULTI_FACTOR must be a positive integer, got "%s".\n' % args[0])
        usage()
        sys.exit(2)
    opt_kw_args['multi_factor'] = multi_factor
    opt_kw_args['subcall_args'] = args[1:]
    vertfork(**opt_kw_args)

# Run the command line interface only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

# vim: ts=4 sw=4 sta et sts=4 si syntax=python:
