Skip to content

Паттерны конкурентности

Этот документ определяет паттерны обработки конкурентности для всех критических операций платформы IWM.


Обзор

Финансовые системы требуют явного контроля конкурентности для предотвращения:

  • Потерянные обновления — Две транзакции перезаписывают друг друга
  • Двойное списание — Одни и те же средства используются дважды
  • Состояния гонки — Несогласованное состояние из-за чередования операций
  • Взаимоблокировки — Транзакции ожидают друг друга бесконечно

Матрица критических операций

ОперацияТип блокировкиУровень изоляцииСтратегия повторов
Расчёт комиссийПессимистичная (строка)Serializable3x с экспоненциальным откатом
Обновление балансаПессимистичнаяRepeatableRead3x с экспоненциальным откатом
Запрос на выплатуПессимистичная + уникальный индексSerializableБез повторов (быстрый отказ)
Обработка выплатыПессимистичная + идемпотентностьSerializable3x с экспоненциальным откатом
Резервирование товараПессимистичнаяSerializable3x с экспоненциальным откатом
Вставка в дерево партнёровПессимистичная (спонсор)RepeatableReadБез повторов
Перемещение в дереве партнёровПессимистичная (поддерево)SerializableБез повторов
Изменение статуса заказаОптимистичная (версия)ReadCommitted3x немедленно
Пересчёт рангаПессимистичная (партнёр)RepeatableRead3x с экспоненциальным откатом

Стратегии блокировки

Пессимистичная блокировка (SELECT FOR UPDATE)

Использовать когда:

  • Ожидается высокая конкуренция
  • Операция не может быть безопасно повторена
  • Финансовые операции, где критична согласованность
sql
-- Блокировка конкретных строк перед изменением
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE;

-- Вариант NOWAIT: Немедленный отказ при блокировке (для быстрого отказа)
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE NOWAIT;

-- Вариант SKIP LOCKED: Обработка только незаблокированных строк (для batch jobs)
SELECT * FROM mlm.commission_transactions
WHERE status = 'PENDING'
FOR UPDATE SKIP LOCKED
LIMIT 100;

Оптимистичная блокировка (поле версии)

Использовать когда:

  • Ожидается низкая конкуренция
  • Повторы допустимы
  • Преобладают операции чтения
typescript
// Чтение с версией
const balance = await tx.partnerBalance.findUnique({
  where: { partnerId },
});

// Обновление с проверкой версии
const updated = await tx.partnerBalance.updateMany({
  where: {
    partnerId,
    version: balance.version, // Обновлять только если версия не изменилась
  },
  data: {
    availableBalance: { decrement: amount },
    version: { increment: 1 },
  },
});

if (updated.count === 0) {
  throw new OptimisticLockError("Balance was modified by another transaction");
}

Предотвращение взаимоблокировок

Правила порядка блокировки

Для предотвращения взаимоблокировок всегда захватывать блокировки в этом согласованном порядке:

  1. core.users (по ID, по возрастанию)
  2. mlm.partners (по ID, по возрастанию)
  3. mlm.partner_balances (по partner_id, по возрастанию)
  4. mlm.commission_transactions (по ID, по возрастанию)
  5. mlm.payout_requests (по ID, по возрастанию)
  6. product.products (по ID, по возрастанию)
  7. product.inventory_reservations (по product_id, затем ID)
  8. product.orders (по ID, по возрастанию)
  9. investment.participations (по ID, по возрастанию)

Паттерн реализации

typescript
// ПРАВИЛЬНО: Сортировка ID перед блокировкой для обеспечения согласованного порядка
async function lockMultiplePartnerBalances(
  tx: PrismaTransaction,
  partnerIds: string[],
): Promise<void> {
  const sortedIds = [...partnerIds].sort();

  await tx.$queryRaw`
    SELECT id FROM mlm.partner_balances
    WHERE partner_id = ANY(${sortedIds}::uuid[])
    ORDER BY partner_id
    FOR UPDATE
  `;
}

// НЕПРАВИЛЬНО: Случайный порядок может вызвать взаимоблокировки
async function lockMultiplePartnerBalancesWrong(
  tx: PrismaTransaction,
  partnerIds: string[],
): Promise<void> {
  // НЕ ДЕЛАТЬ ТАК - порядок непредсказуем
  for (const partnerId of partnerIds) {
    await tx.$queryRaw`
      SELECT id FROM mlm.partner_balances
      WHERE partner_id = ${partnerId}::uuid
      FOR UPDATE
    `;
  }
}

Обнаружение и мониторинг взаимоблокировок

sql
-- Создание таблицы лога взаимоблокировок
CREATE TABLE core.deadlock_log (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
    blocked_pid INT NOT NULL,
    blocking_pid INT NOT NULL,
    blocked_query TEXT,
    blocking_query TEXT,
    wait_duration_ms INT
);

-- Функция для логирования взаимоблокировок (вызывать из приложения при ошибке)
CREATE OR REPLACE FUNCTION core.log_deadlock(
    p_blocked_pid INT,
    p_blocking_pid INT,
    p_blocked_query TEXT,
    p_blocking_query TEXT,
    p_wait_duration_ms INT
) RETURNS VOID AS $$
BEGIN
    INSERT INTO core.deadlock_log (
        blocked_pid, blocking_pid, blocked_query, blocking_query, wait_duration_ms
    ) VALUES (
        p_blocked_pid, p_blocking_pid, p_blocked_query, p_blocking_query, p_wait_duration_ms
    );
END;
$$ LANGUAGE plpgsql;

Паттерны идемпотентности

Структура ключа идемпотентности

ОперацияФормат ключаTTLХранилище
Расчёт комиссийcomm:{sourceType}:{sourceId}30 днейБаза данных
Комиссия по партнёруcomm:{sourceType}:{sourceId}:{partnerId}:{level}30 днейБаза данных
Обработка выплатыpayout:{payoutRequestId}:{processor}7 днейБаза данных
Обработка webhookwebhook:{provider}:{eventId}7 днейRedis/База данных
Создание заказаorder:{userId}:{cartChecksum}1 часБаза данных
Участие в инвестицииinvest:{userId}:{strategyId}:{amount}:{timestamp_hour}1 часБаза данных

Паттерн проверки идемпотентности

typescript
async function processWithIdempotency<T>(
  tx: PrismaTransaction,
  idempotencyKey: string,
  ttlDays: number,
  operation: () => Promise<T>,
): Promise<{ result: T; wasProcessed: boolean }> {
  // 1. Проверка, было ли уже обработано
  const existing = await tx.idempotencyKey.findUnique({
    where: { key: idempotencyKey },
  });

  if (existing) {
    return {
      result: existing.response as T,
      wasProcessed: true,
    };
  }

  // 2. Выполнение операции
  const result = await operation();

  // 3. Сохранение ключа идемпотентности
  await tx.idempotencyKey.create({
    data: {
      key: idempotencyKey,
      response: result as any,
      requestHash: createHash("sha256").update(idempotencyKey).digest("hex"),
      expiresAt: addDays(new Date(), ttlDays),
    },
  });

  return { result, wasProcessed: false };
}

Паттерн двойной проверки (внутри транзакции)

typescript
async function safeProcessCommission(job: CommissionJobPayload) {
  const { idempotencyKey } = job;

  // Быстрый путь: Проверка вне транзакции
  const existing = await db.idempotencyKey.findUnique({
    where: { key: idempotencyKey },
  });
  if (existing) return existing.response;

  // Медленный путь: Обработка с транзакцией
  return await db.$transaction(
    async (tx) => {
      // КРИТИЧНО: Повторная проверка внутри транзакции (другой процесс мог завершить)
      const existingInTx = await tx.idempotencyKey.findUnique({
        where: { key: idempotencyKey },
      });
      if (existingInTx) return existingInTx.response;

      // Обработка...
      const result = await processCommissionLogic(tx, job);

      // Сохранение ключа идемпотентности
      await tx.idempotencyKey.create({
        data: {
          key: idempotencyKey,
          response: result,
          expiresAt: addDays(new Date(), 30),
        },
      });

      return result;
    },
    {
      isolationLevel: "Serializable",
    },
  );
}

Стратегии повторов

Конфигурация

typescript
interface RetryConfig {
  maxAttempts: number;
  backoffMs: number[];
  retryableErrors: string[];
}

const RETRY_CONFIGS: Record<string, RetryConfig> = {
  commission: {
    maxAttempts: 3,
    backoffMs: [100, 500, 2000],
    retryableErrors: [
      "P2034", // Конфликт транзакции Prisma
      "40001", // Ошибка сериализации PostgreSQL
      "40P01", // Обнаружена взаимоблокировка PostgreSQL
    ],
  },
  balance: {
    maxAttempts: 3,
    backoffMs: [50, 200, 1000],
    retryableErrors: ["P2034", "40001", "40P01"],
  },
  payout: {
    maxAttempts: 1, // Без повторов - пользователь должен повторить вручную
    backoffMs: [],
    retryableErrors: [],
  },
  inventory: {
    maxAttempts: 3,
    backoffMs: [50, 100, 500],
    retryableErrors: ["P2034", "40001", "40P01"],
  },
};

Реализация повторов

typescript
async function withRetry<T>(
  operation: () => Promise<T>,
  config: RetryConfig,
  context: string,
): Promise<T> {
  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error as Error;

      const errorCode = extractErrorCode(error);
      const isRetryable = config.retryableErrors.includes(errorCode);

      if (!isRetryable || attempt === config.maxAttempts) {
        logger.error(
          `${context}: Non-retryable error or max attempts reached`,
          {
            attempt,
            errorCode,
            error: lastError.message,
          },
        );
        throw lastError;
      }

      const backoffMs =
        config.backoffMs[attempt - 1] ||
        config.backoffMs[config.backoffMs.length - 1];

      logger.warn(`${context}: Retryable error, backing off`, {
        attempt,
        errorCode,
        backoffMs,
        nextAttempt: attempt + 1,
      });

      await sleep(backoffMs);
    }
  }

  throw lastError;
}

function extractErrorCode(error: unknown): string {
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    return error.code;
  }
  if (error instanceof Error && "code" in error) {
    return (error as any).code;
  }
  return "UNKNOWN";
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

Паттерны для конкретных операций

1. Расчёт комиссий

См. Движок комиссий для полной реализации.

Ключевые моменты:

  • Сначала блокировать исходную запись (заказ/инвестиция)
  • Блокировать все затронутые балансы партнёров в отсортированном порядке
  • Использовать ключ идемпотентности для каждой транзакции комиссии
  • Обрабатывать с уровнем изоляции Serializable

2. Операции с балансом

См. MLM Schema для SQL функций.

Ключевые моменты:

  • Всегда блокировать строку баланса перед изменением
  • Использовать поле версии для оптимистичной блокировки где уместно
  • Проверять достаточность баланса внутри транзакции
  • Создавать аудит-трейл для каждого изменения баланса

3. Запросы на выплату

typescript
async function createPayoutRequest(
  partnerId: string,
  amount: number,
  payoutDetails: PayoutDetails,
): Promise<PayoutRequest> {
  return await db.$transaction(
    async (tx) => {
      // 1. Блокировка баланса партнёра
      const balances = await tx.$queryRaw<PartnerBalance[]>`
      SELECT * FROM mlm.partner_balances
      WHERE partner_id = ${partnerId}::uuid
      FOR UPDATE
    `;

      const balance = balances[0];
      if (!balance) {
        throw new PartnerNotFoundError(partnerId);
      }

      if (balance.available_balance < amount) {
        throw new InsufficientBalanceError(
          Number(balance.available_balance),
          amount,
        );
      }

      // 2. Проверка существующей ожидающей выплаты
      // (Также защищено уникальным частичным индексом на таблице)
      const existingPending = await tx.payoutRequest.findFirst({
        where: {
          partnerId,
          status: { in: ["PENDING", "APPROVED", "PROCESSING"] },
        },
      });

      if (existingPending) {
        throw new PayoutAlreadyPendingError(existingPending.id);
      }

      // 3. Списание с доступного баланса
      await tx.partnerBalance.update({
        where: { partnerId },
        data: {
          availableBalance: { decrement: amount },
          version: { increment: 1 },
          updatedAt: new Date(),
        },
      });

      // 4. Создание запроса на выплату
      const payout = await tx.payoutRequest.create({
        data: {
          partnerId,
          amount,
          currency: "USD",
          payoutMethodType: payoutDetails.type,
          payoutDetails: payoutDetails as any,
          status: "PENDING",
        },
      });

      // 5. Создание записи аудит-лога
      await tx.financialAuditLog.create({
        data: {
          eventType: "PAYOUT_REQUESTED",
          partnerId,
          amount: -amount,
          currency: "USD",
          balanceBefore: Number(balance.available_balance),
          balanceAfter: Number(balance.available_balance) - amount,
          sourceType: "PAYOUT_REQUEST",
          sourceId: payout.id,
          checksum: generateChecksum(payout),
        },
      });

      return payout;
    },
    {
      isolationLevel: "Serializable",
      timeout: 30000,
    },
  );
}

4. Резервирование товаров

typescript
async function reserveInventory(
  productId: string,
  cartId: string,
  quantity: number,
  expiresMinutes: number = 15,
): Promise<{
  success: boolean;
  reservationId?: string;
  availableQuantity: number;
}> {
  return await db.$transaction(
    async (tx) => {
      // 1. Блокировка строки товара
      const products = await tx.$queryRaw<Product[]>`
      SELECT * FROM product.products
      WHERE id = ${productId}::uuid
      FOR UPDATE
    `;

      const product = products[0];
      if (!product) {
        throw new ProductNotFoundError(productId);
      }

      // 2. Расчёт доступного количества (блокировка резервирований тоже)
      const reservedResult = await tx.$queryRaw<{ total: bigint }[]>`
      SELECT COALESCE(SUM(quantity), 0) as total
      FROM product.inventory_reservations
      WHERE product_id = ${productId}::uuid
        AND status = 'RESERVED'
        AND expires_at > NOW()
      FOR UPDATE
    `;

      const reserved = Number(reservedResult[0]?.total || 0);
      const available = product.stock_quantity - reserved;

      if (available < quantity) {
        return { success: false, availableQuantity: available };
      }

      // 3. Создание резервирования
      const reservation = await tx.inventoryReservation.create({
        data: {
          productId,
          cartId,
          quantity,
          status: "RESERVED",
          expiresAt: addMinutes(new Date(), expiresMinutes),
        },
      });

      return {
        success: true,
        reservationId: reservation.id,
        availableQuantity: available - quantity,
      };
    },
    {
      isolationLevel: "Serializable",
    },
  );
}

5. Вставка в дерево партнёров

typescript
async function registerPartner(
  userId: string,
  sponsorId: string | null,
  referralCode: string,
): Promise<Partner> {
  return await db.$transaction(
    async (tx) => {
      // 1. Блокировка спонсора если существует (предотвращает конкурентные изменения дерева)
      if (sponsorId) {
        const sponsors = await tx.$queryRaw<Partner[]>`
        SELECT * FROM mlm.partners
        WHERE id = ${sponsorId}::uuid
        FOR UPDATE
      `;

        const sponsor = sponsors[0];
        if (!sponsor) {
          throw new SponsorNotFoundError(sponsorId);
        }

        if (sponsor.status !== "ACTIVE") {
          throw new SponsorNotActiveError(sponsorId);
        }
      }

      // 2. Создание партнёра (триггер обработает пути дерева)
      const partner = await tx.partner.create({
        data: {
          userId,
          sponsorId,
          referralCode,
          status: "ACTIVE",
          joinedAt: new Date(),
        },
      });

      // 3. Создание начальной записи баланса
      await tx.partnerBalance.create({
        data: {
          partnerId: partner.id,
          availableBalance: 0,
          pendingBalance: 0,
          totalEarned: 0,
          totalWithdrawn: 0,
          careerPointsTotal: 0,
          careerPointsPeriod: 0,
          currency: "USD",
          version: 1,
        },
      });

      return partner;
    },
    {
      isolationLevel: "RepeatableRead",
    },
  );
}

Уровни изоляции транзакций

Когда использовать каждый уровень

УровеньСлучай использованияКомпромисс
ReadCommittedПреобладание чтения, низкие требования к согласованностиЛучшая производительность, допускает неповторяемое чтение
RepeatableReadЧтение балансов, запросы дереваХорошая производительность, предотвращает неповторяемое чтение
SerializableФинансовые записи, расчёт комиссийНизшая производительность, полная согласованность

Конфигурация транзакций Prisma

typescript
// Serializable для критических финансовых операций
await db.$transaction(
  async (tx) => {
    // ...
  },
  {
    isolationLevel: "Serializable",
    timeout: 30000, // 30 секунд
  },
);

// RepeatableRead для операций с деревом
await db.$transaction(
  async (tx) => {
    // ...
  },
  {
    isolationLevel: "RepeatableRead",
    timeout: 15000, // 15 секунд
  },
);

// По умолчанию (ReadCommitted) для простых операций
await db.$transaction(async (tx) => {
  // ...
});

Обработка ошибок

Ошибки, специфичные для конкурентности

typescript
// Доменные ошибки для проблем конкурентности
export class OptimisticLockError extends BusinessRuleError {
  constructor(entity: string) {
    super(
      "OPTIMISTIC_LOCK_CONFLICT",
      `${entity} was modified by another process. Please retry.`,
      { entity },
    );
  }
}

export class DeadlockDetectedError extends BusinessRuleError {
  constructor(operation: string) {
    super(
      "DEADLOCK_DETECTED",
      `Deadlock detected during ${operation}. Please retry.`,
      { operation },
    );
  }
}

export class TransactionTimeoutError extends BusinessRuleError {
  constructor(operation: string, timeoutMs: number) {
    super(
      "TRANSACTION_TIMEOUT",
      `Transaction timed out after ${timeoutMs}ms during ${operation}`,
      { operation, timeoutMs },
    );
  }
}

Маппинг кодов ошибок

typescript
function mapDatabaseError(error: unknown, context: string): Error {
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    switch (error.code) {
      case "P2034":
        return new OptimisticLockError(context);
      case "P2028":
        return new TransactionTimeoutError(context, 30000);
    }
  }

  if (error instanceof Error && "code" in error) {
    const pgError = error as any;
    switch (pgError.code) {
      case "40001": // serialization_failure
        return new OptimisticLockError(context);
      case "40P01": // deadlock_detected
        return new DeadlockDetectedError(context);
      case "55P03": // lock_not_available (NOWAIT)
        return new ResourceLockedError(context);
    }
  }

  return error as Error;
}

Мониторинг и оповещения

Ключевые метрики

МетрикаИсточникПорог оповещения
Количество взаимоблокировокcore.deadlock_log> 5 в час
Частота повторов транзакцийЛоги приложения> 10% операций
Время ожидания блокировки (p95)pg_stat_activity> 1 секунда
Ошибки сериализацииЛоги приложения> 20 в минуту
Блокировки двойных выплатЛоги приложенияЛюбое событие

Запрос мониторинга блокировок PostgreSQL

sql
-- Активные блокировки и ожидающие запросы
SELECT
    blocked.pid AS blocked_pid,
    blocked.usename AS blocked_user,
    blocked.query AS blocked_query,
    blocked.wait_event_type,
    blocking.pid AS blocking_pid,
    blocking.usename AS blocking_user,
    blocking.query AS blocking_query,
    NOW() - blocked.query_start AS wait_duration
FROM pg_stat_activity blocked
JOIN pg_locks blocked_locks ON blocked.pid = blocked_locks.pid
JOIN pg_locks blocking_locks
    ON blocked_locks.locktype = blocking_locks.locktype
    AND blocked_locks.relation = blocking_locks.relation
    AND blocked_locks.pid != blocking_locks.pid
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT blocked_locks.granted
ORDER BY wait_duration DESC;

Связанные документы