Concurrency Patterns
This document defines concurrency handling patterns for all critical operations in the IWM Platform.
Overview
Financial systems require explicit concurrency control to prevent:
- Lost updates — Two transactions overwriting each other
- Double-spending — Same funds used twice
- Race conditions — Inconsistent state from interleaved operations
- Deadlocks — Transactions waiting for each other indefinitely
Critical Operations Matrix
| Operation | Lock Type | Isolation Level | Retry Strategy |
|---|---|---|---|
| Commission calculation | Pessimistic (row) | Serializable | 3x with exponential backoff |
| Balance update | Pessimistic | RepeatableRead | 3x with exponential backoff |
| Payout request | Pessimistic + unique index | Serializable | No retry (fail fast) |
| Payout processing | Pessimistic + idempotency | Serializable | 3x with exponential backoff |
| Inventory reservation | Pessimistic | Serializable | 3x with exponential backoff |
| Partner tree insert | Pessimistic (sponsor) | RepeatableRead | No retry |
| Partner tree move | Pessimistic (subtree) | Serializable | No retry |
| Order status change | Optimistic (version) | ReadCommitted | 3x immediate |
| Rank recalculation | Pessimistic (partner) | RepeatableRead | 3x with exponential backoff |
Locking Strategies
Pessimistic Locking (SELECT FOR UPDATE)
Use when:
- High contention expected
- Operation cannot be safely retried
- Financial operations where consistency is critical
sql
-- Lock specific rows before modification
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE;
-- NOWAIT variant: Fail immediately if locked (for fast-fail scenarios)
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE NOWAIT;
-- SKIP LOCKED variant: Process unlocked rows only (for batch jobs)
SELECT * FROM mlm.commission_transactions
WHERE status = 'PENDING'
FOR UPDATE SKIP LOCKED
LIMIT 100;Optimistic Locking (Version Field)
Use when:
- Low contention expected
- Retries are acceptable
- Read-heavy workloads
typescript
// Read with version
const balance = await tx.partnerBalance.findUnique({
where: { partnerId }
});
// Update with version check
const updated = await tx.partnerBalance.updateMany({
where: {
partnerId,
version: balance.version // Only update if version unchanged
},
data: {
availableBalance: { decrement: amount },
version: { increment: 1 }
}
});
if (updated.count === 0) {
throw new OptimisticLockError('Balance was modified by another transaction');
}Deadlock Prevention
Lock Ordering Rules
To prevent deadlocks, always acquire locks in this consistent order:
core.users(by ID, ascending)mlm.partners(by ID, ascending)mlm.partner_balances(by partner_id, ascending)mlm.commission_transactions(by ID, ascending)mlm.payout_requests(by ID, ascending)product.products(by ID, ascending)product.inventory_reservations(by product_id, then ID)product.orders(by ID, ascending)investment.participations(by ID, ascending)
Implementation Pattern
typescript
// CORRECT: Sort IDs before locking to ensure consistent order
async function lockMultiplePartnerBalances(
tx: PrismaTransaction,
partnerIds: string[]
): Promise<void> {
const sortedIds = [...partnerIds].sort();
await tx.$queryRaw`
SELECT id FROM mlm.partner_balances
WHERE partner_id = ANY(${sortedIds}::uuid[])
ORDER BY partner_id
FOR UPDATE
`;
}
// INCORRECT: Random order can cause deadlocks
async function lockMultiplePartnerBalancesWrong(
tx: PrismaTransaction,
partnerIds: string[]
): Promise<void> {
// DON'T DO THIS - order is unpredictable
for (const partnerId of partnerIds) {
await tx.$queryRaw`
SELECT id FROM mlm.partner_balances
WHERE partner_id = ${partnerId}::uuid
FOR UPDATE
`;
}
}Deadlock Detection and Monitoring
sql
-- Create deadlock log table
CREATE TABLE core.deadlock_log (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
blocked_pid INT NOT NULL,
blocking_pid INT NOT NULL,
blocked_query TEXT,
blocking_query TEXT,
wait_duration_ms INT
);
-- Function to log deadlock occurrences (call from application on deadlock error)
CREATE OR REPLACE FUNCTION core.log_deadlock(
p_blocked_pid INT,
p_blocking_pid INT,
p_blocked_query TEXT,
p_blocking_query TEXT,
p_wait_duration_ms INT
) RETURNS VOID AS $$
BEGIN
INSERT INTO core.deadlock_log (
blocked_pid, blocking_pid, blocked_query, blocking_query, wait_duration_ms
) VALUES (
p_blocked_pid, p_blocking_pid, p_blocked_query, p_blocking_query, p_wait_duration_ms
);
END;
$$ LANGUAGE plpgsql;Idempotency Patterns
Idempotency Key Structure
| Operation | Key Format | TTL | Storage |
|---|---|---|---|
| Commission calculation | comm:{sourceType}:{sourceId} | 30 days | Database |
| Commission per partner | comm:{sourceType}:{sourceId}:{partnerId}:{level} | 30 days | Database |
| Payout processing | payout:{payoutRequestId}:{processor} | 7 days | Database |
| Webhook processing | webhook:{provider}:{eventId} | 7 days | Redis/Database |
| Order creation | order:{userId}:{cartChecksum} | 1 hour | Database |
| Investment participation | invest:{userId}:{strategyId}:{amount}:{timestamp_hour} | 1 hour | Database |
Idempotency Check Pattern
typescript
async function processWithIdempotency<T>(
tx: PrismaTransaction,
idempotencyKey: string,
ttlDays: number,
operation: () => Promise<T>
): Promise<{ result: T; wasProcessed: boolean }> {
// 1. Check if already processed
const existing = await tx.idempotencyKey.findUnique({
where: { key: idempotencyKey }
});
if (existing) {
return {
result: existing.response as T,
wasProcessed: true
};
}
// 2. Execute operation
const result = await operation();
// 3. Store idempotency key
await tx.idempotencyKey.create({
data: {
key: idempotencyKey,
response: result as any,
requestHash: createHash('sha256').update(idempotencyKey).digest('hex'),
expiresAt: addDays(new Date(), ttlDays)
}
});
return { result, wasProcessed: false };
}Double-Check Pattern (Inside Transaction)
typescript
async function safeProcessCommission(job: CommissionJobPayload) {
const { idempotencyKey } = job;
// Fast path: Check outside transaction
const existing = await db.idempotencyKey.findUnique({
where: { key: idempotencyKey }
});
if (existing) return existing.response;
// Slow path: Process with transaction
return await db.$transaction(async (tx) => {
// CRITICAL: Re-check inside transaction (another process may have completed)
const existingInTx = await tx.idempotencyKey.findUnique({
where: { key: idempotencyKey }
});
if (existingInTx) return existingInTx.response;
// Process...
const result = await processCommissionLogic(tx, job);
// Store idempotency key
await tx.idempotencyKey.create({
data: {
key: idempotencyKey,
response: result,
expiresAt: addDays(new Date(), 30)
}
});
return result;
}, {
isolationLevel: 'Serializable'
});
}Retry Strategies
Configuration
typescript
interface RetryConfig {
maxAttempts: number;
backoffMs: number[];
retryableErrors: string[];
}
const RETRY_CONFIGS: Record<string, RetryConfig> = {
commission: {
maxAttempts: 3,
backoffMs: [100, 500, 2000],
retryableErrors: [
'P2034', // Prisma transaction conflict
'40001', // PostgreSQL serialization failure
'40P01', // PostgreSQL deadlock detected
]
},
balance: {
maxAttempts: 3,
backoffMs: [50, 200, 1000],
retryableErrors: ['P2034', '40001', '40P01']
},
payout: {
maxAttempts: 1, // No retry - user should retry manually
backoffMs: [],
retryableErrors: []
},
inventory: {
maxAttempts: 3,
backoffMs: [50, 100, 500],
retryableErrors: ['P2034', '40001', '40P01']
}
};Retry Implementation
typescript
async function withRetry<T>(
operation: () => Promise<T>,
config: RetryConfig,
context: string
): Promise<T> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;
const errorCode = extractErrorCode(error);
const isRetryable = config.retryableErrors.includes(errorCode);
if (!isRetryable || attempt === config.maxAttempts) {
logger.error(`${context}: Non-retryable error or max attempts reached`, {
attempt,
errorCode,
error: lastError.message
});
throw lastError;
}
const backoffMs = config.backoffMs[attempt - 1] || config.backoffMs[config.backoffMs.length - 1];
logger.warn(`${context}: Retryable error, backing off`, {
attempt,
errorCode,
backoffMs,
nextAttempt: attempt + 1
});
await sleep(backoffMs);
}
}
throw lastError;
}
function extractErrorCode(error: unknown): string {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
return error.code;
}
if (error instanceof Error && 'code' in error) {
return (error as any).code;
}
return 'UNKNOWN';
}
function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}Operation-Specific Patterns
1. Commission Calculation
See Commission Engine for full implementation.
Key Points:
- Lock source record (order/investment) first
- Lock all affected partner balances in sorted order
- Use idempotency key per commission transaction
- Process in Serializable isolation level
2. Balance Operations
See MLM Schema for SQL functions.
Key Points:
- Always lock balance row before modification
- Use version field for optimistic locking where appropriate
- Validate sufficient balance inside transaction
- Create audit trail for every balance change
3. Payout Requests
typescript
async function createPayoutRequest(
partnerId: string,
amount: number,
payoutDetails: PayoutDetails
): Promise<PayoutRequest> {
return await db.$transaction(async (tx) => {
// 1. Lock partner balance
const balances = await tx.$queryRaw<PartnerBalance[]>`
SELECT * FROM mlm.partner_balances
WHERE partner_id = ${partnerId}::uuid
FOR UPDATE
`;
const balance = balances[0];
if (!balance) {
throw new PartnerNotFoundError(partnerId);
}
if (balance.available_balance < amount) {
throw new InsufficientBalanceError(
Number(balance.available_balance),
amount
);
}
// 2. Check for existing pending payout
// (Also protected by unique partial index on table)
const existingPending = await tx.payoutRequest.findFirst({
where: {
partnerId,
status: { in: ['PENDING', 'APPROVED', 'PROCESSING'] }
}
});
if (existingPending) {
throw new PayoutAlreadyPendingError(existingPending.id);
}
// 3. Deduct from available balance
await tx.partnerBalance.update({
where: { partnerId },
data: {
availableBalance: { decrement: amount },
version: { increment: 1 },
updatedAt: new Date()
}
});
// 4. Create payout request
const payout = await tx.payoutRequest.create({
data: {
partnerId,
amount,
currency: 'RUB',
payoutMethodType: payoutDetails.type,
payoutDetails: payoutDetails as any,
status: 'PENDING'
}
});
// 5. Create audit log entry
await tx.financialAuditLog.create({
data: {
eventType: 'PAYOUT_REQUESTED',
partnerId,
amount: -amount,
currency: 'RUB',
balanceBefore: Number(balance.available_balance),
balanceAfter: Number(balance.available_balance) - amount,
sourceType: 'PAYOUT_REQUEST',
sourceId: payout.id,
checksum: generateChecksum(payout)
}
});
return payout;
}, {
isolationLevel: 'Serializable',
timeout: 30000
});
}4. Inventory Reservation
typescript
async function reserveInventory(
productId: string,
cartId: string,
quantity: number,
expiresMinutes: number = 15
): Promise<{ success: boolean; reservationId?: string; availableQuantity: number }> {
return await db.$transaction(async (tx) => {
// 1. Lock product row
const products = await tx.$queryRaw<Product[]>`
SELECT * FROM product.products
WHERE id = ${productId}::uuid
FOR UPDATE
`;
const product = products[0];
if (!product) {
throw new ProductNotFoundError(productId);
}
// 2. Calculate available (lock reservations too)
const reservedResult = await tx.$queryRaw<{ total: bigint }[]>`
SELECT COALESCE(SUM(quantity), 0) as total
FROM product.inventory_reservations
WHERE product_id = ${productId}::uuid
AND status = 'RESERVED'
AND expires_at > NOW()
FOR UPDATE
`;
const reserved = Number(reservedResult[0]?.total || 0);
const available = product.stock_quantity - reserved;
if (available < quantity) {
return { success: false, availableQuantity: available };
}
// 3. Create reservation
const reservation = await tx.inventoryReservation.create({
data: {
productId,
cartId,
quantity,
status: 'RESERVED',
expiresAt: addMinutes(new Date(), expiresMinutes)
}
});
return {
success: true,
reservationId: reservation.id,
availableQuantity: available - quantity
};
}, {
isolationLevel: 'Serializable'
});
}5. Partner Tree Insert
typescript
async function registerPartner(
userId: string,
sponsorId: string | null,
referralCode: string
): Promise<Partner> {
return await db.$transaction(async (tx) => {
// 1. Lock sponsor if exists (prevents concurrent tree modifications)
if (sponsorId) {
const sponsors = await tx.$queryRaw<Partner[]>`
SELECT * FROM mlm.partners
WHERE id = ${sponsorId}::uuid
FOR UPDATE
`;
const sponsor = sponsors[0];
if (!sponsor) {
throw new SponsorNotFoundError(sponsorId);
}
if (sponsor.status !== 'ACTIVE') {
throw new SponsorNotActiveError(sponsorId);
}
}
// 2. Create partner (trigger will handle tree paths)
const partner = await tx.partner.create({
data: {
userId,
sponsorId,
referralCode,
status: 'ACTIVE',
joinedAt: new Date()
}
});
// 3. Create initial balance record
await tx.partnerBalance.create({
data: {
partnerId: partner.id,
availableBalance: 0,
pendingBalance: 0,
totalEarned: 0,
totalWithdrawn: 0,
careerPointsTotal: 0,
careerPointsPeriod: 0,
currency: 'RUB',
version: 1
}
});
return partner;
}, {
isolationLevel: 'RepeatableRead'
});
}Transaction Isolation Levels
When to Use Each Level
| Level | Use Case | Trade-off |
|---|---|---|
| ReadCommitted | Read-heavy, low consistency needs | Best performance, allows non-repeatable reads |
| RepeatableRead | Balance reads, tree queries | Good performance, prevents non-repeatable reads |
| Serializable | Financial writes, commission calc | Lowest performance, full consistency |
Prisma Transaction Configuration
typescript
// Serializable for critical financial operations
await db.$transaction(async (tx) => {
// ...
}, {
isolationLevel: 'Serializable',
timeout: 30000 // 30 seconds
});
// RepeatableRead for tree operations
await db.$transaction(async (tx) => {
// ...
}, {
isolationLevel: 'RepeatableRead',
timeout: 15000 // 15 seconds
});
// Default (ReadCommitted) for simple operations
await db.$transaction(async (tx) => {
// ...
});Error Handling
Concurrency-Specific Errors
typescript
// Domain errors for concurrency issues
export class OptimisticLockError extends BusinessRuleError {
constructor(entity: string) {
super(
'OPTIMISTIC_LOCK_CONFLICT',
`${entity} was modified by another process. Please retry.`,
{ entity }
);
}
}
export class DeadlockDetectedError extends BusinessRuleError {
constructor(operation: string) {
super(
'DEADLOCK_DETECTED',
`Deadlock detected during ${operation}. Please retry.`,
{ operation }
);
}
}
export class TransactionTimeoutError extends BusinessRuleError {
constructor(operation: string, timeoutMs: number) {
super(
'TRANSACTION_TIMEOUT',
`Transaction timed out after ${timeoutMs}ms during ${operation}`,
{ operation, timeoutMs }
);
}
}Error Code Mapping
typescript
function mapDatabaseError(error: unknown, context: string): Error {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
switch (error.code) {
case 'P2034':
return new OptimisticLockError(context);
case 'P2028':
return new TransactionTimeoutError(context, 30000);
}
}
if (error instanceof Error && 'code' in error) {
const pgError = error as any;
switch (pgError.code) {
case '40001': // serialization_failure
return new OptimisticLockError(context);
case '40P01': // deadlock_detected
return new DeadlockDetectedError(context);
case '55P03': // lock_not_available (NOWAIT)
return new ResourceLockedError(context);
}
}
return error as Error;
}Monitoring and Alerting
Key Metrics
| Metric | Source | Alert Threshold |
|---|---|---|
| Deadlock count | core.deadlock_log | > 5 per hour |
| Transaction retry rate | Application logs | > 10% of operations |
| Lock wait time (p95) | pg_stat_activity | > 1 second |
| Serialization failures | Application logs | > 20 per minute |
| Payout double-attempt blocks | Application logs | Any occurrence |
PostgreSQL Lock Monitoring Query
sql
-- Active locks and waiting queries
SELECT
blocked.pid AS blocked_pid,
blocked.usename AS blocked_user,
blocked.query AS blocked_query,
blocked.wait_event_type,
blocking.pid AS blocking_pid,
blocking.usename AS blocking_user,
blocking.query AS blocking_query,
NOW() - blocked.query_start AS wait_duration
FROM pg_stat_activity blocked
JOIN pg_locks blocked_locks ON blocked.pid = blocked_locks.pid
JOIN pg_locks blocking_locks
ON blocked_locks.locktype = blocking_locks.locktype
AND blocked_locks.relation = blocking_locks.relation
AND blocked_locks.pid != blocking_locks.pid
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT blocked_locks.granted
ORDER BY wait_duration DESC;