Архитектура передачи данных на веб-клиент в режиме реального времени
Цели архитектуры
Архитектура доставки сообщений должна:
- поддерживать несколько соединений на одного пользователя (несколько вкладок)
- обеспечивать разные модели доставки:
- unicast (одному пользователю)
- multicast (группам: users / units)
- broadcast (всем)
- эффективно работать при большом количестве сообщений
- изолировать медленные клиенты (backpressure)
- быть расширяемой без переписывания существующего кода
Общая схема компонентов
MessageSource (API, Jobs)
↓
MessageRouter
↓
DeliveryStrategy
↓
ConnectionRegistry
↓
ConnectionContext (Queue)
↓
SendLoop
↓
WebSocket
MessageEnvelop
Все сообщения передаются в едином формате-обёртке
public class MessageEnvelope
{
public DeliveryType DeliveryType { get; init; }
public string MessageType { get; init; } // e.g. UnitStatusDto
public string? UserId { get; init; }
public IEnumerable<string>? UserIds { get; init; }
public IEnumerable<string>? UnitIds { get; init; }
public ReadOnlyMemory<byte> Payload { get; init; }
}
public enum DeliveryType
{
Unicast,
Multicast,
Broadcast
}
public interface IOutgoingMessage
{
ReadOnlyMemory<byte> ToJsonUtf8();
}
// Example
[MemoryPackable]
public partial class UnitStatusDto : IOutgoingMessage
{
public Guid UnitId { get; set; }
public string Status { get; set; } = default!;
public int Progress { get; set; }
public ReadOnlyMemory<byte> ToPayloadUtf8()
{
var typeName = GetType().Name;
// Обёртка
var transportObject = new
{
type = typeName,
data = this
};
return JsonSerializer.SerializeToUtf8Bytes(transportObject);
}
}
var message = new UnitStatusDto
{
UnitId = id,
Status = "Working",
Progress = 42
};
var envelope = new MessageEnvelope
{
DeliveryType = DeliveryType.Multicast,
UnitIds = new[] { unitId },
Payload = message.ToJsonUtf8()
};
Доменные модели реализуют контракт сериализации. На уровне WebSocket-доставки используются только готовые байтовые представления сообщений, что полностью изолирует транспортный слой от формата данных.
MessageRouter
Ответственность заключается в выборе стратегии доставки.
Не знает ничего о реализации отправки, что способствует для последующией миграции на другие протоколы.
public class MessageRouter
{
private readonly IDictionary<DeliveryType, IMessageDeliveryStrategy> _strategies;
public Task RouteAsync(MessageEnvelope message)
=> _strategies[message.DeliveryType].SendAsync(message);
}
Delivery Strategy
Каждая модель доставки реализуется отдельной стратегией.
public interface IMessageDeliveryStrategy
{
Task SendAsync(MessageEnvelope message);
}
| Стратегия | Назначение |
|---|---|
| UnicastStrategy | Сообщение одному пользователю |
| MulticastStrategy | Пользователям и/или units |
| BroadcastStrategy | Всем подключениям |
ConnectionRegistry
Единый источник информации о подключениях.
public interface IConnectionRegistry
{
void AddConnection(ConnectionContext ctx);
void RemoveConnection(ConnectionContext ctx);
IEnumerable<ConnectionContext> GetUserConnections(Guid userId);
IEnumerable<ConnectionContext> GetUsersConnections(IEnumerable<Guid> userIds);
IEnumerable<ConnectionContext> GetUnitConnections(int unitId);
IEnumerable<ConnectionContext> GetUnitsConnections(IEnumerable<int> unitIds);
IEnumerable<ConnectionContext> GetAllConnections();
}
Один пользователь/юнит может иметь несколько активных соединений.
Жизненный цикл:
- Подключение:
- Создаётся ConnectionContext для WebSocket
- Вызывается ConnectionRegistry.AddConnection(ctx)
- Обновляются структуры _userConnections, _unitConnections, _allConnections
- Отключение:
- WebSocket закрыт или исключение в SendLoop
- Вызывается ConnectionRegistry.RemoveConnection(ctx)
- Удаляется из всех словарей и групп
ConnectionContext
Каждое WebSocket-соединение оборачивается в ConnectionContext. Он не знает что внутри сообщения (MemoryPack или JSON), т.к. он работает только с байтами. Сериализация на уровне формирования сообщений.
class ConnectionContext
{
public Guid ConnectionId { get; }
public Guid UserId { get; }
public WebSocket Socket { get; }
public Channel<ReadOnlyMemory<byte>> Outgoing { get; }
}
Назначение
- изолировать медленные соединения
- обеспечить backpressure
- избежать блокировки при массовой рассылке
Очередь сообщений (backpressure)
Используется bounded channel:
Channel.CreateBounded<ReadOnlyMemory<byte>>(
new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.DropOldest
}
);
Для того, чтобы очередь была ограниченной. Лучше потеря сообщения, чем лежащий сервер из-за нехватки памяти.
WebSocket сам по себе не гарантирует доставку. Для реализации такой логики нужно будет очень много думать( Одним из вариантов будет реализация через отдельные очереди сообщений:
- critical (нельзя терять ни в коем случае), реализовывать через ack
- actual (актуальные состояния (позиция юнита), если клиент пропустил 3 обновления - не страшно
Поведение:
- если клиент не успевает читать → старые сообщения отбрасываются
- сервер не растёт по памяти
SendLoop
Для каждого WebSocket-соединения создаётся независимый асинхронный цикл отправки (SendLoop), который читает сообщения из ограниченной очереди. Это обеспечивает изоляцию клиентов, предотвращает блокировку при массовой рассылке и защищает сервер от неконтролируемого роста памяти.
Также он не знает формат сообщений, не знает тип, не знает domain модель, только отправляет utf-8, что идеально по SRP.
async Task SendLoop(ConnectionContext ctx, CancellationToken ct)
{
try
{
await foreach (var msg in ctx.Outgoing.Reader.ReadAllAsync(ct))
{
if (ctx.Socket.State != WebSocketState.Open)
break;
await ctx.Socket.SendAsync(
msg,
WebSocketMessageType.Text,
true,
ct
);
}
}
catch (OperationCanceledException)
{
// нормальный shutdown
}
catch (WebSocketException)
{
// клиент отвалился
}
finally
{
ctx.Outgoing.Writer.TryComplete();
ctx.Socket.Abort();
}
}
- если сообщений нет → поток освобождён
- loop “просыпается” только когда появляется сообщение
Благодаря await ctx.Socket.SendAsync(...)
- Несмотря на то, что SendAsync может быть медленным из-за клиента
- у каждого соединения свой SendLoop
- он ждёт тоько свой сокет
- остальные соединение не затрагиваются
- медленный клиент блокирует только свой SendLoop
- массовая рассылка не ждёт await SendAsync
Жизненный цикл соединения
Подключение
- HTTP → WebSocket upgrade
- Аутентификация пользователя
- Создание ConnectionContext
- Регистрация в ConnectionRegistry
- Запуск SendLoop
Возможная расширяемость
Добавить новый тип доставки:
- Новая стратегия
- Регистрация в DI