Skip to content

Concurrency Patterns

This document defines concurrency handling patterns for all critical operations in the IWM Platform.


Overview

Financial systems require explicit concurrency control to prevent:

  • Lost updates — Two transactions overwriting each other
  • Double-spending — Same funds used twice
  • Race conditions — Inconsistent state from interleaved operations
  • Deadlocks — Transactions waiting for each other indefinitely

Critical Operations Matrix

OperationLock TypeIsolation LevelRetry Strategy
Commission calculationPessimistic (row)Serializable3x with exponential backoff
Balance updatePessimisticRepeatableRead3x with exponential backoff
Payout requestPessimistic + unique indexSerializableNo retry (fail fast)
Payout processingPessimistic + idempotencySerializable3x with exponential backoff
Inventory reservationPessimisticSerializable3x with exponential backoff
Partner tree insertPessimistic (sponsor)RepeatableReadNo retry
Partner tree movePessimistic (subtree)SerializableNo retry
Order status changeOptimistic (version)ReadCommitted3x immediate
Rank recalculationPessimistic (partner)RepeatableRead3x with exponential backoff

Locking Strategies

Pessimistic Locking (SELECT FOR UPDATE)

Use when:

  • High contention expected
  • Operation cannot be safely retried
  • Financial operations where consistency is critical
sql
-- Lock specific rows before modification
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE;

-- NOWAIT variant: Fail immediately if locked (for fast-fail scenarios)
SELECT * FROM mlm.partner_balances
WHERE partner_id = $1
FOR UPDATE NOWAIT;

-- SKIP LOCKED variant: Process unlocked rows only (for batch jobs)
SELECT * FROM mlm.commission_transactions
WHERE status = 'PENDING'
FOR UPDATE SKIP LOCKED
LIMIT 100;

Optimistic Locking (Version Field)

Use when:

  • Low contention expected
  • Retries are acceptable
  • Read-heavy workloads
typescript
// Read with version
const balance = await tx.partnerBalance.findUnique({
  where: { partnerId }
});

// Update with version check
const updated = await tx.partnerBalance.updateMany({
  where: {
    partnerId,
    version: balance.version  // Only update if version unchanged
  },
  data: {
    availableBalance: { decrement: amount },
    version: { increment: 1 }
  }
});

if (updated.count === 0) {
  throw new OptimisticLockError('Balance was modified by another transaction');
}

Deadlock Prevention

Lock Ordering Rules

To prevent deadlocks, always acquire locks in this consistent order:

  1. core.users (by ID, ascending)
  2. mlm.partners (by ID, ascending)
  3. mlm.partner_balances (by partner_id, ascending)
  4. mlm.commission_transactions (by ID, ascending)
  5. mlm.payout_requests (by ID, ascending)
  6. product.products (by ID, ascending)
  7. product.inventory_reservations (by product_id, then ID)
  8. product.orders (by ID, ascending)
  9. investment.participations (by ID, ascending)

Implementation Pattern

typescript
// CORRECT: Sort IDs before locking to ensure consistent order
async function lockMultiplePartnerBalances(
  tx: PrismaTransaction,
  partnerIds: string[]
): Promise<void> {
  const sortedIds = [...partnerIds].sort();

  await tx.$queryRaw`
    SELECT id FROM mlm.partner_balances
    WHERE partner_id = ANY(${sortedIds}::uuid[])
    ORDER BY partner_id
    FOR UPDATE
  `;
}

// INCORRECT: Random order can cause deadlocks
async function lockMultiplePartnerBalancesWrong(
  tx: PrismaTransaction,
  partnerIds: string[]
): Promise<void> {
  // DON'T DO THIS - order is unpredictable
  for (const partnerId of partnerIds) {
    await tx.$queryRaw`
      SELECT id FROM mlm.partner_balances
      WHERE partner_id = ${partnerId}::uuid
      FOR UPDATE
    `;
  }
}

Deadlock Detection and Monitoring

sql
-- Create deadlock log table
CREATE TABLE core.deadlock_log (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
    blocked_pid INT NOT NULL,
    blocking_pid INT NOT NULL,
    blocked_query TEXT,
    blocking_query TEXT,
    wait_duration_ms INT
);

-- Function to log deadlock occurrences (call from application on deadlock error)
CREATE OR REPLACE FUNCTION core.log_deadlock(
    p_blocked_pid INT,
    p_blocking_pid INT,
    p_blocked_query TEXT,
    p_blocking_query TEXT,
    p_wait_duration_ms INT
) RETURNS VOID AS $$
BEGIN
    INSERT INTO core.deadlock_log (
        blocked_pid, blocking_pid, blocked_query, blocking_query, wait_duration_ms
    ) VALUES (
        p_blocked_pid, p_blocking_pid, p_blocked_query, p_blocking_query, p_wait_duration_ms
    );
END;
$$ LANGUAGE plpgsql;

Idempotency Patterns

Idempotency Key Structure

OperationKey FormatTTLStorage
Commission calculationcomm:{sourceType}:{sourceId}30 daysDatabase
Commission per partnercomm:{sourceType}:{sourceId}:{partnerId}:{level}30 daysDatabase
Payout processingpayout:{payoutRequestId}:{processor}7 daysDatabase
Webhook processingwebhook:{provider}:{eventId}7 daysRedis/Database
Order creationorder:{userId}:{cartChecksum}1 hourDatabase
Investment participationinvest:{userId}:{strategyId}:{amount}:{timestamp_hour}1 hourDatabase

Idempotency Check Pattern

typescript
async function processWithIdempotency<T>(
  tx: PrismaTransaction,
  idempotencyKey: string,
  ttlDays: number,
  operation: () => Promise<T>
): Promise<{ result: T; wasProcessed: boolean }> {
  // 1. Check if already processed
  const existing = await tx.idempotencyKey.findUnique({
    where: { key: idempotencyKey }
  });

  if (existing) {
    return {
      result: existing.response as T,
      wasProcessed: true
    };
  }

  // 2. Execute operation
  const result = await operation();

  // 3. Store idempotency key
  await tx.idempotencyKey.create({
    data: {
      key: idempotencyKey,
      response: result as any,
      requestHash: createHash('sha256').update(idempotencyKey).digest('hex'),
      expiresAt: addDays(new Date(), ttlDays)
    }
  });

  return { result, wasProcessed: false };
}

Double-Check Pattern (Inside Transaction)

typescript
async function safeProcessCommission(job: CommissionJobPayload) {
  const { idempotencyKey } = job;

  // Fast path: Check outside transaction
  const existing = await db.idempotencyKey.findUnique({
    where: { key: idempotencyKey }
  });
  if (existing) return existing.response;

  // Slow path: Process with transaction
  return await db.$transaction(async (tx) => {
    // CRITICAL: Re-check inside transaction (another process may have completed)
    const existingInTx = await tx.idempotencyKey.findUnique({
      where: { key: idempotencyKey }
    });
    if (existingInTx) return existingInTx.response;

    // Process...
    const result = await processCommissionLogic(tx, job);

    // Store idempotency key
    await tx.idempotencyKey.create({
      data: {
        key: idempotencyKey,
        response: result,
        expiresAt: addDays(new Date(), 30)
      }
    });

    return result;
  }, {
    isolationLevel: 'Serializable'
  });
}

Retry Strategies

Configuration

typescript
interface RetryConfig {
  maxAttempts: number;
  backoffMs: number[];
  retryableErrors: string[];
}

const RETRY_CONFIGS: Record<string, RetryConfig> = {
  commission: {
    maxAttempts: 3,
    backoffMs: [100, 500, 2000],
    retryableErrors: [
      'P2034',  // Prisma transaction conflict
      '40001',  // PostgreSQL serialization failure
      '40P01',  // PostgreSQL deadlock detected
    ]
  },
  balance: {
    maxAttempts: 3,
    backoffMs: [50, 200, 1000],
    retryableErrors: ['P2034', '40001', '40P01']
  },
  payout: {
    maxAttempts: 1,  // No retry - user should retry manually
    backoffMs: [],
    retryableErrors: []
  },
  inventory: {
    maxAttempts: 3,
    backoffMs: [50, 100, 500],
    retryableErrors: ['P2034', '40001', '40P01']
  }
};

Retry Implementation

typescript
async function withRetry<T>(
  operation: () => Promise<T>,
  config: RetryConfig,
  context: string
): Promise<T> {
  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error as Error;

      const errorCode = extractErrorCode(error);
      const isRetryable = config.retryableErrors.includes(errorCode);

      if (!isRetryable || attempt === config.maxAttempts) {
        logger.error(`${context}: Non-retryable error or max attempts reached`, {
          attempt,
          errorCode,
          error: lastError.message
        });
        throw lastError;
      }

      const backoffMs = config.backoffMs[attempt - 1] || config.backoffMs[config.backoffMs.length - 1];

      logger.warn(`${context}: Retryable error, backing off`, {
        attempt,
        errorCode,
        backoffMs,
        nextAttempt: attempt + 1
      });

      await sleep(backoffMs);
    }
  }

  throw lastError;
}

function extractErrorCode(error: unknown): string {
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    return error.code;
  }
  if (error instanceof Error && 'code' in error) {
    return (error as any).code;
  }
  return 'UNKNOWN';
}

function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

Operation-Specific Patterns

1. Commission Calculation

See Commission Engine for full implementation.

Key Points:

  • Lock source record (order/investment) first
  • Lock all affected partner balances in sorted order
  • Use idempotency key per commission transaction
  • Process in Serializable isolation level

2. Balance Operations

See MLM Schema for SQL functions.

Key Points:

  • Always lock balance row before modification
  • Use version field for optimistic locking where appropriate
  • Validate sufficient balance inside transaction
  • Create audit trail for every balance change

3. Payout Requests

typescript
async function createPayoutRequest(
  partnerId: string,
  amount: number,
  payoutDetails: PayoutDetails
): Promise<PayoutRequest> {
  return await db.$transaction(async (tx) => {
    // 1. Lock partner balance
    const balances = await tx.$queryRaw<PartnerBalance[]>`
      SELECT * FROM mlm.partner_balances
      WHERE partner_id = ${partnerId}::uuid
      FOR UPDATE
    `;

    const balance = balances[0];
    if (!balance) {
      throw new PartnerNotFoundError(partnerId);
    }

    if (balance.available_balance < amount) {
      throw new InsufficientBalanceError(
        Number(balance.available_balance),
        amount
      );
    }

    // 2. Check for existing pending payout
    // (Also protected by unique partial index on table)
    const existingPending = await tx.payoutRequest.findFirst({
      where: {
        partnerId,
        status: { in: ['PENDING', 'APPROVED', 'PROCESSING'] }
      }
    });

    if (existingPending) {
      throw new PayoutAlreadyPendingError(existingPending.id);
    }

    // 3. Deduct from available balance
    await tx.partnerBalance.update({
      where: { partnerId },
      data: {
        availableBalance: { decrement: amount },
        version: { increment: 1 },
        updatedAt: new Date()
      }
    });

    // 4. Create payout request
    const payout = await tx.payoutRequest.create({
      data: {
        partnerId,
        amount,
        currency: 'RUB',
        payoutMethodType: payoutDetails.type,
        payoutDetails: payoutDetails as any,
        status: 'PENDING'
      }
    });

    // 5. Create audit log entry
    await tx.financialAuditLog.create({
      data: {
        eventType: 'PAYOUT_REQUESTED',
        partnerId,
        amount: -amount,
        currency: 'RUB',
        balanceBefore: Number(balance.available_balance),
        balanceAfter: Number(balance.available_balance) - amount,
        sourceType: 'PAYOUT_REQUEST',
        sourceId: payout.id,
        checksum: generateChecksum(payout)
      }
    });

    return payout;
  }, {
    isolationLevel: 'Serializable',
    timeout: 30000
  });
}

4. Inventory Reservation

typescript
async function reserveInventory(
  productId: string,
  cartId: string,
  quantity: number,
  expiresMinutes: number = 15
): Promise<{ success: boolean; reservationId?: string; availableQuantity: number }> {
  return await db.$transaction(async (tx) => {
    // 1. Lock product row
    const products = await tx.$queryRaw<Product[]>`
      SELECT * FROM product.products
      WHERE id = ${productId}::uuid
      FOR UPDATE
    `;

    const product = products[0];
    if (!product) {
      throw new ProductNotFoundError(productId);
    }

    // 2. Calculate available (lock reservations too)
    const reservedResult = await tx.$queryRaw<{ total: bigint }[]>`
      SELECT COALESCE(SUM(quantity), 0) as total
      FROM product.inventory_reservations
      WHERE product_id = ${productId}::uuid
        AND status = 'RESERVED'
        AND expires_at > NOW()
      FOR UPDATE
    `;

    const reserved = Number(reservedResult[0]?.total || 0);
    const available = product.stock_quantity - reserved;

    if (available < quantity) {
      return { success: false, availableQuantity: available };
    }

    // 3. Create reservation
    const reservation = await tx.inventoryReservation.create({
      data: {
        productId,
        cartId,
        quantity,
        status: 'RESERVED',
        expiresAt: addMinutes(new Date(), expiresMinutes)
      }
    });

    return {
      success: true,
      reservationId: reservation.id,
      availableQuantity: available - quantity
    };
  }, {
    isolationLevel: 'Serializable'
  });
}

5. Partner Tree Insert

typescript
async function registerPartner(
  userId: string,
  sponsorId: string | null,
  referralCode: string
): Promise<Partner> {
  return await db.$transaction(async (tx) => {
    // 1. Lock sponsor if exists (prevents concurrent tree modifications)
    if (sponsorId) {
      const sponsors = await tx.$queryRaw<Partner[]>`
        SELECT * FROM mlm.partners
        WHERE id = ${sponsorId}::uuid
        FOR UPDATE
      `;

      const sponsor = sponsors[0];
      if (!sponsor) {
        throw new SponsorNotFoundError(sponsorId);
      }

      if (sponsor.status !== 'ACTIVE') {
        throw new SponsorNotActiveError(sponsorId);
      }
    }

    // 2. Create partner (trigger will handle tree paths)
    const partner = await tx.partner.create({
      data: {
        userId,
        sponsorId,
        referralCode,
        status: 'ACTIVE',
        joinedAt: new Date()
      }
    });

    // 3. Create initial balance record
    await tx.partnerBalance.create({
      data: {
        partnerId: partner.id,
        availableBalance: 0,
        pendingBalance: 0,
        totalEarned: 0,
        totalWithdrawn: 0,
        careerPointsTotal: 0,
        careerPointsPeriod: 0,
        currency: 'RUB',
        version: 1
      }
    });

    return partner;
  }, {
    isolationLevel: 'RepeatableRead'
  });
}

Transaction Isolation Levels

When to Use Each Level

LevelUse CaseTrade-off
ReadCommittedRead-heavy, low consistency needsBest performance, allows non-repeatable reads
RepeatableReadBalance reads, tree queriesGood performance, prevents non-repeatable reads
SerializableFinancial writes, commission calcLowest performance, full consistency

Prisma Transaction Configuration

typescript
// Serializable for critical financial operations
await db.$transaction(async (tx) => {
  // ...
}, {
  isolationLevel: 'Serializable',
  timeout: 30000  // 30 seconds
});

// RepeatableRead for tree operations
await db.$transaction(async (tx) => {
  // ...
}, {
  isolationLevel: 'RepeatableRead',
  timeout: 15000  // 15 seconds
});

// Default (ReadCommitted) for simple operations
await db.$transaction(async (tx) => {
  // ...
});

Error Handling

Concurrency-Specific Errors

typescript
// Domain errors for concurrency issues
export class OptimisticLockError extends BusinessRuleError {
  constructor(entity: string) {
    super(
      'OPTIMISTIC_LOCK_CONFLICT',
      `${entity} was modified by another process. Please retry.`,
      { entity }
    );
  }
}

export class DeadlockDetectedError extends BusinessRuleError {
  constructor(operation: string) {
    super(
      'DEADLOCK_DETECTED',
      `Deadlock detected during ${operation}. Please retry.`,
      { operation }
    );
  }
}

export class TransactionTimeoutError extends BusinessRuleError {
  constructor(operation: string, timeoutMs: number) {
    super(
      'TRANSACTION_TIMEOUT',
      `Transaction timed out after ${timeoutMs}ms during ${operation}`,
      { operation, timeoutMs }
    );
  }
}

Error Code Mapping

typescript
function mapDatabaseError(error: unknown, context: string): Error {
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    switch (error.code) {
      case 'P2034':
        return new OptimisticLockError(context);
      case 'P2028':
        return new TransactionTimeoutError(context, 30000);
    }
  }

  if (error instanceof Error && 'code' in error) {
    const pgError = error as any;
    switch (pgError.code) {
      case '40001':  // serialization_failure
        return new OptimisticLockError(context);
      case '40P01':  // deadlock_detected
        return new DeadlockDetectedError(context);
      case '55P03':  // lock_not_available (NOWAIT)
        return new ResourceLockedError(context);
    }
  }

  return error as Error;
}

Monitoring and Alerting

Key Metrics

MetricSourceAlert Threshold
Deadlock countcore.deadlock_log> 5 per hour
Transaction retry rateApplication logs> 10% of operations
Lock wait time (p95)pg_stat_activity> 1 second
Serialization failuresApplication logs> 20 per minute
Payout double-attempt blocksApplication logsAny occurrence

PostgreSQL Lock Monitoring Query

sql
-- Active locks and waiting queries
SELECT
    blocked.pid AS blocked_pid,
    blocked.usename AS blocked_user,
    blocked.query AS blocked_query,
    blocked.wait_event_type,
    blocking.pid AS blocking_pid,
    blocking.usename AS blocking_user,
    blocking.query AS blocking_query,
    NOW() - blocked.query_start AS wait_duration
FROM pg_stat_activity blocked
JOIN pg_locks blocked_locks ON blocked.pid = blocked_locks.pid
JOIN pg_locks blocking_locks
    ON blocked_locks.locktype = blocking_locks.locktype
    AND blocked_locks.relation = blocking_locks.relation
    AND blocked_locks.pid != blocking_locks.pid
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT blocked_locks.granted
ORDER BY wait_duration DESC;