Архитектура передачи данных на веб-клиент в режиме реального времени
Цели архитектуры
Архитектура доставки сообщений должна:
- поддерживать несколько соединений на одного пользователя (несколько вкладок)
- обеспечивать разные модели доставки:
- unicast (одному пользователю)
- multicast (группе пользователей)
- broadcast (всем)
- эффективно работать при большом количестве сообщений
- изолировать медленные клиенты (backpressure)
- быть расширяемой без переписывания существующего кода
Общая схема компонентов
MessageSource
↓
TransportSerializer (сериализация в JSON)
↓
MessageEnvelop
↓
MessageRouter
↓
DeliveryStrategy
↓
ConnectionRegistry
↓
ConnectionContext (BoundedChannel)
↓
SendLoop
↓
WebSocket
MessageEnvelop
Все сообщения передаются в едином формате-обёртке:
public sealed class MessageEnvelope
{
public DeliveryType DeliveryType { get; init; }
public Guid? UserId { get; init; } // для Unicast
public IEnumerable<Guid>? UserIds { get; init; } // для Multicast
public ReadOnlyMemory<byte> Payload { get; init; } // готовые байты (JSON)
}
public enum DeliveryType
{
Unicast,
Multicast,
Broadcast
}
На уровне WebSocket-доставки используются только готовые байтовые представления сообщений, что полностью изолирует транспортный слой от формата данных.
TransportSerializer
Доменные моедли сериалзиуются через TransportSerializer:
public static class TransportSerializer
{
public static ReadOnlyMemory<byte> Serialize(OutgoingMessage message)
{
var typeName = message.GetType().Name;
var transportObject = new
{
type = typeName,
data = (object)message
};
return JsonSerializer.SerializeToUtf8Bytes(transportObject);
}
}
// Example
var message = new UnitStatusPack
{
UnitId = id,
Status = "Working",
Progress = 42
};
var payload = TransportSerializer.Serialize(message);
var envelope = new MessageEnvelope
{
DeliveryType = DeliveryType.Broadcast,
Payload = payload
};
Сериализация происходит до создания MessageEnvelope. Т.к. транспортный слой работает только с байтами.
MessageRouter
Ответственность заключается в выборе стратегии доставки по типу сообщения.
Не знает ничего о реализации отправки, что способствует для последующией миграции на другие протоколы.
public sealed class MessageRouter : IMessageRouter
{
private readonly IReadOnlyDictionary<DeliveryType, IMessageDeliveryStrategy> _strategies;
public Task RouteAsync(MessageEnvelope message)
{
if (!_strategies.TryGetValue(message.DeliveryType, out var strategy))
throw new InvalidOperationException($"No strategy for {message.DeliveryType}");
return strategy.SendAsync(message);
}
}
Delivery Strategy
Каждая модель доставки реализуется отдельной стратегией.
public interface IMessageDeliveryStrategy
{
Task SendAsync(MessageEnvelope message);
}
| Стратегия | Назначение | Параметры MessageEnvelope |
|---|---|---|
| UnicastStrategy | Сообщение одному пользователю | UserId |
| MulticastStrategy | Группе пользователей | UserIds |
| BroadcastStrategy | Всем подключениям | - |
Реализация:
- Стратегии получают список соединений из ConnectionRegistry
- Проверяют состояние WebSocket перед отправкой
- Записывают сообщение в Channel каждого соединения через TryWrite
- Логируют результаты доставки (успешные/неуспешные)
Если
TryWriteвозвращает false (канал полон или закрыт), сообщение логируется как неуспешное, но не блокирует доставку другим соединениям.
ConnectionRegistry
Единый источник информации о подключениях.
public interface IConnectionRegistry
{
void Add(ConnectionContext ctx);
void Remove(ConnectionContext ctx);
IEnumerable<ConnectionContext> GetUserConnections(Guid userId);
IEnumerable<ConnectionContext> GetUsersConnections(IEnumerable<Guid> userIds);
IEnumerable<ConnectionContext> GetAllConnections();
}
Один пользователь может иметь несколько активных соединений.
Жизненный цикл:
- Подключение:
- Создаётся
ConnectionContextдля WebSocket - Вызывается
ConnectionRegistry.AddConnection(ctx) - Обновляются структуры _userConnections, _allConnections
- Создаётся
- Отключение:
- WebSocket закрыт или исключение в SendLoop/RecieveLoop
- Вызывается
ConnectionRegistry.RemoveConnection(ctx) - Удаляется из всех словарей
ConnectionContext
Каждое WebSocket-соединение оборачивается в ConnectionContext:
public sealed class ConnectionContext
{
public Guid ConnectionId { get; } = Guid.NewGuid();
public Guid UserId { get; }
public WebSocket Socket { get; }
public Channel<ReadOnlyMemory<byte>> Outgoing { get; }
}
Назначение:
- изолировать медленные соединения
- обеспечить backpressure
- избежать блокировки при массовой рассылке
Очередь сообщений (backpressure)
Используется bounded channel с настраиваемым размером:
Channel.CreateBounded<ReadOnlyMemory<byte>>(
new BoundedChannelOptions(channelBufferSize) // из конфигурации, по умолчанию 100
{
FullMode = BoundedChannelFullMode.DropOldest
}
);
Поведение:
- если клиент не успевает читать → старые сообщения отбрасываются
- сервер не растёт по памяти
Важно: С
BoundedChannelFullMode.DropOldest:
- Если канал полон →
TryWriteвозвращаетtrue, новое сообщение записывается, старое сообщение вытесняется и теряется- Если канал закрыт →
TryWriteвозвращаетfalse, сообщение не записывается и логируется как неуспешноеТаким образом, стратегии не блокируются при полном канале, но старые сообщения теряются для медленных клиентов. Это обеспечивает изоляцию медленных соединений и предотвращает рост памяти сервера.
SendLoop
Для каждого WebSocket-соединения создаётся независимый асинхронный цикл отправки (SendLoop), который читает сообщения из ограниченной очереди. Это обеспечивает изоляцию клиентов, предотвращает блокировку при массовой рассылке и защищает сервер от неконтролируемого роста памяти.
public static async Task RunAsync(
ConnectionContext ctx,
CancellationToken cancellationToken)
{
var socket = ctx.Socket;
var reader = ctx.Outgoing.Reader;
try
{
while (!cancellationToken.IsCancellationRequested &&
socket.State == WebSocketState.Open)
{
var payload = await reader.ReadAsync(cancellationToken);
await socket.SendAsync(
payload,
WebSocketMessageType.Text,
endOfMessage: true,
cancellationToken);
}
}
catch (OperationCanceledException)
{
// нормальный shutdown
}
catch (ChannelClosedException)
{
// канал закрыт
}
catch (WebSocketException)
{
// клиент отвалился
}
finally
{
await SafeCloseAsync(socket);
}
}
- Eсли сообщений нет → поток освобождён
- Loop “просыпается” только когда появляется сообщение
- Медленный клиент блокирует только свой SendLoop
- Массовая рассылка не ждёт
await SendAsyncдругих соединений
Жизненный цикл соединения
Подключение:
- HTTP → WebSocket upgrade
- JWT Аутентификация пользователя
- Создание ConnectionContext
- Регистрация в ConnectionRegistry
- Запуск SendLoop
Возможная расширяемость
Добавить новый тип доставки:
- Создать новую стратегию, реализующую IMessageDeliveryStrategy
- Добавить новый DeliveryType в enum
- Зарегистрировать стратегию в DI
- Обновить MessageRouter для маппинга типа на стратегию
Добавить обработку входящих сообщений
- Расширить WaitForCloseAsync для обработки текстовых сообщений
- Добавить десериализацию и обработку команд от клиента
Добавить поддержку UnitConnections
- Расширить ConnectionContext полем UnitId
- Добавить методы
GetUnitConnectionsвConnectionRegistry - Обновить стратегии для фильтрации по UnitId
No comments to display
No comments to display