11.07.10 Предотвращение перегрузки сервера - управление потоком входящих данных.

Для предотвращения перегрузки используем простой подход: если системный вызов
send/write/writev вернул EAGAIN, то перестаём ставить новые сообщения в очередь
отправки соединения до тех пор, пока не будут отправлены все данные, которые уже
находятся в очереди.

Отправку данных выполняет Sender. Есть две активных связки Sender'ов:
    ImmediateConnectionSender -> ConnectionSenderImpl
    DeferredConnectionSender  -> ConnectionSenderImpl

В общем случае о получении EAGAIN узнаём в произвольном месте (необязательно
именно в потоке, опбрабатывающем события для данного соединения). После
получения EAGAIN нужно перестать принимать новые данные из соединения (а нужно
ли?). Рассмотрим этот вопрос подробнее.

Q: Что нужно сделать для предотвращения перегрузки сервера, если выясняется, что
   мы имеем дело с медленным клиентом?

A: Во-первых, нужно начать отбрасывать сообщения, предназначенные клиенту.
   О том, какие сообщения можно отбрасывать, известно только на уровне
   бизнес-логики. Поэтому именно на уровне бизнес-логики нужно управлять работой
   при перегрузке.

   Во-вторых, нужно ограничить приём новых сообщений от медленного клиента. Это
   даст клиенту понять, что сервер испытвает затруднения с его обслуживанием
   (через механизм flow control протокола TCP). С другой стороны, сообщения типа
   ping reply нужно принимать от клиента постоянно. А вот отвечать на ping
   request не стоит, потому что немедленные ответы на ping request невозможно
   отбросить, что даст клиенту возможность для искуственной перегрузки очереди
   отправки сообщений на сервере.

   Можно было бы ввести отдельный порог для прекращения приёма данных от
   клиента. Например, предельное количество сообщений (или объём данных) в
   очереди отправки. Это означает, что есть два отдельных режима управления
   перегрузкой. Первый - после получения EAGAIN при записи в сокет - ограничение
   на отправку данных в сокет бизнес-логикой. Второй - заполнение очереди
   отправки клиенту - ограничение на приём данных от клиента. Это - хорошее
   разделение. Принимаю его. Можно ввести и третий уровень - двухкратное
   превышение длины очереди отправки. При этом можно выполнить нештатный разрыв
   соединения.

Как бы я ни выполнил механизм flow control, он всегда будет неполным. Можно
очень долго выдумывать всевозможные правила, помогающие справиться с перегрузкой
на сервере. Поэтому принятое решение неизбежно будет компромиссным и не будет
поддаваться чистой абстракции. Нужно выбрать для описанных выше правил наиболее
простое и логичное представление.

Основной узел, принимающий отдельные сообщения на отправку - это Sender.
На уровне Sender'а нужно принимать решения о прохождении барьеров, привязанных
к длине очереди отправки. Можно ввести набор фиксированных состояний очереди:
    ConnectionReady - нормальное состояние, нет перегрузки;
    ConnectionOverloaded - получен EAGAIN при записи в сокет;
    QueueSoftLimit - достигнут первый порог заполнения очереди;
    QueueHardLimit - достигнут второй порог заполнения очереди.

Эти состояния связаны и представляют собой набор возможных состояний соединения
с точки зрения отправки данных. Как построить управление перегрузкой на основе
этих состояний?

Нужно уведомлять о переходах между состояниями код бизнес-логики. Для этого
подойдёт Frontend Sender'а. Заводим в нём callback "sendStateChanged", который
будет вызываться каждый раз при смене состояния соединения. Первоначальное
соединение, подразумевающееся до первого вызова callback'а - ConnectionReady.

Q: Как именно будет выполняться уведомление?

A: В общем случае момент перехода из одного состояния в другой - произвольный.

   Переход к состоянию с меньшими ограничениями производится только после
   отправки данных. Переход к состоянию с большими ограничениями выполняется
   после отправки или после постановки данных в очередь. Отправка данных и
   постановка в очередь происходят в произвольный момент.
   
   Нужно реализовать механизм защиты от перегрузки так, чтобы затраты на
   синхронизацию были минимальными. Самый простой и эффективный способ -
   выполнять уведомления с удержанием мьюеткса состояния Sender'а. При этом
   от обработчиков уведомлений требуется, чтобы они не вызывали никаких методов
   Sender'а.

Лимиты soft и hard устанавливаются индивидуально для каждого Sender'а
бизнес-логикой. Лимит 0 означает отсутствие лимита.

Q: В каких единицах устанавливать лимиты на длину очереди?

A: Есть два доступных измерения: объём данных в очереди и кол-во условных
   сообщений. Объём данных может не отражать реального кол-ва ресурсов,
   необходимых для поддержания очереди. Кол-во сообщений, в свою очередь,
   ничего не говорит о требуемом для хранения этих сообщений объёме памяти.
   Использую пару чисел, разделённых запятой: размер в байтах, кол-во сообщений.

Одна из особенностей видеопотока состоит в том, что можно "прореживать" очередь
отправки сообщений при обнаружении перегрузки соединения. Наиболее простой
вариант реализации - при отправке сообщений помечать их как "droppable" (так и
сделаю: это полностью покроет все текущие потребности).

Q: На каком уровне бизнес-логики возможно управление механизмом flow control?

A: Нужно иметь возможность выполнять следующие действия:
     * Прореживать очередь отправки. Отдавать распоряжение об отбросе
       некритичных сообщений, предназначенных для отправки клиенту;
     * Блокировать приём данных от клиента и запрашивать повторное уведомление
       о доступности входящих данных;
     * Разрывать соединение с клиентом.

    Наиболее сильное требование - управление приёмом данных. Receiver в общем
    случае не может генерировать повторные уведомления о готовности данных так,
    чтобы не нарушались условия синхронизации (все уведомлелия от Receiver'а
    должны поступать из одного потока). Один из вариантов решения - ввести в
    Receiver метод triggerInput(), который в случае Moment ставил бы в PollGroup
    отложенное задание на передачу управления Receiver'у.

    Расширение интерфейса Receiver позволит сосредоточить основную часть логики
    flow control для RTMP-соединений в классе RtmpConnection. Это практически
    полностью освободит модули более высокого уровня от необходимости следить
    за перегрузкой. Так и сделаю.
      - Всё-таки принятие решений об отправке видеосообщения должно приниматься
        в mod_rtmp.

------

Реализация отложенных заданий в PollGroup. DeferredProcessor.

Задача: реализовать механизм отложенной передачи управления в потоке обработки
сообщений. Механизм должен быть достаточно общим и охватывать все возможные
циклы обработки сообщений. Нежелательно использовать динамическое выделение
памяти при постановке очередного задания в очередь.

Решение - механизм DeferredProcessor. Очередь отложенных заданий - это список
элементов DeferredProcessor::Registration. Объекты Registration связаны по
времени жизни с объектами, которые ставят задания в очередь. DeferredProcessor
поддерживает три операции: постановка задания в очередь, снятие с очереди
(вспомогательная операция), обработка накопившихся в очереди заданий. Все эти
операции асинхронные.

Игнорирую возможности для оптимизации блокировок (использование mutex'а
PollGroup), т.к. они нарушают декомпозицию, хотя и возможно впоследствии
прибегнуть к такой оптимизации.

В Moment механизм DeferredProcessor используется на уровне ServerApp.
! Не удастся ввести на уровне ServerApp. Нужно рассмотреть это подробнее
(см. ниже).

Постановка отложенных заданий выполняется через объект Registration, потому что
регистрации впоследствии могут мигрировать с одного DeferredProcessor на другой.

Ещё одно применение DeferredProcessor - защита от насыщения при обработке
очереди отправки в ConnectionSenderImpl.

(Из записей в черновике - решил, что для реализации миграции Registration
с одного DeferredProcessor на дргой нужно заводить mutex в самом Registration.
Также считаю, что автоматическая синхронизация событий ввода/вывода - это
не очень удобно, т.к. невозможно использовать аналог MyCpp::ImmediateScheduler)

Q: На каком уровне можно ввести DeferredProcessor?

A: Допустим, что нужно выдержать текущее требование к обработке событий
   ввода-вывода: синхронизацию вызова обработчиков без использования примитивов
   синхронизации на наиболее частом пути исполнения. При приёме соединения нужно
   знять, куда оно пойдёт - в какой поток, - чтобы знать, с каким
   DeferredSender'ом выполнить первоначальное связывание. При перебалансировке
   соединений по потокам нужно уметь обновлять объекты Registration.

   Эти условия указывают на то, что DeferredProcessor должен находиться близко
   к PollGroup, так как именно PollGroup (или Poller) распределяет соединения
   по потокам и занимается перебалансировкой. Единственное место, в котором
   можно провести перевоначальную инициализацию Registration - это метод
   PollGroup::addPollable(). Он будет принимать на вход указатель на объект
   Registration и инициализировать этот объект, связывая его с нужным
   DeferredProcessor'ом.

Для того, чтобы избавить DeferredConnectionSender от проблемы насыщения, за одну
иитерацию будем обрабатывать только ту часть очереди, которая скопилась к началу
итерации.

