Архитектура передачи данных на веб-клиент в режиме реального времени
Цели архитектуры
Архитектура доставки сообщений должна:
- поддерживать несколько соединений на одного пользователя (несколько вкладок)
- обеспечивать разные модели доставки:
- unicast (одному пользователю)
- multicast (
группам:группеusers / units)пользователей) - broadcast (всем)
- эффективно работать при большом количестве сообщений
- изолировать медленные клиенты (backpressure)
- быть расширяемой без переписывания существующего кода
Общая схема компонентов
MessageSource
↓
TransportSerializer (API,сериализация Jobs)в JSON)
↓
MessageEnvelop
↓
MessageRouter
↓
DeliveryStrategy
↓
ConnectionRegistry
↓
ConnectionContext (Queue)BoundedChannel)
↓
SendLoop
↓
WebSocket
MessageEnvelop
Все сообщения передаются в едином формате-обёрткеобёртке:
public sealed class MessageEnvelope
{
public DeliveryType DeliveryType { get; init; }
public stringGuid? MessageTypeUserId { get; init; } // e.g.для UnitStatusDto
public string? UserId { get; init; }Unicast
public IEnumerable<stringGuid>? UserIds { get; init; } public// IEnumerable<string>?для UnitIds { get; init; }Multicast
public ReadOnlyMemory<byte> Payload { get; init; } // готовые байты (JSON)
}
public enum DeliveryType
{
Unicast,
Multicast,
Broadcast
}
public interface IOutgoingMessage
{
ReadOnlyMemory<byte> ToJsonUtf8();
}
// Example
[MemoryPackable]
public partial class UnitStatusDto : IOutgoingMessage
{
public Guid UnitId { get; set; }
public string Status { get; set; } = default!;
public int Progress { get; set; }
public ReadOnlyMemory<byte> ToPayloadUtf8()
{
var typeName = GetType().Name;
// Обёртка
var transportObject = new
{
type = typeName,
data = this
};
return JsonSerializer.SerializeToUtf8Bytes(transportObject);
}
}
var message = new UnitStatusDto
{
UnitId = id,
Status = "Working",
Progress = 42
};
var envelope = new MessageEnvelope
{
DeliveryType = DeliveryType.Multicast,
UnitIds = new[] { unitId },
Payload = message.ToJsonUtf8()
};
Доменные модели реализуют контракт сериализации.На уровне WebSocket-доставки используются только готовые байтовые представления сообщений, что полностью изолирует транспортный слой от формата данных.
TransportSerializer
Доменные моедли сериалзиуются через TransportSerializer:
public static class TransportSerializer
{
public static ReadOnlyMemory<byte> Serialize(OutgoingMessage message)
{
var typeName = message.GetType().Name;
var transportObject = new
{
type = typeName,
data = (object)message
};
return JsonSerializer.SerializeToUtf8Bytes(transportObject);
}
}
// Example
var message = new UnitStatusPack
{
UnitId = id,
Status = "Working",
Progress = 42
};
var payload = TransportSerializer.Serialize(message);
var envelope = new MessageEnvelope
{
DeliveryType = DeliveryType.Broadcast,
Payload = payload
};
Сериализация происходит до создания MessageEnvelope. Т.к. транспортный слой работает только с байтами.
MessageRouter
Ответственность заключается в выборе стратегии доставки.доставки по типу сообщения.
Не знает ничего о реализации отправки, что способствует для последующией миграции на другие протоколы.
public sealed class MessageRouter : IMessageRouter
{
private readonly IDictionaryIReadOnlyDictionary<DeliveryType, IMessageDeliveryStrategy> _strategies;
public Task RouteAsync(MessageEnvelope message)
=>{
_strategies[if (!_strategies.TryGetValue(message.DeliveryType].DeliveryType, out var strategy))
throw new InvalidOperationException($"No strategy for {message.DeliveryType}");
return strategy.SendAsync(message);
}
}
Delivery Strategy
Каждая модель доставки реализуется отдельной стратегией.
public interface IMessageDeliveryStrategy
{
Task SendAsync(MessageEnvelope message);
}
| Стратегия | Назначение | Параметры MessageEnvelope |
|---|---|---|
| UnicastStrategy | Сообщение одному пользователю | UserId |
| MulticastStrategy | UserIds | |
| BroadcastStrategy | Всем подключениям | - |
Реализация:
- Стратегии получают список соединений из ConnectionRegistry
- Проверяют состояние WebSocket перед отправкой
- Записывают сообщение в Channel каждого соединения через TryWrite
- Логируют результаты доставки (успешные/неуспешные)
Если
TryWriteвозвращает false (канал полон или закрыт), сообщение логируется как неуспешное, но не блокирует доставку другим соединениям.
ConnectionRegistry
Единый источник информации о подключениях.
public interface IConnectionRegistry
{
void AddConnection(Add(ConnectionContext ctx);
void RemoveConnection(Remove(ConnectionContext ctx);
IEnumerable<ConnectionContext> GetUserConnections(Guid userId);
IEnumerable<ConnectionContext> GetUsersConnections(IEnumerable<Guid> userIds);
IEnumerable<ConnectionContext> GetUnitConnections(int unitId);
IEnumerable<ConnectionContext> GetUnitsConnections(IEnumerable<int> unitIds);
IEnumerable<ConnectionContext> GetAllConnections();
}
Один
пользователь/юнитпользователь может иметь несколько активных соединений.
Жизненный цикл:
- Подключение:
- Создаётся
ConnectionContextдля WebSocket- Вызывается
ConnectionRegistry.AddConnection(ctx)- Обновляются структуры _userConnections,
_unitConnections,_allConnections- Отключение:
- WebSocket закрыт или исключение в
SendLoopSendLoop/RecieveLoop- Вызывается
ConnectionRegistry.RemoveConnection(ctx)- Удаляется из всех словарей
и групп
ConnectionContext
Каждое WebSocket-соединение оборачивается в ConnectionContext.
Он не знает что внутри сообщения (MemoryPack или JSON), т.к. он работает только с байтами. Сериализация на уровне формирования сообщений.ConnectionContext:
public sealed class ConnectionContext
{
public Guid ConnectionId { get; } = Guid.NewGuid();
public Guid UserId { get; }
public WebSocket Socket { get; }
public Channel<ReadOnlyMemory<byte>> Outgoing { get; }
}
НазначениеНазначение:
- изолировать медленные соединения
- обеспечить backpressure
- избежать блокировки при массовой рассылке
Очередь сообщений (backpressure)
Используется bounded channel:channel с настраиваемым размером:
Channel.CreateBounded<ReadOnlyMemory<byte>>(
new BoundedChannelOptions(100)channelBufferSize) // из конфигурации, по умолчанию 100
{
FullMode = BoundedChannelFullMode.DropOldest
}
);
Для того, чтобы очередь была ограниченной. Лучше потеря сообщения, чем лежащий сервер из-за нехватки памяти.
WebSocket сам по себе не гарантирует доставку. Для реализации такой логики нужно будет очень много думать( Одним из вариантов будет реализация через отдельные очереди сообщений:
critical (нельзя терять ни в коем случае), реализовывать через ackactual (актуальные состояния (позиция юнита), если клиент пропустил 3 обновления - не страшно
Поведение:
- если клиент не успевает читать → старые сообщения отбрасываются
- сервер не растёт по памяти
SendLoop
Для каждого WebSocket-соединения создаётся независимый асинхронный цикл отправки (SendLoop), который читает сообщения из ограниченной очереди. Это обеспечивает изоляцию клиентов, предотвращает блокировку при массовой рассылке и защищает сервер от неконтролируемого роста памяти.
Также он не знает формат сообщений, не знает тип, не знает domain модель, только отправляет utf-8, что идеально по SRP.
public static async Task SendLoop(RunAsync(
ConnectionContext ctx,
CancellationToken ct)cancellationToken)
{
var socket = ctx.Socket;
var reader = ctx.Outgoing.Reader;
try
{
await foreachwhile (var!cancellationToken.IsCancellationRequested msg&&
in ctx.Outgoing.Reader.ReadAllAsync(ct))
{
if (ctx.Socket.socket.State !== WebSocketState.Open)
break;{
var payload = await ctx.Socket.reader.ReadAsync(cancellationToken);
await socket.SendAsync(
msg,payload,
WebSocketMessageType.Text,
endOfMessage: true,
ct
)cancellationToken);
}
}
catch (OperationCanceledException)
{
// нормальный shutdown
}
catch (ChannelClosedException)
{
// канал закрыт
}
catch (WebSocketException)
{
// клиент отвалился
}
finally
{
ctx.Outgoing.Writer.TryComplete();await ctx.Socket.Abort()SafeCloseAsync(socket);
}
}
еслиEсли сообщений нет → поток освобождёнloopLoop “просыпается” только когда появляется сообщение
Благодаря await ctx.Socket.SendAsync(...)
Несмотря на то, что SendAsync может быть медленным из-за клиентау каждого соединения свой SendLoopон ждёт тоько свой сокетостальные соединение не затрагиваются
медленныйМедленный клиент блокирует только свой SendLoopмассоваяМассовая рассылка не ждётawait SendAsyncдругих соединений
Жизненный цикл соединения
ПодключениеПодключение:
- HTTP → WebSocket upgrade
- JWT Аутентификация пользователя
- Создание ConnectionContext
- Регистрация в ConnectionRegistry
- Запуск SendLoop
Возможная расширяемость
Добавить новый тип доставки:
НоваяСоздатьстратегияновую стратегию, реализующую IMessageDeliveryStrategyРегистрацияДобавить новый DeliveryType в enum- Зарегистрировать стратегию в DI
- Обновить MessageRouter для маппинга типа на стратегию
- Расширить WaitForCloseAsync для обработки текстовых сообщений
- Добавить десериализацию и обработку команд от клиента
- Расширить ConnectionContext полем UnitId
- Добавить методы
GetUnitConnectionsвConnectionRegistry - Обновить стратегии для фильтрации по UnitId