Skip to main content

Архитектура передачи данных на веб-клиент в режиме реального времени

Цели архитектуры

Архитектура доставки сообщений должна:

  • поддерживать несколько соединений на одного пользователя (несколько вкладок)
  • обеспечивать разные модели доставки:
    • unicast (одному пользователю)
    • multicast (группам: users / units)
    • broadcast (всем)
  • эффективно работать при большом количестве сообщений
  • изолировать медленные клиенты (backpressure)
  • быть расширяемой без переписывания существующего кода

Общая схема компонентов

MessageSource (API, Jobs)
  ↓
MessageRouter
  ↓
DeliveryStrategy
  ↓
ConnectionRegistry
  ↓
ConnectionContext (Queue)
  ↓
SendLoop
  ↓
WebSocket

MessageEnvelop

Все сообщения передаются в едином формате-обёртке

public class MessageEnvelope
{
    public DeliveryType DeliveryType { get; init; }
    public string MessageType { get; init; } // e.g. UnitStatusDto

    public string? UserId { get; init; }
    public IEnumerable<string>? UserIds { get; init; }
    public IEnumerable<string>? UnitIds { get; init; }

    public ReadOnlyMemory<byte> Payload { get; init; }
}

public enum DeliveryType
{
    Unicast,
    Multicast,
    Broadcast
}

public interface IOutgoingMessage
{
    ReadOnlyMemory<byte> ToJsonUtf8();
}

// Example
[MemoryPackable]
public partial class UnitStatusDto : IOutgoingMessage
{
    public Guid UnitId { get; set; }
    public string Status { get; set; } = default!;
    public int Progress { get; set; }

    public ReadOnlyMemory<byte> ToPayloadUtf8()
    {
        var typeName = GetType().Name;

        // Обёртка
        var transportObject = new
        {
            type = typeName,
            data = this
        };

        return JsonSerializer.SerializeToUtf8Bytes(transportObject);
    }
}

var message = new UnitStatusDto
{
    UnitId = id,
    Status = "Working",
    Progress = 42
};

var envelope = new MessageEnvelope
{
    DeliveryType = DeliveryType.Multicast,
    UnitIds = new[] { unitId },
    Payload = message.ToJsonUtf8()
};

Доменные модели реализуют контракт сериализации. На уровне WebSocket-доставки используются только готовые байтовые представления сообщений, что полностью изолирует транспортный слой от формата данных.

MessageRouter

Ответственность заключается в выборе стратегии доставки.

Не знает ничего о реализации отправки, что способствует для последующией миграции на другие протоколы.

public class MessageRouter
{
    private readonly IDictionary<DeliveryType, IMessageDeliveryStrategy> _strategies;

    public Task RouteAsync(MessageEnvelope message)
        => _strategies[message.DeliveryType].SendAsync(message);
}

Delivery Strategy

Каждая модель доставки реализуется отдельной стратегией.

public interface IMessageDeliveryStrategy
{
    Task SendAsync(MessageEnvelope message);
}
Стратегия Назначение
UnicastStrategy Сообщение одному пользователю
MulticastStrategy Пользователям и/или units
BroadcastStrategy Всем подключениям

ConnectionRegistry

Единый источник информации о подключениях.

public interface IConnectionRegistry
{
    void AddConnection(ConnectionContext ctx);
    void RemoveConnection(ConnectionContext ctx);
    
    IEnumerable<ConnectionContext> GetUserConnections(Guid userId);
    IEnumerable<ConnectionContext> GetUsersConnections(IEnumerable<Guid> userIds);
    IEnumerable<ConnectionContext> GetUnitConnections(int unitId);
    IEnumerable<ConnectionContext> GetUnitsConnections(IEnumerable<int> unitIds);
    IEnumerable<ConnectionContext> GetAllConnections();
}

Один пользователь/юнит может иметь несколько активных соединений.

Жизненный цикл:

  • Подключение:
    • Создаётся ConnectionContext для WebSocket
    • Вызывается ConnectionRegistry.AddConnection(ctx)
    • Обновляются структуры _userConnections, _unitConnections, _allConnections
  • Отключение:
    • WebSocket закрыт или исключение в SendLoop
    • Вызывается ConnectionRegistry.RemoveConnection(ctx)
    • Удаляется из всех словарей и групп

ConnectionContext

Каждое WebSocket-соединение оборачивается в ConnectionContext. Он не знает что внутри сообщения (MemoryPack или JSON), т.к. он работает только с байтами. Сериализация на уровне формирования сообщений.

class ConnectionContext
{
    public Guid ConnectionId { get; }
    public Guid UserId { get; }
    public WebSocket Socket { get; }

    public Channel<ReadOnlyMemory<byte>> Outgoing { get; }
}

Назначение

  • изолировать медленные соединения
  • обеспечить backpressure
  • избежать блокировки при массовой рассылке

Очередь сообщений (backpressure)

Используется bounded channel:

Channel.CreateBounded<ReadOnlyMemory<byte>>(
    new BoundedChannelOptions(100)
    {
        FullMode = BoundedChannelFullMode.DropOldest
    }
);

Для того, чтобы очередь была ограниченной. Лучше потеря сообщения, чем лежащий сервер из-за нехватки памяти.

WebSocket сам по себе не гарантирует доставку. Для реализации такой логики нужно будет очень много думать( Одним из вариантов будет реализация через отдельные очереди сообщений:

  • critical (нельзя терять ни в коем случае), реализовывать через ack
  • actual (актуальные состояния (позиция юнита), если клиент пропустил 3 обновления - не страшно

Поведение:

  • если клиент не успевает читать → старые сообщения отбрасываются
  • сервер не растёт по памяти

SendLoop

Для каждого WebSocket-соединения создаётся независимый асинхронный цикл отправки (SendLoop), который читает сообщения из ограниченной очереди. Это обеспечивает изоляцию клиентов, предотвращает блокировку при массовой рассылке и защищает сервер от неконтролируемого роста памяти.

Также он не знает формат сообщений, не знает тип, не знает domain модель, только отправляет utf-8, что идеально по SRP.

async Task SendLoop(ConnectionContext ctx, CancellationToken ct)
{
    try
    {
        await foreach (var msg in ctx.Outgoing.Reader.ReadAllAsync(ct))
        {
            if (ctx.Socket.State != WebSocketState.Open)
                break;

            await ctx.Socket.SendAsync(
                msg,
                WebSocketMessageType.Text,
                true,
                ct
            );
        }
    }
    catch (OperationCanceledException)
    {
        // нормальный shutdown
    }
    catch (WebSocketException)
    {
        // клиент отвалился
    }
    finally
    {
        ctx.Outgoing.Writer.TryComplete();
        ctx.Socket.Abort();
    }
}
  • если сообщений нет → поток освобождён
  • loop “просыпается” только когда появляется сообщение

Благодаря await ctx.Socket.SendAsync(...)

  • Несмотря на то, что SendAsync может быть медленным из-за клиента
  • у каждого соединения свой SendLoop
  • он ждёт тоько свой сокет
  • остальные соединение не затрагиваются
  • медленный клиент блокирует только свой SendLoop
  • массовая рассылка не ждёт await SendAsync

Жизненный цикл соединения

Подключение

  1. HTTP → WebSocket upgrade
  2. Аутентификация пользователя
  3. Создание ConnectionContext
  4. Регистрация в ConnectionRegistry
  5. Запуск SendLoop

Возможная расширяемость

Добавить новый тип доставки:

  • Новая стратегия
  • Регистрация в DI