Skip to main content

Архитектура передачи данных на веб-клиент в режиме реального времени

Цели архитектуры

Архитектура доставки сообщений должна:

  • поддерживать несколько соединений на одного пользователя (несколько вкладок)
  • обеспечивать разные модели доставки:
    • unicast (одному пользователю)
    • multicast (группам:группе users / units)пользователей)
    • broadcast (всем)
  • эффективно работать при большом количестве сообщений
  • изолировать медленные клиенты (backpressure)
  • быть расширяемой без переписывания существующего кода

Общая схема компонентов

MessageSource
  ↓
TransportSerializer (API,сериализация Jobs)в JSON)
  ↓
MessageEnvelop
  ↓
MessageRouter
  ↓
DeliveryStrategy
  ↓
ConnectionRegistry
  ↓
ConnectionContext (Queue)BoundedChannel)
  ↓
SendLoop
  ↓
WebSocket

MessageEnvelop

Все сообщения передаются в едином формате-обёрткеобёртке:

public sealed class MessageEnvelope
{
    public DeliveryType DeliveryType { get; init; }
    
    public stringGuid? MessageTypeUserId { get; init; }                    // e.g.для UnitStatusDto

    public string? UserId { get; init; }Unicast
    public IEnumerable<stringGuid>? UserIds { get; init; }      public// IEnumerable<string>?для UnitIds { get; init; }Multicast
    
    public ReadOnlyMemory<byte> Payload { get; init; }    // готовые байты (JSON)
}

public enum DeliveryType
{
    Unicast,
    Multicast,
    Broadcast
}
public interface IOutgoingMessage
{
    ReadOnlyMemory<byte> ToJsonUtf8();
}

// Example
[MemoryPackable]
public partial class UnitStatusDto : IOutgoingMessage
{
    public Guid UnitId { get; set; }
    public string Status { get; set; } = default!;
    public int Progress { get; set; }

    public ReadOnlyMemory<byte> ToPayloadUtf8()
    {
        var typeName = GetType().Name;

        // Обёртка
        var transportObject = new
        {
            type = typeName,
            data = this
        };

        return JsonSerializer.SerializeToUtf8Bytes(transportObject);
    }
}

var message = new UnitStatusDto
{
    UnitId = id,
    Status = "Working",
    Progress = 42
};

var envelope = new MessageEnvelope
{
    DeliveryType = DeliveryType.Multicast,
    UnitIds = new[] { unitId },
    Payload = message.ToJsonUtf8()
};

Доменные модели реализуют контракт сериализации. На уровне WebSocket-доставки используются только готовые байтовые представления сообщений, что полностью изолирует транспортный слой от формата данных.

TransportSerializer

Доменные моедли сериалзиуются через TransportSerializer:

public static class TransportSerializer
{
    public static ReadOnlyMemory<byte> Serialize(OutgoingMessage message)
    {
        var typeName = message.GetType().Name;
        
        var transportObject = new
        {
            type = typeName,
            data = (object)message
        };
        
        return JsonSerializer.SerializeToUtf8Bytes(transportObject);
    }
}

// Example
var message = new UnitStatusPack
{
    UnitId = id,
    Status = "Working",
    Progress = 42
};

var payload = TransportSerializer.Serialize(message);

var envelope = new MessageEnvelope
{
    DeliveryType = DeliveryType.Broadcast,
    Payload = payload
};

Сериализация происходит до создания MessageEnvelope. Т.к. транспортный слой работает только с байтами.

MessageRouter

Ответственность заключается в выборе стратегии доставки.доставки по типу сообщения.

Не знает ничего о реализации отправки, что способствует для последующией миграции на другие протоколы.

public sealed class MessageRouter : IMessageRouter
{
    private readonly IDictionaryIReadOnlyDictionary<DeliveryType, IMessageDeliveryStrategy> _strategies;
    
    public Task RouteAsync(MessageEnvelope message)
    =>{
        _strategies[if (!_strategies.TryGetValue(message.DeliveryType].DeliveryType, out var strategy))
            throw new InvalidOperationException($"No strategy for {message.DeliveryType}");
        
        return strategy.SendAsync(message);
    }
}

Delivery Strategy

Каждая модель доставки реализуется отдельной стратегией.

public interface IMessageDeliveryStrategy
{
    Task SendAsync(MessageEnvelope message);
}
units
Стратегия Назначение Параметры MessageEnvelope
UnicastStrategy Сообщение одному пользователю UserId
MulticastStrategy ПользователямГруппе и/илипользователей UserIds
BroadcastStrategy Всем подключениям -

Реализация:

  • Стратегии получают список соединений из ConnectionRegistry
  • Проверяют состояние WebSocket перед отправкой
  • Записывают сообщение в Channel каждого соединения через TryWrite
  • Логируют результаты доставки (успешные/неуспешные)

Если TryWrite возвращает false (канал полон или закрыт), сообщение логируется как неуспешное, но не блокирует доставку другим соединениям.

ConnectionRegistry

Единый источник информации о подключениях.

public interface IConnectionRegistry
{
    void AddConnection(Add(ConnectionContext ctx);
    void RemoveConnection(Remove(ConnectionContext ctx);
    
    IEnumerable<ConnectionContext> GetUserConnections(Guid userId);
    IEnumerable<ConnectionContext> GetUsersConnections(IEnumerable<Guid> userIds);
    IEnumerable<ConnectionContext> GetUnitConnections(int unitId);
    IEnumerable<ConnectionContext> GetUnitsConnections(IEnumerable<int> unitIds);
    IEnumerable<ConnectionContext> GetAllConnections();
}

Один пользователь/юнитпользователь может иметь несколько активных соединений.

Жизненный цикл:

  • Подключение:
    • Создаётся ConnectionContext для WebSocket
    • Вызывается ConnectionRegistry.AddConnection(ctx)
    • Обновляются структуры _userConnections, _unitConnections, _allConnections
  • Отключение:
    • WebSocket закрыт или исключение в SendLoopSendLoop/RecieveLoop
    • Вызывается ConnectionRegistry.RemoveConnection(ctx)
    • Удаляется из всех словарей и групп

ConnectionContext

Каждое WebSocket-соединение оборачивается в ConnectionContext. Он не знает что внутри сообщения (MemoryPack или JSON), т.к. он работает только с байтами. Сериализация на уровне формирования сообщений.ConnectionContext:

public sealed class ConnectionContext
{
    public Guid ConnectionId { get; } = Guid.NewGuid();
    public Guid UserId { get; }
    public WebSocket Socket { get; }
    
    public Channel<ReadOnlyMemory<byte>> Outgoing { get; }
}

НазначениеНазначение:

  • изолировать медленные соединения
  • обеспечить backpressure
  • избежать блокировки при массовой рассылке

Очередь сообщений (backpressure)

Используется bounded channel:channel с настраиваемым размером:

Channel.CreateBounded<ReadOnlyMemory<byte>>(
    new BoundedChannelOptions(100)channelBufferSize)  // из конфигурации, по умолчанию 100
    {
        FullMode = BoundedChannelFullMode.DropOldest
    }
);

Для того, чтобы очередь была ограниченной. Лучше потеря сообщения, чем лежащий сервер из-за нехватки памяти.

WebSocket сам по себе не гарантирует доставку. Для реализации такой логики нужно будет очень много думать( Одним из вариантов будет реализация через отдельные очереди сообщений:

  • critical (нельзя терять ни в коем случае), реализовывать через ack
  • actual (актуальные состояния (позиция юнита), если клиент пропустил 3 обновления - не страшно

Поведение:

  • если клиент не успевает читать → старые сообщения отбрасываются
  • сервер не растёт по памяти

SendLoop

Для каждого WebSocket-соединения создаётся независимый асинхронный цикл отправки (SendLoop), который читает сообщения из ограниченной очереди. Это обеспечивает изоляцию клиентов, предотвращает блокировку при массовой рассылке и защищает сервер от неконтролируемого роста памяти.

Также он не знает формат сообщений, не знает тип, не знает domain модель, только отправляет utf-8, что идеально по SRP.

public static async Task SendLoop(RunAsync(
    ConnectionContext ctx,
    CancellationToken ct)cancellationToken)
{
    var socket = ctx.Socket;
    var reader = ctx.Outgoing.Reader;
    
    try
    {
        await foreachwhile (var!cancellationToken.IsCancellationRequested msg&&
               in ctx.Outgoing.Reader.ReadAllAsync(ct))
        {
            if (ctx.Socket.socket.State !== WebSocketState.Open)
        break;{
            var payload = await ctx.Socket.reader.ReadAsync(cancellationToken);
            
            await socket.SendAsync(
                msg,payload,
                WebSocketMessageType.Text,
                endOfMessage: true,
                ct
            )cancellationToken);
        }
    }
    catch (OperationCanceledException)
    {
        // нормальный shutdown
    }
    catch (ChannelClosedException)
    {
        // канал закрыт
    }
    catch (WebSocketException)
    {
        // клиент отвалился
    }
    finally
    {
        ctx.Outgoing.Writer.TryComplete();await ctx.Socket.Abort()SafeCloseAsync(socket);
    }
}
  • еслиEсли сообщений нет → поток освобождён
  • loopLoop “просыпается” только когда появляется сообщение

Благодаря await ctx.Socket.SendAsync(...)

  • Несмотря на то, что SendAsync может быть медленным из-за клиента
  • у каждого соединения свой SendLoop
  • он ждёт тоько свой сокет
  • остальные соединение не затрагиваются
  • медленныйМедленный клиент блокирует только свой SendLoop
  • массоваяМассовая рассылка не ждёт await SendAsync других соединений

Жизненный цикл соединения

ПодключениеПодключение:

  1. HTTP → WebSocket upgrade
  2. JWT Аутентификация пользователя
  3. Создание ConnectionContext
  4. Регистрация в ConnectionRegistry
  5. Запуск SendLoop

Возможная расширяемость

Добавить новый тип доставки:

    1. НоваяСоздать стратегияновую стратегию, реализующую IMessageDeliveryStrategy
    2. РегистрацияДобавить новый DeliveryType в enum
    3. Зарегистрировать стратегию в DI
    4. Обновить MessageRouter для маппинга типа на стратегию
    Добавить обработку входящих сообщений
    • Расширить WaitForCloseAsync для обработки текстовых сообщений
    • Добавить десериализацию и обработку команд от клиента
    Добавить поддержку UnitConnections
    • Расширить ConnectionContext полем UnitId
    • Добавить методы GetUnitConnections в ConnectionRegistry
    • Обновить стратегии для фильтрации по UnitId