Skip to content

Паттерны конкурентности

Этот документ определяет паттерны обработки конкурентности для всех критических операций платформы IWM.


Обзор

Финансовые системы требуют явного контроля конкурентности для предотвращения:

  • Потерянные обновления — Две транзакции перезаписывают друг друга
  • Двойное списание — Одни и те же средства используются дважды
  • Состояния гонки — Несогласованное состояние из-за чередования операций
  • Взаимоблокировки — Транзакции ожидают друг друга бесконечно

Матрица критических операций

ОперацияТип блокировкиУровень изоляцииСтратегия повторов
Расчёт комиссийПессимистичная (строка)Serializable3x с экспоненциальным откатом
Обновление балансаПессимистичнаяRepeatableRead3x с экспоненциальным откатом
Запрос на выплатуПессимистичная + уникальный индексSerializableБез повторов (быстрый отказ)
Обработка выплатыПессимистичная + идемпотентностьSerializable3x с экспоненциальным откатом
Резервирование товараПессимистичнаяSerializable3x с экспоненциальным откатом
Вставка в дерево партнёровПессимистичная (спонсор)RepeatableReadБез повторов
Перемещение в дереве партнёровПессимистичная (поддерево)SerializableБез повторов
Изменение статуса заказаОптимистичная (версия)ReadCommitted3x немедленно
Пересчёт рангаПессимистичная (партнёр)RepeatableRead3x с экспоненциальным откатом

Стратегии блокировки

Пессимистичная блокировка (SELECT FOR UPDATE)

Использовать когда:

  • Ожидается высокая конкуренция
  • Операция не может быть безопасно повторена
  • Финансовые операции, где критична согласованность
sql
-- Блокировка конкретных строк перед изменением
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE;

-- Вариант NOWAIT: Немедленный отказ при блокировке (для быстрого отказа)
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE NOWAIT;

-- Вариант SKIP LOCKED: Обработка только незаблокированных строк (для batch jobs)
SELECT * FROM mlm.commission_transactions
WHERE status = 'PENDING'
FOR UPDATE SKIP LOCKED
LIMIT 100;

Оптимистичная блокировка (поле версии)

Использовать когда:

  • Ожидается низкая конкуренция
  • Повторы допустимы
  • Преобладают операции чтения
typescript
// Чтение с версией
const balance = await tx.partnerBalance.findUnique({
  where: { partnerId }
});

// Обновление с проверкой версии
const updated = await tx.partnerBalance.updateMany({
  where: {
    partnerId,
    version: balance.version  // Обновлять только если версия не изменилась
  },
  data: {
    availableBalance: { decrement: amount },
    version: { increment: 1 }
  }
});

if (updated.count === 0) {
  throw new OptimisticLockError('Balance was modified by another transaction');
}

Предотвращение взаимоблокировок

Правила порядка блокировки

Для предотвращения взаимоблокировок всегда захватывать блокировки в этом согласованном порядке:

  1. core.users (по ID, по возрастанию)
  2. mlm.partners (по ID, по возрастанию)
  3. mlm.partner_balances (по partner_id, по возрастанию)
  4. mlm.commission_transactions (по ID, по возрастанию)
  5. mlm.payout_requests (по ID, по возрастанию)
  6. product.products (по ID, по возрастанию)
  7. product.inventory_reservations (по product_id, затем ID)
  8. product.orders (по ID, по возрастанию)
  9. investment.participations (по ID, по возрастанию)

Паттерн реализации

typescript
// ПРАВИЛЬНО: Сортировка ID перед блокировкой для обеспечения согласованного порядка
async function lockMultiplePartnerBalances(
  tx: PrismaTransaction,
  partnerIds: string[]
): Promise<void> {
  const sortedIds = [...partnerIds].sort();

  await tx.$queryRaw`
    SELECT id FROM mlm.partner_balances
    WHERE partner_id = ANY(${sortedIds}::uuid[])
    ORDER BY partner_id
    FOR UPDATE
  `;
}

// НЕПРАВИЛЬНО: Случайный порядок может вызвать взаимоблокировки
async function lockMultiplePartnerBalancesWrong(
  tx: PrismaTransaction,
  partnerIds: string[]
): Promise<void> {
  // НЕ ДЕЛАТЬ ТАК - порядок непредсказуем
  for (const partnerId of partnerIds) {
    await tx.$queryRaw`
      SELECT id FROM mlm.partner_balances
      WHERE partner_id = ${partnerId}::uuid
      FOR UPDATE
    `;
  }
}

Обнаружение и мониторинг взаимоблокировок

sql
-- Создание таблицы лога взаимоблокировок
CREATE TABLE core.deadlock_log (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
    blocked_pid INT NOT NULL,
    blocking_pid INT NOT NULL,
    blocked_query TEXT,
    blocking_query TEXT,
    wait_duration_ms INT
);

-- Функция для логирования взаимоблокировок (вызывать из приложения при ошибке)
CREATE OR REPLACE FUNCTION core.log_deadlock(
    p_blocked_pid INT,
    p_blocking_pid INT,
    p_blocked_query TEXT,
    p_blocking_query TEXT,
    p_wait_duration_ms INT
) RETURNS VOID AS $$
BEGIN
    INSERT INTO core.deadlock_log (
        blocked_pid, blocking_pid, blocked_query, blocking_query, wait_duration_ms
    ) VALUES (
        p_blocked_pid, p_blocking_pid, p_blocked_query, p_blocking_query, p_wait_duration_ms
    );
END;
$$ LANGUAGE plpgsql;

Паттерны идемпотентности

Структура ключа идемпотентности

ОперацияФормат ключаTTLХранилище
Расчёт комиссийcomm:{sourceType}:{sourceId}30 днейБаза данных
Комиссия по партнёруcomm:{sourceType}:{sourceId}:{partnerId}:{level}30 днейБаза данных
Обработка выплатыpayout:{payoutRequestId}:{processor}7 днейБаза данных
Обработка webhookwebhook:{provider}:{eventId}7 днейRedis/База данных
Создание заказаorder:{userId}:{cartChecksum}1 часБаза данных
Участие в инвестицииinvest:{userId}:{strategyId}:{amount}:{timestamp_hour}1 часБаза данных

Паттерн проверки идемпотентности

typescript
async function processWithIdempotency<T>(
  tx: PrismaTransaction,
  idempotencyKey: string,
  ttlDays: number,
  operation: () => Promise<T>
): Promise<{ result: T; wasProcessed: boolean }> {
  // 1. Проверка, было ли уже обработано
  const existing = await tx.idempotencyKey.findUnique({
    where: { key: idempotencyKey }
  });

  if (existing) {
    return {
      result: existing.response as T,
      wasProcessed: true
    };
  }

  // 2. Выполнение операции
  const result = await operation();

  // 3. Сохранение ключа идемпотентности
  await tx.idempotencyKey.create({
    data: {
      key: idempotencyKey,
      response: result as any,
      requestHash: createHash('sha256').update(idempotencyKey).digest('hex'),
      expiresAt: addDays(new Date(), ttlDays)
    }
  });

  return { result, wasProcessed: false };
}

Паттерн двойной проверки (внутри транзакции)

typescript
async function safeProcessCommission(job: CommissionJobPayload) {
  const { idempotencyKey } = job;

  // Быстрый путь: Проверка вне транзакции
  const existing = await db.idempotencyKey.findUnique({
    where: { key: idempotencyKey }
  });
  if (existing) return existing.response;

  // Медленный путь: Обработка с транзакцией
  return await db.$transaction(async (tx) => {
    // КРИТИЧНО: Повторная проверка внутри транзакции (другой процесс мог завершить)
    const existingInTx = await tx.idempotencyKey.findUnique({
      where: { key: idempotencyKey }
    });
    if (existingInTx) return existingInTx.response;

    // Обработка...
    const result = await processCommissionLogic(tx, job);

    // Сохранение ключа идемпотентности
    await tx.idempotencyKey.create({
      data: {
        key: idempotencyKey,
        response: result,
        expiresAt: addDays(new Date(), 30)
      }
    });

    return result;
  }, {
    isolationLevel: 'Serializable'
  });
}

Стратегии повторов

Конфигурация

typescript
interface RetryConfig {
  maxAttempts: number;
  backoffMs: number[];
  retryableErrors: string[];
}

const RETRY_CONFIGS: Record<string, RetryConfig> = {
  commission: {
    maxAttempts: 3,
    backoffMs: [100, 500, 2000],
    retryableErrors: [
      'P2034',  // Конфликт транзакции Prisma
      '40001',  // Ошибка сериализации PostgreSQL
      '40P01',  // Обнаружена взаимоблокировка PostgreSQL
    ]
  },
  balance: {
    maxAttempts: 3,
    backoffMs: [50, 200, 1000],
    retryableErrors: ['P2034', '40001', '40P01']
  },
  payout: {
    maxAttempts: 1,  // Без повторов - пользователь должен повторить вручную
    backoffMs: [],
    retryableErrors: []
  },
  inventory: {
    maxAttempts: 3,
    backoffMs: [50, 100, 500],
    retryableErrors: ['P2034', '40001', '40P01']
  }
};

Реализация повторов

typescript
async function withRetry<T>(
  operation: () => Promise<T>,
  config: RetryConfig,
  context: string
): Promise<T> {
  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error as Error;

      const errorCode = extractErrorCode(error);
      const isRetryable = config.retryableErrors.includes(errorCode);

      if (!isRetryable || attempt === config.maxAttempts) {
        logger.error(`${context}: Non-retryable error or max attempts reached`, {
          attempt,
          errorCode,
          error: lastError.message
        });
        throw lastError;
      }

      const backoffMs = config.backoffMs[attempt - 1] || config.backoffMs[config.backoffMs.length - 1];

      logger.warn(`${context}: Retryable error, backing off`, {
        attempt,
        errorCode,
        backoffMs,
        nextAttempt: attempt + 1
      });

      await sleep(backoffMs);
    }
  }

  throw lastError;
}

function extractErrorCode(error: unknown): string {
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    return error.code;
  }
  if (error instanceof Error && 'code' in error) {
    return (error as any).code;
  }
  return 'UNKNOWN';
}

function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

Паттерны для конкретных операций

1. Расчёт комиссий

См. Движок комиссий для полной реализации.

Ключевые моменты:

  • Сначала блокировать исходную запись (заказ/инвестиция)
  • Блокировать все затронутые балансы партнёров в отсортированном порядке
  • Использовать ключ идемпотентности для каждой транзакции комиссии
  • Обрабатывать с уровнем изоляции Serializable

2. Операции с балансом

См. MLM Schema для SQL функций.

Ключевые моменты:

  • Всегда блокировать строку баланса перед изменением
  • Использовать поле версии для оптимистичной блокировки где уместно
  • Проверять достаточность баланса внутри транзакции
  • Создавать аудит-трейл для каждого изменения баланса

3. Запросы на выплату

typescript
async function createPayoutRequest(
  partnerId: string,
  amount: number,
  payoutDetails: PayoutDetails
): Promise<PayoutRequest> {
  return await db.$transaction(async (tx) => {
    // 1. Блокировка баланса партнёра
    const balances = await tx.$queryRaw<PartnerBalance[]>`
      SELECT * FROM mlm.partner_balances
      WHERE partner_id = ${partnerId}::uuid
      FOR UPDATE
    `;

    const balance = balances[0];
    if (!balance) {
      throw new PartnerNotFoundError(partnerId);
    }

    if (balance.available_balance < amount) {
      throw new InsufficientBalanceError(
        Number(balance.available_balance),
        amount
      );
    }

    // 2. Проверка существующей ожидающей выплаты
    // (Также защищено уникальным частичным индексом на таблице)
    const existingPending = await tx.payoutRequest.findFirst({
      where: {
        partnerId,
        status: { in: ['PENDING', 'APPROVED', 'PROCESSING'] }
      }
    });

    if (existingPending) {
      throw new PayoutAlreadyPendingError(existingPending.id);
    }

    // 3. Списание с доступного баланса
    await tx.partnerBalance.update({
      where: { partnerId },
      data: {
        availableBalance: { decrement: amount },
        version: { increment: 1 },
        updatedAt: new Date()
      }
    });

    // 4. Создание запроса на выплату
    const payout = await tx.payoutRequest.create({
      data: {
        partnerId,
        amount,
        currency: 'RUB',
        payoutMethodType: payoutDetails.type,
        payoutDetails: payoutDetails as any,
        status: 'PENDING'
      }
    });

    // 5. Создание записи аудит-лога
    await tx.financialAuditLog.create({
      data: {
        eventType: 'PAYOUT_REQUESTED',
        partnerId,
        amount: -amount,
        currency: 'RUB',
        balanceBefore: Number(balance.available_balance),
        balanceAfter: Number(balance.available_balance) - amount,
        sourceType: 'PAYOUT_REQUEST',
        sourceId: payout.id,
        checksum: generateChecksum(payout)
      }
    });

    return payout;
  }, {
    isolationLevel: 'Serializable',
    timeout: 30000
  });
}

4. Резервирование товаров

typescript
async function reserveInventory(
  productId: string,
  cartId: string,
  quantity: number,
  expiresMinutes: number = 15
): Promise<{ success: boolean; reservationId?: string; availableQuantity: number }> {
  return await db.$transaction(async (tx) => {
    // 1. Блокировка строки товара
    const products = await tx.$queryRaw<Product[]>`
      SELECT * FROM product.products
      WHERE id = ${productId}::uuid
      FOR UPDATE
    `;

    const product = products[0];
    if (!product) {
      throw new ProductNotFoundError(productId);
    }

    // 2. Расчёт доступного количества (блокировка резервирований тоже)
    const reservedResult = await tx.$queryRaw<{ total: bigint }[]>`
      SELECT COALESCE(SUM(quantity), 0) as total
      FROM product.inventory_reservations
      WHERE product_id = ${productId}::uuid
        AND status = 'RESERVED'
        AND expires_at > NOW()
      FOR UPDATE
    `;

    const reserved = Number(reservedResult[0]?.total || 0);
    const available = product.stock_quantity - reserved;

    if (available < quantity) {
      return { success: false, availableQuantity: available };
    }

    // 3. Создание резервирования
    const reservation = await tx.inventoryReservation.create({
      data: {
        productId,
        cartId,
        quantity,
        status: 'RESERVED',
        expiresAt: addMinutes(new Date(), expiresMinutes)
      }
    });

    return {
      success: true,
      reservationId: reservation.id,
      availableQuantity: available - quantity
    };
  }, {
    isolationLevel: 'Serializable'
  });
}

5. Вставка в дерево партнёров

typescript
async function registerPartner(
  userId: string,
  sponsorId: string | null,
  referralCode: string
): Promise<Partner> {
  return await db.$transaction(async (tx) => {
    // 1. Блокировка спонсора если существует (предотвращает конкурентные изменения дерева)
    if (sponsorId) {
      const sponsors = await tx.$queryRaw<Partner[]>`
        SELECT * FROM mlm.partners
        WHERE id = ${sponsorId}::uuid
        FOR UPDATE
      `;

      const sponsor = sponsors[0];
      if (!sponsor) {
        throw new SponsorNotFoundError(sponsorId);
      }

      if (sponsor.status !== 'ACTIVE') {
        throw new SponsorNotActiveError(sponsorId);
      }
    }

    // 2. Создание партнёра (триггер обработает пути дерева)
    const partner = await tx.partner.create({
      data: {
        userId,
        sponsorId,
        referralCode,
        status: 'ACTIVE',
        joinedAt: new Date()
      }
    });

    // 3. Создание начальной записи баланса
    await tx.partnerBalance.create({
      data: {
        partnerId: partner.id,
        availableBalance: 0,
        pendingBalance: 0,
        totalEarned: 0,
        totalWithdrawn: 0,
        careerPointsTotal: 0,
        careerPointsPeriod: 0,
        currency: 'RUB',
        version: 1
      }
    });

    return partner;
  }, {
    isolationLevel: 'RepeatableRead'
  });
}

Уровни изоляции транзакций

Когда использовать каждый уровень

УровеньСлучай использованияКомпромисс
ReadCommittedПреобладание чтения, низкие требования к согласованностиЛучшая производительность, допускает неповторяемое чтение
RepeatableReadЧтение балансов, запросы дереваХорошая производительность, предотвращает неповторяемое чтение
SerializableФинансовые записи, расчёт комиссийНизшая производительность, полная согласованность

Конфигурация транзакций Prisma

typescript
// Serializable для критических финансовых операций
await db.$transaction(async (tx) => {
  // ...
}, {
  isolationLevel: 'Serializable',
  timeout: 30000  // 30 секунд
});

// RepeatableRead для операций с деревом
await db.$transaction(async (tx) => {
  // ...
}, {
  isolationLevel: 'RepeatableRead',
  timeout: 15000  // 15 секунд
});

// По умолчанию (ReadCommitted) для простых операций
await db.$transaction(async (tx) => {
  // ...
});

Обработка ошибок

Ошибки, специфичные для конкурентности

typescript
// Доменные ошибки для проблем конкурентности
export class OptimisticLockError extends BusinessRuleError {
  constructor(entity: string) {
    super(
      'OPTIMISTIC_LOCK_CONFLICT',
      `${entity} was modified by another process. Please retry.`,
      { entity }
    );
  }
}

export class DeadlockDetectedError extends BusinessRuleError {
  constructor(operation: string) {
    super(
      'DEADLOCK_DETECTED',
      `Deadlock detected during ${operation}. Please retry.`,
      { operation }
    );
  }
}

export class TransactionTimeoutError extends BusinessRuleError {
  constructor(operation: string, timeoutMs: number) {
    super(
      'TRANSACTION_TIMEOUT',
      `Transaction timed out after ${timeoutMs}ms during ${operation}`,
      { operation, timeoutMs }
    );
  }
}

Маппинг кодов ошибок

typescript
function mapDatabaseError(error: unknown, context: string): Error {
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    switch (error.code) {
      case 'P2034':
        return new OptimisticLockError(context);
      case 'P2028':
        return new TransactionTimeoutError(context, 30000);
    }
  }

  if (error instanceof Error && 'code' in error) {
    const pgError = error as any;
    switch (pgError.code) {
      case '40001':  // serialization_failure
        return new OptimisticLockError(context);
      case '40P01':  // deadlock_detected
        return new DeadlockDetectedError(context);
      case '55P03':  // lock_not_available (NOWAIT)
        return new ResourceLockedError(context);
    }
  }

  return error as Error;
}

Мониторинг и оповещения

Ключевые метрики

МетрикаИсточникПорог оповещения
Количество взаимоблокировокcore.deadlock_log> 5 в час
Частота повторов транзакцийЛоги приложения> 10% операций
Время ожидания блокировки (p95)pg_stat_activity> 1 секунда
Ошибки сериализацииЛоги приложения> 20 в минуту
Блокировки двойных выплатЛоги приложенияЛюбое событие

Запрос мониторинга блокировок PostgreSQL

sql
-- Активные блокировки и ожидающие запросы
SELECT
    blocked.pid AS blocked_pid,
    blocked.usename AS blocked_user,
    blocked.query AS blocked_query,
    blocked.wait_event_type,
    blocking.pid AS blocking_pid,
    blocking.usename AS blocking_user,
    blocking.query AS blocking_query,
    NOW() - blocked.query_start AS wait_duration
FROM pg_stat_activity blocked
JOIN pg_locks blocked_locks ON blocked.pid = blocked_locks.pid
JOIN pg_locks blocking_locks
    ON blocked_locks.locktype = blocking_locks.locktype
    AND blocked_locks.relation = blocking_locks.relation
    AND blocked_locks.pid != blocking_locks.pid
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT blocked_locks.granted
ORDER BY wait_duration DESC;

Связанные документы