Алгоритм балансировки очередей с квотами и гарантией неперебивания ресурсов
Условие задачи
Дано:
- Канал для отправки сообщений (полезная нагрузка) с заданной пропускной способностью n, определяемой как максимальное количество сообщений, которое можно отправить в него за один раз.
- Произвольное количество очередей с сообщениями (m). Сообщения поступают в очереди асинхронно и в разных количествах. Могут появляться новые очереди с сообщениями.
- Каждая очередь имеет идентификатор (целое число).
- Для каждой очереди известно количество сообщений в ней на момент запроса.
- С каждой очередью ассоциировано вещественное положительное число (квота, вес), представляющее собой минимальную долю в пропускной способности канала.
Задача: составить алгоритм, который на каждой итерации рассчитывает, сколько необходимо взять сообщений из каждой очереди для отправки в канал так, чтобы выполнялись следующие условия:
- Work-conserving: утилизация канала должна быть максимально полной. Например, если общее количество сообщений по всем очередям превышает пропускную способность канала, то количество отправляемых сообщений равно пропускной способности канала.
- Starvation-free: не должно быть resource starvation относительно очередей с малой долей. Например, отправка идёт с двух очередей: у одной пропускная способность 1000, у другой — 0.00001. В этом случае сообщения должны браться из второй очереди.
- Fairness: гарантия того, что каждая очередь получает доступ к каналу согласно своей квоте.
Допустим, пропускная способность канала равна 10. Примеры того, как мог бы работать алгоритм на произвольной итерации:
- Одна очередь с квотой 0.5, в очереди 100 сообщений. Так как очередь одна, независимо от квоты, в канал отправляется 10 сообщений за раз.
- Две очереди с квотой 0.5, по 100 сообщений в каждой. Берётся по 5 сообщений из каждой очереди и отправляется в канал.
- Две очереди, квоты 0.2 и 0.8. В каждой по 100 сообщений. Из первой берётся 2 сообщения, из второй — 8 сообщений.
- Две очереди, квоты 0.25 и 1. В первой очереди по 2 сообщения, во второй — 8 сообщений.
- 100 очередей, все с квотой 1, по 100 сообщений в каждой. Берётся по одному сообщению из каждой десятой очереди, затем из каждой десятой со смещением на 1, и так далее.
php// Основной цикл отправки сообщений, на каждой итерации которого вычисляется нужное количество сообщений по каждой очереди.
function dispatcher(Source $source) {
$messages = ;
while (true) {
// Имитируем поступление сообщений в очереди
$source->next();
// Рассчитываем количество сообщений которое нужно извлечь из каждой очереди
$batchSizes = calculateMessageBatchSizes($source->queueSizes(), $source->queueQuotas());
$source->printStats($batchSizes);
// Извлекаем нужное количество сообщений из каждой очереди и складываем их в общий массив
foreach ($batchSizes as $queueId => $batchSize) {
if ($batchSize <= 0) {
continue;
}
$messages = array_merge(
$messages,
$source->extractMessagesFromQueue($queueId, $batchSize)
);
}
// Отправляем сообщения если они есть
if ($messages) {
sendMessages($messages);
$messages = ;
}
}
}
/**
* @param array<int, int> $queueSizes
* @param array<int, float> $queueQuotas
* @param int $bandwidth
* @return array
*/
function calculateMessageBatchSizes(array $queueSizes, array $queueQuotas, int $bandwidth = 20): array
{