Паттерны конкурентности
Этот документ определяет паттерны обработки конкурентности для всех критических операций платформы IWM.
Обзор
Финансовые системы требуют явного контроля конкурентности для предотвращения:
- Потерянные обновления — Две транзакции перезаписывают друг друга
- Двойное списание — Одни и те же средства используются дважды
- Состояния гонки — Несогласованное состояние из-за чередования операций
- Взаимоблокировки — Транзакции ожидают друг друга бесконечно
Матрица критических операций
| Операция | Тип блокировки | Уровень изоляции | Стратегия повторов |
|---|---|---|---|
| Расчёт комиссий | Пессимистичная (строка) | Serializable | 3x с экспоненциальным откатом |
| Обновление баланса | Пессимистичная | RepeatableRead | 3x с экспоненциальным откатом |
| Запрос на выплату | Пессимистичная + уникальный индекс | Serializable | Без повторов (быстрый отказ) |
| Обработка выплаты | Пессимистичная + идемпотентность | Serializable | 3x с экспоненциальным откатом |
| Резервирование товара | Пессимистичная | Serializable | 3x с экспоненциальным откатом |
| Вставка в дерево партнёров | Пессимистичная (спонсор) | RepeatableRead | Без повторов |
| Перемещение в дереве партнёров | Пессимистичная (поддерево) | Serializable | Без повторов |
| Изменение статуса заказа | Оптимистичная (версия) | ReadCommitted | 3x немедленно |
| Пересчёт ранга | Пессимистичная (партнёр) | RepeatableRead | 3x с экспоненциальным откатом |
Стратегии блокировки
Пессимистичная блокировка (SELECT FOR UPDATE)
Использовать когда:
- Ожидается высокая конкуренция
- Операция не может быть безопасно повторена
- Финансовые операции, где критична согласованность
sql
-- Блокировка конкретных строк перед изменением
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE;
-- Вариант NOWAIT: Немедленный отказ при блокировке (для быстрого отказа)
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE NOWAIT;
-- Вариант SKIP LOCKED: Обработка только незаблокированных строк (для batch jobs)
SELECT * FROM mlm.commission_transactions
WHERE status = 'PENDING'
FOR UPDATE SKIP LOCKED
LIMIT 100;Оптимистичная блокировка (поле версии)
Использовать когда:
- Ожидается низкая конкуренция
- Повторы допустимы
- Преобладают операции чтения
typescript
// Чтение с версией
const balance = await tx.partnerBalance.findUnique({
where: { partnerId }
});
// Обновление с проверкой версии
const updated = await tx.partnerBalance.updateMany({
where: {
partnerId,
version: balance.version // Обновлять только если версия не изменилась
},
data: {
availableBalance: { decrement: amount },
version: { increment: 1 }
}
});
if (updated.count === 0) {
throw new OptimisticLockError('Balance was modified by another transaction');
}Предотвращение взаимоблокировок
Правила порядка блокировки
Для предотвращения взаимоблокировок всегда захватывать блокировки в этом согласованном порядке:
core.users(по ID, по возрастанию)mlm.partners(по ID, по возрастанию)mlm.partner_balances(по partner_id, по возрастанию)mlm.commission_transactions(по ID, по возрастанию)mlm.payout_requests(по ID, по возрастанию)product.products(по ID, по возрастанию)product.inventory_reservations(по product_id, затем ID)product.orders(по ID, по возрастанию)investment.participations(по ID, по возрастанию)
Паттерн реализации
typescript
// ПРАВИЛЬНО: Сортировка ID перед блокировкой для обеспечения согласованного порядка
async function lockMultiplePartnerBalances(
tx: PrismaTransaction,
partnerIds: string[]
): Promise<void> {
const sortedIds = [...partnerIds].sort();
await tx.$queryRaw`
SELECT id FROM mlm.partner_balances
WHERE partner_id = ANY(${sortedIds}::uuid[])
ORDER BY partner_id
FOR UPDATE
`;
}
// НЕПРАВИЛЬНО: Случайный порядок может вызвать взаимоблокировки
async function lockMultiplePartnerBalancesWrong(
tx: PrismaTransaction,
partnerIds: string[]
): Promise<void> {
// НЕ ДЕЛАТЬ ТАК - порядок непредсказуем
for (const partnerId of partnerIds) {
await tx.$queryRaw`
SELECT id FROM mlm.partner_balances
WHERE partner_id = ${partnerId}::uuid
FOR UPDATE
`;
}
}Обнаружение и мониторинг взаимоблокировок
sql
-- Создание таблицы лога взаимоблокировок
CREATE TABLE core.deadlock_log (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
blocked_pid INT NOT NULL,
blocking_pid INT NOT NULL,
blocked_query TEXT,
blocking_query TEXT,
wait_duration_ms INT
);
-- Функция для логирования взаимоблокировок (вызывать из приложения при ошибке)
CREATE OR REPLACE FUNCTION core.log_deadlock(
p_blocked_pid INT,
p_blocking_pid INT,
p_blocked_query TEXT,
p_blocking_query TEXT,
p_wait_duration_ms INT
) RETURNS VOID AS $$
BEGIN
INSERT INTO core.deadlock_log (
blocked_pid, blocking_pid, blocked_query, blocking_query, wait_duration_ms
) VALUES (
p_blocked_pid, p_blocking_pid, p_blocked_query, p_blocking_query, p_wait_duration_ms
);
END;
$$ LANGUAGE plpgsql;Паттерны идемпотентности
Структура ключа идемпотентности
| Операция | Формат ключа | TTL | Хранилище |
|---|---|---|---|
| Расчёт комиссий | comm:{sourceType}:{sourceId} | 30 дней | База данных |
| Комиссия по партнёру | comm:{sourceType}:{sourceId}:{partnerId}:{level} | 30 дней | База данных |
| Обработка выплаты | payout:{payoutRequestId}:{processor} | 7 дней | База данных |
| Обработка webhook | webhook:{provider}:{eventId} | 7 дней | Redis/База данных |
| Создание заказа | order:{userId}:{cartChecksum} | 1 час | База данных |
| Участие в инвестиции | invest:{userId}:{strategyId}:{amount}:{timestamp_hour} | 1 час | База данных |
Паттерн проверки идемпотентности
typescript
async function processWithIdempotency<T>(
tx: PrismaTransaction,
idempotencyKey: string,
ttlDays: number,
operation: () => Promise<T>
): Promise<{ result: T; wasProcessed: boolean }> {
// 1. Проверка, было ли уже обработано
const existing = await tx.idempotencyKey.findUnique({
where: { key: idempotencyKey }
});
if (existing) {
return {
result: existing.response as T,
wasProcessed: true
};
}
// 2. Выполнение операции
const result = await operation();
// 3. Сохранение ключа идемпотентности
await tx.idempotencyKey.create({
data: {
key: idempotencyKey,
response: result as any,
requestHash: createHash('sha256').update(idempotencyKey).digest('hex'),
expiresAt: addDays(new Date(), ttlDays)
}
});
return { result, wasProcessed: false };
}Паттерн двойной проверки (внутри транзакции)
typescript
async function safeProcessCommission(job: CommissionJobPayload) {
const { idempotencyKey } = job;
// Быстрый путь: Проверка вне транзакции
const existing = await db.idempotencyKey.findUnique({
where: { key: idempotencyKey }
});
if (existing) return existing.response;
// Медленный путь: Обработка с транзакцией
return await db.$transaction(async (tx) => {
// КРИТИЧНО: Повторная проверка внутри транзакции (другой процесс мог завершить)
const existingInTx = await tx.idempotencyKey.findUnique({
where: { key: idempotencyKey }
});
if (existingInTx) return existingInTx.response;
// Обработка...
const result = await processCommissionLogic(tx, job);
// Сохранение ключа идемпотентности
await tx.idempotencyKey.create({
data: {
key: idempotencyKey,
response: result,
expiresAt: addDays(new Date(), 30)
}
});
return result;
}, {
isolationLevel: 'Serializable'
});
}Стратегии повторов
Конфигурация
typescript
interface RetryConfig {
maxAttempts: number;
backoffMs: number[];
retryableErrors: string[];
}
const RETRY_CONFIGS: Record<string, RetryConfig> = {
commission: {
maxAttempts: 3,
backoffMs: [100, 500, 2000],
retryableErrors: [
'P2034', // Конфликт транзакции Prisma
'40001', // Ошибка сериализации PostgreSQL
'40P01', // Обнаружена взаимоблокировка PostgreSQL
]
},
balance: {
maxAttempts: 3,
backoffMs: [50, 200, 1000],
retryableErrors: ['P2034', '40001', '40P01']
},
payout: {
maxAttempts: 1, // Без повторов - пользователь должен повторить вручную
backoffMs: [],
retryableErrors: []
},
inventory: {
maxAttempts: 3,
backoffMs: [50, 100, 500],
retryableErrors: ['P2034', '40001', '40P01']
}
};Реализация повторов
typescript
async function withRetry<T>(
operation: () => Promise<T>,
config: RetryConfig,
context: string
): Promise<T> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;
const errorCode = extractErrorCode(error);
const isRetryable = config.retryableErrors.includes(errorCode);
if (!isRetryable || attempt === config.maxAttempts) {
logger.error(`${context}: Non-retryable error or max attempts reached`, {
attempt,
errorCode,
error: lastError.message
});
throw lastError;
}
const backoffMs = config.backoffMs[attempt - 1] || config.backoffMs[config.backoffMs.length - 1];
logger.warn(`${context}: Retryable error, backing off`, {
attempt,
errorCode,
backoffMs,
nextAttempt: attempt + 1
});
await sleep(backoffMs);
}
}
throw lastError;
}
function extractErrorCode(error: unknown): string {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
return error.code;
}
if (error instanceof Error && 'code' in error) {
return (error as any).code;
}
return 'UNKNOWN';
}
function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}Паттерны для конкретных операций
1. Расчёт комиссий
См. Движок комиссий для полной реализации.
Ключевые моменты:
- Сначала блокировать исходную запись (заказ/инвестиция)
- Блокировать все затронутые балансы партнёров в отсортированном порядке
- Использовать ключ идемпотентности для каждой транзакции комиссии
- Обрабатывать с уровнем изоляции Serializable
2. Операции с балансом
См. MLM Schema для SQL функций.
Ключевые моменты:
- Всегда блокировать строку баланса перед изменением
- Использовать поле версии для оптимистичной блокировки где уместно
- Проверять достаточность баланса внутри транзакции
- Создавать аудит-трейл для каждого изменения баланса
3. Запросы на выплату
typescript
async function createPayoutRequest(
partnerId: string,
amount: number,
payoutDetails: PayoutDetails
): Promise<PayoutRequest> {
return await db.$transaction(async (tx) => {
// 1. Блокировка баланса партнёра
const balances = await tx.$queryRaw<PartnerBalance[]>`
SELECT * FROM mlm.partner_balances
WHERE partner_id = ${partnerId}::uuid
FOR UPDATE
`;
const balance = balances[0];
if (!balance) {
throw new PartnerNotFoundError(partnerId);
}
if (balance.available_balance < amount) {
throw new InsufficientBalanceError(
Number(balance.available_balance),
amount
);
}
// 2. Проверка существующей ожидающей выплаты
// (Также защищено уникальным частичным индексом на таблице)
const existingPending = await tx.payoutRequest.findFirst({
where: {
partnerId,
status: { in: ['PENDING', 'APPROVED', 'PROCESSING'] }
}
});
if (existingPending) {
throw new PayoutAlreadyPendingError(existingPending.id);
}
// 3. Списание с доступного баланса
await tx.partnerBalance.update({
where: { partnerId },
data: {
availableBalance: { decrement: amount },
version: { increment: 1 },
updatedAt: new Date()
}
});
// 4. Создание запроса на выплату
const payout = await tx.payoutRequest.create({
data: {
partnerId,
amount,
currency: 'RUB',
payoutMethodType: payoutDetails.type,
payoutDetails: payoutDetails as any,
status: 'PENDING'
}
});
// 5. Создание записи аудит-лога
await tx.financialAuditLog.create({
data: {
eventType: 'PAYOUT_REQUESTED',
partnerId,
amount: -amount,
currency: 'RUB',
balanceBefore: Number(balance.available_balance),
balanceAfter: Number(balance.available_balance) - amount,
sourceType: 'PAYOUT_REQUEST',
sourceId: payout.id,
checksum: generateChecksum(payout)
}
});
return payout;
}, {
isolationLevel: 'Serializable',
timeout: 30000
});
}4. Резервирование товаров
typescript
async function reserveInventory(
productId: string,
cartId: string,
quantity: number,
expiresMinutes: number = 15
): Promise<{ success: boolean; reservationId?: string; availableQuantity: number }> {
return await db.$transaction(async (tx) => {
// 1. Блокировка строки товара
const products = await tx.$queryRaw<Product[]>`
SELECT * FROM product.products
WHERE id = ${productId}::uuid
FOR UPDATE
`;
const product = products[0];
if (!product) {
throw new ProductNotFoundError(productId);
}
// 2. Расчёт доступного количества (блокировка резервирований тоже)
const reservedResult = await tx.$queryRaw<{ total: bigint }[]>`
SELECT COALESCE(SUM(quantity), 0) as total
FROM product.inventory_reservations
WHERE product_id = ${productId}::uuid
AND status = 'RESERVED'
AND expires_at > NOW()
FOR UPDATE
`;
const reserved = Number(reservedResult[0]?.total || 0);
const available = product.stock_quantity - reserved;
if (available < quantity) {
return { success: false, availableQuantity: available };
}
// 3. Создание резервирования
const reservation = await tx.inventoryReservation.create({
data: {
productId,
cartId,
quantity,
status: 'RESERVED',
expiresAt: addMinutes(new Date(), expiresMinutes)
}
});
return {
success: true,
reservationId: reservation.id,
availableQuantity: available - quantity
};
}, {
isolationLevel: 'Serializable'
});
}5. Вставка в дерево партнёров
typescript
async function registerPartner(
userId: string,
sponsorId: string | null,
referralCode: string
): Promise<Partner> {
return await db.$transaction(async (tx) => {
// 1. Блокировка спонсора если существует (предотвращает конкурентные изменения дерева)
if (sponsorId) {
const sponsors = await tx.$queryRaw<Partner[]>`
SELECT * FROM mlm.partners
WHERE id = ${sponsorId}::uuid
FOR UPDATE
`;
const sponsor = sponsors[0];
if (!sponsor) {
throw new SponsorNotFoundError(sponsorId);
}
if (sponsor.status !== 'ACTIVE') {
throw new SponsorNotActiveError(sponsorId);
}
}
// 2. Создание партнёра (триггер обработает пути дерева)
const partner = await tx.partner.create({
data: {
userId,
sponsorId,
referralCode,
status: 'ACTIVE',
joinedAt: new Date()
}
});
// 3. Создание начальной записи баланса
await tx.partnerBalance.create({
data: {
partnerId: partner.id,
availableBalance: 0,
pendingBalance: 0,
totalEarned: 0,
totalWithdrawn: 0,
careerPointsTotal: 0,
careerPointsPeriod: 0,
currency: 'RUB',
version: 1
}
});
return partner;
}, {
isolationLevel: 'RepeatableRead'
});
}Уровни изоляции транзакций
Когда использовать каждый уровень
| Уровень | Случай использования | Компромисс |
|---|---|---|
| ReadCommitted | Преобладание чтения, низкие требования к согласованности | Лучшая производительность, допускает неповторяемое чтение |
| RepeatableRead | Чтение балансов, запросы дерева | Хорошая производительность, предотвращает неповторяемое чтение |
| Serializable | Финансовые записи, расчёт комиссий | Низшая производительность, полная согласованность |
Конфигурация транзакций Prisma
typescript
// Serializable для критических финансовых операций
await db.$transaction(async (tx) => {
// ...
}, {
isolationLevel: 'Serializable',
timeout: 30000 // 30 секунд
});
// RepeatableRead для операций с деревом
await db.$transaction(async (tx) => {
// ...
}, {
isolationLevel: 'RepeatableRead',
timeout: 15000 // 15 секунд
});
// По умолчанию (ReadCommitted) для простых операций
await db.$transaction(async (tx) => {
// ...
});Обработка ошибок
Ошибки, специфичные для конкурентности
typescript
// Доменные ошибки для проблем конкурентности
export class OptimisticLockError extends BusinessRuleError {
constructor(entity: string) {
super(
'OPTIMISTIC_LOCK_CONFLICT',
`${entity} was modified by another process. Please retry.`,
{ entity }
);
}
}
export class DeadlockDetectedError extends BusinessRuleError {
constructor(operation: string) {
super(
'DEADLOCK_DETECTED',
`Deadlock detected during ${operation}. Please retry.`,
{ operation }
);
}
}
export class TransactionTimeoutError extends BusinessRuleError {
constructor(operation: string, timeoutMs: number) {
super(
'TRANSACTION_TIMEOUT',
`Transaction timed out after ${timeoutMs}ms during ${operation}`,
{ operation, timeoutMs }
);
}
}Маппинг кодов ошибок
typescript
function mapDatabaseError(error: unknown, context: string): Error {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
switch (error.code) {
case 'P2034':
return new OptimisticLockError(context);
case 'P2028':
return new TransactionTimeoutError(context, 30000);
}
}
if (error instanceof Error && 'code' in error) {
const pgError = error as any;
switch (pgError.code) {
case '40001': // serialization_failure
return new OptimisticLockError(context);
case '40P01': // deadlock_detected
return new DeadlockDetectedError(context);
case '55P03': // lock_not_available (NOWAIT)
return new ResourceLockedError(context);
}
}
return error as Error;
}Мониторинг и оповещения
Ключевые метрики
| Метрика | Источник | Порог оповещения |
|---|---|---|
| Количество взаимоблокировок | core.deadlock_log | > 5 в час |
| Частота повторов транзакций | Логи приложения | > 10% операций |
| Время ожидания блокировки (p95) | pg_stat_activity | > 1 секунда |
| Ошибки сериализации | Логи приложения | > 20 в минуту |
| Блокировки двойных выплат | Логи приложения | Любое событие |
Запрос мониторинга блокировок PostgreSQL
sql
-- Активные блокировки и ожидающие запросы
SELECT
blocked.pid AS blocked_pid,
blocked.usename AS blocked_user,
blocked.query AS blocked_query,
blocked.wait_event_type,
blocking.pid AS blocking_pid,
blocking.usename AS blocking_user,
blocking.query AS blocking_query,
NOW() - blocked.query_start AS wait_duration
FROM pg_stat_activity blocked
JOIN pg_locks blocked_locks ON blocked.pid = blocked_locks.pid
JOIN pg_locks blocking_locks
ON blocked_locks.locktype = blocking_locks.locktype
AND blocked_locks.relation = blocking_locks.relation
AND blocked_locks.pid != blocking_locks.pid
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT blocked_locks.granted
ORDER BY wait_duration DESC;