【初心者向け】プログラミングの「トランザクション」基礎
データベーストランザクションの基本概念をプログラミング初心者向けに解説。ACID特性、コミット・ロールバック、実装例まで分かりやすく紹介します。
みなさん、プログラミングで「銀行の振込処理」や「ECサイトの注文処理」を作る時、「途中でエラーが起きたらどうしよう」と考えたことはありませんか?
例えば、振込処理の途中でシステムが落ちて、送金者の口座からは引き落としされたのに、受取人の口座には入金されない、なんてことが起きたら大変です。 このような問題を防ぐための仕組みが「トランザクション」です。
この記事では、プログラミング初心者向けにトランザクションの基本概念から実装方法まで、具体例を交えながら詳しく解説します。 データの整合性を保つ重要な技術を理解して、安全なプログラムを作れるようになりましょう。
トランザクションとは何か
トランザクション(Transaction)とは、データベース操作において「一連の処理をひとまとまりとして扱う仕組み」のことです。
「全ての操作が成功するか、全ての操作が失敗するか」のどちらかしか許さないことで、データの整合性を保ちます。 これを「All or Nothing(全か無か)」の原則と呼びます。
身近な例で理解する
トランザクションを身近な例で説明してみましょう。
// 銀行振込のトランザクション例class BankTransfer { constructor() { this.accounts = new Map([ ['A001', { name: '田中太郎', balance: 100000 }], ['A002', { name: '佐藤花子', balance: 50000 }] ]); } // トランザクションなしの危険な振込処理 unsafeTransfer(fromAccount, toAccount, amount) { console.log(`危険な振込開始: ${fromAccount} → ${toAccount} (${amount}円)`); // Step 1: 送金者の口座から引き落とし const sender = this.accounts.get(fromAccount); if (sender.balance < amount) { throw new Error('残高不足'); } sender.balance -= amount; console.log(`${fromAccount}: ${sender.balance}円 (${amount}円引き落とし)`); // ここでシステムエラーが発生したらどうなる? // 引き落としは完了したが、入金は未完了... if (Math.random() < 0.3) { // 30%の確率でエラー throw new Error('システムエラーが発生しました'); } // Step 2: 受取人の口座に入金 const receiver = this.accounts.get(toAccount); receiver.balance += amount; console.log(`${toAccount}: ${receiver.balance}円 (${amount}円入金)`); console.log('振込完了'); } // トランザクションを使った安全な振込処理 safeTransfer(fromAccount, toAccount, amount) { console.log(`安全な振込開始: ${fromAccount} → ${toAccount} (${amount}円)`); // トランザクション開始 const transaction = this.beginTransaction(); try { // Step 1: 送金者の口座から引き落とし const sender = this.accounts.get(fromAccount); if (sender.balance < amount) { throw new Error('残高不足'); } transaction.addOperation('debit', fromAccount, amount); sender.balance -= amount; console.log(`${fromAccount}: ${sender.balance}円 (${amount}円引き落とし予定)`); // エラーが発生してもここで捕捉される if (Math.random() < 0.3) { throw new Error('システムエラーが発生しました'); } // Step 2: 受取人の口座に入金 const receiver = this.accounts.get(toAccount); transaction.addOperation('credit', toAccount, amount); receiver.balance += amount; console.log(`${toAccount}: ${receiver.balance}円 (${amount}円入金予定)`); // 全ての操作が成功したらコミット transaction.commit(); console.log('振込完了(コミット)'); } catch (error) { // エラーが発生したらロールバック console.log(`エラー発生: ${error.message}`); transaction.rollback(); console.log('振込取消(ロールバック)'); throw error; } } beginTransaction() { return new Transaction(this.accounts); } showAllBalances() { console.log('=== 現在の残高 ==='); for (const [accountId, account] of this.accounts) { console.log(`${accountId} (${account.name}): ${account.balance}円`); } console.log('================'); }}
// トランザクション管理クラスclass Transaction { constructor(accounts) { this.accounts = accounts; this.operations = []; this.originalState = new Map(); this.isCommitted = false; this.isRollbacked = false; } addOperation(type, accountId, amount) { // 初回操作時に元の状態を保存 if (!this.originalState.has(accountId)) { const account = this.accounts.get(accountId); this.originalState.set(accountId, { ...account }); } this.operations.push({ type, accountId, amount }); } commit() { if (this.isCommitted || this.isRollbacked) { throw new Error('Transaction already finalized'); } // コミット処理(実際のDBでは永続化処理) console.log('トランザクションをコミットしました'); this.isCommitted = true; } rollback() { if (this.isCommitted || this.isRollbacked) { throw new Error('Transaction already finalized'); } // ロールバック処理:元の状態に戻す for (const [accountId, originalAccount] of this.originalState) { const currentAccount = this.accounts.get(accountId); currentAccount.balance = originalAccount.balance; } console.log('トランザクションをロールバックしました'); this.isRollbacked = true; }}
// 使用例const bank = new BankTransfer();
console.log('=== トランザクションのデモ ===');bank.showAllBalances();
try { // 安全な振込処理を試行 bank.safeTransfer('A001', 'A002', 30000);} catch (error) { console.log('振込に失敗しましたが、データは安全です');}
bank.showAllBalances();
この例では、トランザクションによってシステムエラーが発生しても、データの整合性が保たれることがわかります。
トランザクションが必要な場面
以下のような場面でトランザクションが重要になります。
- 金融取引: 振込、入出金、決済処理
- ECサイト: 注文処理、在庫管理、ポイント処理
- 予約システム: 座席予約、リソース確保
- ユーザー管理: アカウント作成、権限変更
- データ移行: 大量データの一括処理
ACID特性の理解
トランザクションの品質を保証する「ACID特性」について詳しく解説します。
Atomicity(原子性)
「All or Nothing」の原則で、トランザクション内の操作は全て成功するか、全て失敗するかのどちらかです。
# 原子性の実装例class AtomicityDemo: def __init__(self): self.users = {} self.orders = {} self.inventory = {'product_1': 100, 'product_2': 50} def create_order_atomic(self, user_id, product_id, quantity): """原子性を保った注文処理""" print(f"注文処理開始: ユーザー{user_id} - 商品{product_id} × {quantity}") # トランザクション開始 transaction = { 'operations': [], 'rollback_data': {} } try: # 1. 在庫確認と減算 if self.inventory[product_id] < quantity: raise Exception("在庫不足") # ロールバック用に元の在庫数を保存 transaction['rollback_data']['inventory'] = self.inventory[product_id] # 在庫を減算 self.inventory[product_id] -= quantity transaction['operations'].append(('inventory_update', product_id, quantity)) print(f"在庫減算: {product_id} 残り{self.inventory[product_id]}個") # 2. 注文レコード作成 order_id = f"order_{len(self.orders) + 1}" order = { 'order_id': order_id, 'user_id': user_id, 'product_id': product_id, 'quantity': quantity, 'status': 'processing' } # 何らかの理由で注文作成に失敗する場合 if product_id == 'error_product': raise Exception("注文作成エラー") self.orders[order_id] = order transaction['operations'].append(('order_create', order_id)) print(f"注文作成: {order_id}") # 3. ユーザーに注文履歴追加 if user_id not in self.users: self.users[user_id] = {'orders': []} self.users[user_id]['orders'].append(order_id) transaction['operations'].append(('user_update', user_id, order_id)) print(f"ユーザー履歴更新: {user_id}") # 全ての操作が成功 -> コミット print("✅ 全ての操作が成功しました(コミット)") return order_id except Exception as e: # エラーが発生 -> ロールバック print(f"❌ エラー発生: {e}") self.rollback_transaction(transaction) raise e def rollback_transaction(self, transaction): """トランザクションのロールバック""" print("🔄 ロールバック開始...") # 操作を逆順で取り消し for operation in reversed(transaction['operations']): operation_type = operation[0] if operation_type == 'inventory_update': product_id = operation[1] # 在庫を元に戻す self.inventory[product_id] = transaction['rollback_data']['inventory'] print(f"在庫復元: {product_id} = {self.inventory[product_id]}個") elif operation_type == 'order_create': order_id = operation[1] # 注文を削除 if order_id in self.orders: del self.orders[order_id] print(f"注文削除: {order_id}") elif operation_type == 'user_update': user_id = operation[1] order_id = operation[2] # ユーザーの注文履歴から削除 if user_id in self.users and order_id in self.users[user_id]['orders']: self.users[user_id]['orders'].remove(order_id) print(f"ユーザー履歴復元: {user_id}") print("✅ ロールバック完了") def show_status(self): print("=== 現在の状態 ===") print(f"在庫: {self.inventory}") print(f"注文: {list(self.orders.keys())}") print(f"ユーザー: {self.users}") print("================")
# 原子性のデモdemo = AtomicityDemo()
print("=== 原子性デモ ===")demo.show_status()
# 成功ケースtry: demo.create_order_atomic('user_1', 'product_1', 5)except: pass
demo.show_status()
# 失敗ケース(在庫不足)try: demo.create_order_atomic('user_2', 'product_1', 200)except: pass
demo.show_status()
Consistency(一貫性)
データベースの整合性制約が常に満たされる状態を保ちます。
// 一貫性の実装例class ConsistencyDemo { constructor() { this.accounts = new Map([ ['savings', { balance: 100000, type: 'savings' }], ['checking', { balance: 50000, type: 'checking' }] ]); // 整合性ルール this.consistencyRules = { // 残高は0以上でなければならない nonNegativeBalance: (account) => account.balance >= 0, // 普通預金口座は10万円以下でなければならない savingsLimit: (account) => { if (account.type === 'savings') { return account.balance <= 100000; } return true; }, // 当座預金口座は最低1万円は残す必要がある checkingMinimum: (account) => { if (account.type === 'checking') { return account.balance >= 10000; } return true; } }; } validateConsistency() { """全てのデータが整合性制約を満たしているかチェック""" const violations = []; for (const [accountId, account] of this.accounts) { for (const [ruleName, rule] of Object.entries(this.consistencyRules)) { if (!rule(account)) { violations.push({ account: accountId, rule: ruleName, current_balance: account.balance, type: account.type }); } } } return violations; } transferWithConsistency(fromAccount, toAccount, amount) { """整合性を保った送金処理""" console.log(`送金処理: ${fromAccount} → ${toAccount} (${amount}円)`); // 事前の整合性チェック let violations = this.validateConsistency(); if (violations.length > 0) { throw new Error('事前整合性チェック失敗: ' + JSON.stringify(violations)); } // 送金処理のシミュレーション const sender = this.accounts.get(fromAccount); const receiver = this.accounts.get(toAccount); // 一時的に変更を適用 const originalSenderBalance = sender.balance; const originalReceiverBalance = receiver.balance; sender.balance -= amount; receiver.balance += amount; console.log(`仮適用後: ${fromAccount}=${sender.balance}円, ${toAccount}=${receiver.balance}円`); // 事後の整合性チェック violations = this.validateConsistency(); if (violations.length > 0) { // 整合性違反が発生した場合はロールバック console.log('❌ 整合性違反検出:', violations); sender.balance = originalSenderBalance; receiver.balance = originalReceiverBalance; console.log('🔄 ロールバック完了'); throw new Error('整合性違反のため送金を取り消しました'); } console.log('✅ 送金完了(整合性保持)'); } showBalances() { console.log('=== 口座残高 ==='); for (const [accountId, account] of this.accounts) { console.log(`${accountId} (${account.type}): ${account.balance}円`); } const violations = this.validateConsistency(); if (violations.length > 0) { console.log('⚠️ 整合性違反:', violations); } else { console.log('✅ 整合性OK'); } console.log('==============='); }}
// 一貫性のデモconst consistencyDemo = new ConsistencyDemo();
console.log('=== 一貫性デモ ===');consistencyDemo.showBalances();
// 成功ケースtry { consistencyDemo.transferWithConsistency('savings', 'checking', 20000);} catch (error) { console.log('送金失敗:', error.message);}
consistencyDemo.showBalances();
// 失敗ケース(普通預金の上限超過)try { consistencyDemo.transferWithConsistency('checking', 'savings', 30000);} catch (error) { console.log('送金失敗:', error.message);}
consistencyDemo.showBalances();
Isolation(分離性)
複数のトランザクションが同時実行されても、互いに影響を与えないようにします。
# 分離性の実装例import threadingimport timeimport random
class IsolationDemo: def __init__(self): self.account_balance = 100000 self.lock = threading.Lock() # 排他制御用 self.transaction_log = [] def transfer_without_isolation(self, transaction_id, amount): """分離性のない危険な処理""" print(f"[{transaction_id}] 処理開始 - 金額: {amount}円") # 残高確認 current_balance = self.account_balance print(f"[{transaction_id}] 現在残高: {current_balance}円") # 処理時間をシミュレート time.sleep(random.uniform(0.1, 0.3)) # 残高が十分かチェック if current_balance >= amount: # 他のトランザクションがここで残高を変更する可能性がある time.sleep(random.uniform(0.1, 0.3)) # 引き落とし実行 self.account_balance = current_balance - amount print(f"[{transaction_id}] 引き落とし完了: {self.account_balance}円") self.transaction_log.append(f"{transaction_id}: -{amount}円") else: print(f"[{transaction_id}] 残高不足") def transfer_with_isolation(self, transaction_id, amount): """分離性を保った安全な処理""" print(f"[{transaction_id}] 安全な処理開始 - 金額: {amount}円") # ロックを取得して排他制御 with self.lock: print(f"[{transaction_id}] ロック取得") # 残高確認 current_balance = self.account_balance print(f"[{transaction_id}] 現在残高: {current_balance}円") # 処理時間をシミュレート time.sleep(random.uniform(0.1, 0.3)) if current_balance >= amount: self.account_balance = current_balance - amount print(f"[{transaction_id}] 引き落とし完了: {self.account_balance}円") self.transaction_log.append(f"{transaction_id}: -{amount}円") else: print(f"[{transaction_id}] 残高不足") print(f"[{transaction_id}] ロック解放") def reset(self): self.account_balance = 100000 self.transaction_log = [] def run_concurrent_transactions(self, use_isolation=True): """並行トランザクションの実行""" self.reset() print(f"=== 並行処理テスト(分離性: {'有' if use_isolation else '無'})===") print(f"初期残高: {self.account_balance}円") # 複数のトランザクションを並行実行 threads = [] transactions = [ ('T1', 30000), ('T2', 40000), ('T3', 35000) ] for tx_id, amount in transactions: if use_isolation: thread = threading.Thread( target=self.transfer_with_isolation, args=(tx_id, amount) ) else: thread = threading.Thread( target=self.transfer_without_isolation, args=(tx_id, amount) ) threads.append(thread) # 全スレッド開始 for thread in threads: thread.start() # 全スレッド終了まで待機 for thread in threads: thread.join() print(f"最終残高: {self.account_balance}円") print("取引履歴:", self.transaction_log) # 整合性チェック total_withdrawn = sum(int(log.split(': -')[1].replace('円', '')) for log in self.transaction_log) expected_balance = 100000 - total_withdrawn print(f"期待残高: {expected_balance}円") if self.account_balance == expected_balance: print("✅ 整合性OK") else: print("❌ 整合性エラー!")
# 分離性のデモisolation_demo = IsolationDemo()
# 分離性なしのテストisolation_demo.run_concurrent_transactions(use_isolation=False)
# 分離性ありのテストisolation_demo.run_concurrent_transactions(use_isolation=True)
Durability(永続性)
コミットされたトランザクションの結果は、システム障害が発生しても失われません。
// 永続性の実装例class DurabilityDemo { constructor() { this.database = new Map(); this.transactionLog = []; this.commitLog = []; } // トランザクションログへの書き込み writeToTransactionLog(operation) { const logEntry = { timestamp: new Date().toISOString(), operation: operation, status: 'logged' }; // 永続化前にログに記録 this.transactionLog.push(logEntry); console.log(`📝 トランザクションログ記録: ${JSON.stringify(operation)}`); return logEntry; } // データベースへの永続化 persistToDatabase(key, value) { this.database.set(key, value); console.log(`💾 データベースに永続化: ${key} = ${JSON.stringify(value)}`); } // コミットログへの記録 writeToCommitLog(transactionId) { const commitEntry = { transactionId: transactionId, timestamp: new Date().toISOString(), status: 'committed' }; this.commitLog.push(commitEntry); console.log(`✅ コミットログ記録: Transaction ${transactionId}`); } // 永続性を保ったトランザクション処理 durableTransaction(transactionId, operations) { console.log(`=== Transaction ${transactionId} 開始 ===`); try { const logEntries = []; // 1. 全ての操作をトランザクションログに記録 for (const operation of operations) { const logEntry = this.writeToTransactionLog(operation); logEntries.push(logEntry); } // 2. ログがディスクに書き込まれたことを確認 this.flushToDisk('transaction_log'); // 3. 実際のデータ更新を実行 for (const operation of operations) { this.executeOperation(operation); } // 4. コミットログに記録 this.writeToCommitLog(transactionId); this.flushToDisk('commit_log'); // 5. トランザクションログエントリを完了としてマーク for (const logEntry of logEntries) { logEntry.status = 'committed'; } console.log(`✅ Transaction ${transactionId} 完了`); } catch (error) { console.log(`❌ Transaction ${transactionId} 失敗: ${error.message}`); this.rollbackTransaction(transactionId); throw error; } } executeOperation(operation) { switch (operation.type) { case 'insert': this.persistToDatabase(operation.key, operation.value); break; case 'update': const existing = this.database.get(operation.key); if (existing) { this.persistToDatabase(operation.key, { ...existing, ...operation.value }); } break; case 'delete': this.database.delete(operation.key); console.log(`🗑️ データ削除: ${operation.key}`); break; default: throw new Error(`Unknown operation type: ${operation.type}`); } } // ディスクへの強制書き込み(永続化) flushToDisk(logType) { // 実際の実装では fsync() などを使用 console.log(`💿 ${logType} をディスクに強制書き込み`); } // システム障害からの復旧 recoverFromCrash() { console.log('🔧 システム障害から復旧中...'); // コミットログをチェック const committedTransactions = new Set( this.commitLog.map(entry => entry.transactionId) ); // トランザクションログから復旧 for (const logEntry of this.transactionLog) { const operation = logEntry.operation; if (committedTransactions.has(operation.transactionId)) { // コミット済みの操作は再実行 if (logEntry.status !== 'committed') { console.log(`🔄 操作を再実行: ${JSON.stringify(operation)}`); this.executeOperation(operation); logEntry.status = 'committed'; } } else { // 未コミットの操作は破棄 console.log(`🗑️ 未コミット操作を破棄: ${JSON.stringify(operation)}`); } } console.log('✅ 復旧完了'); } // システム障害のシミュレート simulateSystemCrash() { console.log('💥 システム障害発生!'); // 実際の障害では、メモリ上のデータは失われるが、 // ディスクに永続化されたログは残る } showDatabaseState() { console.log('=== データベース状態 ==='); for (const [key, value] of this.database) { console.log(`${key}: ${JSON.stringify(value)}`); } console.log(`トランザクションログ: ${this.transactionLog.length}件`); console.log(`コミットログ: ${this.commitLog.length}件`); console.log('===================='); } rollbackTransaction(transactionId) { console.log(`🔄 Transaction ${transactionId} をロールバック`); // ロールバック処理の実装 }}
// 永続性のデモconst durabilityDemo = new DurabilityDemo();
console.log('=== 永続性デモ ===');
// ユーザー作成トランザクションconst userOperations = [ { transactionId: 'T001', type: 'insert', key: 'user_001', value: { name: '田中太郎', email: 'tanaka@example.com' } }, { transactionId: 'T001', type: 'insert', key: 'profile_001', value: { userId: 'user_001', age: 30, city: '東京' } }];
durabilityDemo.durableTransaction('T001', userOperations);durabilityDemo.showDatabaseState();
// システム障害をシミュレートdurabilityDemo.simulateSystemCrash();
// 復旧処理durabilityDemo.recoverFromCrash();durabilityDemo.showDatabaseState();
ACID特性により、トランザクションは信頼性の高いデータ処理を実現します。
実装パターンと実例
実際のプログラミングでトランザクションを実装する際の代表的なパターンを紹介します。
データベーストランザクション
最も一般的なSQL Database でのトランザクション実装です。
# SQLでのトランザクション実装例import sqlite3import random
class DatabaseTransactionDemo: def __init__(self, db_name=':memory:'): self.db_name = db_name self.setup_database() def setup_database(self): """データベースセットアップ""" with sqlite3.connect(self.db_name) as conn: cursor = conn.cursor() # テーブル作成 cursor.execute(''' CREATE TABLE IF NOT EXISTS accounts ( id TEXT PRIMARY KEY, name TEXT NOT NULL, balance DECIMAL(10,2) NOT NULL CHECK (balance >= 0) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS transactions ( id INTEGER PRIMARY KEY AUTOINCREMENT, from_account TEXT, to_account TEXT, amount DECIMAL(10,2), timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (from_account) REFERENCES accounts (id), FOREIGN KEY (to_account) REFERENCES accounts (id) ) ''') # 初期データ cursor.executemany(''' INSERT OR REPLACE INTO accounts (id, name, balance) VALUES (?, ?, ?) ''', [ ('ACC001', '田中太郎', 100000), ('ACC002', '佐藤花子', 50000), ('ACC003', '鈴木一郎', 75000) ]) conn.commit() print("✅ データベースセットアップ完了") def transfer_money(self, from_account, to_account, amount): """トランザクションを使った送金処理""" print(f"💰 送金処理: {from_account} → {to_account} ({amount}円)") conn = sqlite3.connect(self.db_name) conn.execute('PRAGMA foreign_keys = ON') # 外部キー制約を有効化 try: # トランザクション開始(autocommit=False がデフォルト) cursor = conn.cursor() # 1. 送金者の残高確認と引き落とし cursor.execute(''' SELECT balance FROM accounts WHERE id = ? ''', (from_account,)) result = cursor.fetchone() if not result: raise Exception(f"送金者アカウント {from_account} が見つかりません") sender_balance = result[0] if sender_balance < amount: raise Exception(f"残高不足: {sender_balance}円 < {amount}円") # 送金者から引き落とし cursor.execute(''' UPDATE accounts SET balance = balance - ? WHERE id = ? ''', (amount, from_account)) print(f" 📤 {from_account}: {amount}円引き落とし") # エラーシミュレーション if random.random() < 0.2: # 20%の確率でエラー raise Exception("システムエラーが発生しました") # 2. 受取人の口座に入金 cursor.execute(''' SELECT id FROM accounts WHERE id = ? ''', (to_account,)) if not cursor.fetchone(): raise Exception(f"受取人アカウント {to_account} が見つかりません") cursor.execute(''' UPDATE accounts SET balance = balance + ? WHERE id = ? ''', (amount, to_account)) print(f" 📥 {to_account}: {amount}円入金") # 3. 取引履歴を記録 cursor.execute(''' INSERT INTO transactions (from_account, to_account, amount) VALUES (?, ?, ?) ''', (from_account, to_account, amount)) print(f" 📝 取引履歴記録") # 全ての操作が成功した場合のみコミット conn.commit() print(" ✅ 送金完了(コミット)") except Exception as e: # エラーが発生した場合はロールバック conn.rollback() print(f" ❌ エラー発生: {e}") print(" 🔄 全ての変更をロールバック") raise e finally: conn.close() def show_accounts(self): """アカウント残高表示""" with sqlite3.connect(self.db_name) as conn: cursor = conn.cursor() cursor.execute('SELECT id, name, balance FROM accounts ORDER BY id') print("=== アカウント残高 ===") for account_id, name, balance in cursor.fetchall(): print(f"{account_id} ({name}): {balance:,.0f}円") print("==================") def show_transaction_history(self): """取引履歴表示""" with sqlite3.connect(self.db_name) as conn: cursor = conn.cursor() cursor.execute(''' SELECT from_account, to_account, amount, timestamp FROM transactions ORDER BY timestamp DESC LIMIT 10 ''') print("=== 取引履歴 ===") for from_acc, to_acc, amount, timestamp in cursor.fetchall(): print(f"{timestamp}: {from_acc} → {to_acc} ({amount:,.0f}円)") print("===============") def test_concurrent_transactions(self): """並行トランザクションのテスト""" import threading def concurrent_transfer(tx_id, from_acc, to_acc, amount): try: print(f"[T{tx_id}] 開始") self.transfer_money(from_acc, to_acc, amount) print(f"[T{tx_id}] 成功") except Exception as e: print(f"[T{tx_id}] 失敗: {e}") print("=== 並行トランザクションテスト ===") self.show_accounts() # 複数のトランザクションを並行実行 threads = [ threading.Thread(target=concurrent_transfer, args=(1, 'ACC001', 'ACC002', 15000)), threading.Thread(target=concurrent_transfer, args=(2, 'ACC002', 'ACC003', 20000)), threading.Thread(target=concurrent_transfer, args=(3, 'ACC003', 'ACC001', 10000)) ] for thread in threads: thread.start() for thread in threads: thread.join() print("=== 並行処理完了 ===") self.show_accounts() self.show_transaction_history()
# データベーストランザクションのデモdb_demo = DatabaseTransactionDemo()
print("=== データベーストランザクションデモ ===")db_demo.show_accounts()
# 通常の送金try: db_demo.transfer_money('ACC001', 'ACC002', 25000)except: pass
db_demo.show_accounts()
# エラーが発生する可能性のある送金(複数回試行)for i in range(3): try: db_demo.transfer_money('ACC002', 'ACC003', 15000) break # 成功したら終了 except: continue # 失敗したら再試行
db_demo.show_accounts()db_demo.show_transaction_history()
# 並行処理テストdb_demo.test_concurrent_transactions()
アプリケーション層トランザクション
データベース以外のリソースも含めたトランザクション管理です。
// アプリケーション層トランザクション例class ApplicationTransaction { constructor() { this.operations = []; this.compensations = []; this.status = 'pending'; this.id = this.generateTransactionId(); } generateTransactionId() { return `tx_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } // 操作とその補償操作を追加 addOperation(operation, compensation) { this.operations.push(operation); this.compensations.unshift(compensation); // 逆順で実行するため } async execute() { console.log(`🚀 トランザクション ${this.id} 開始`); const executedOperations = []; try { // 全ての操作を順次実行 for (let i = 0; i < this.operations.length; i++) { const operation = this.operations[i]; console.log(` 📋 操作 ${i + 1}: ${operation.description}`); await operation.execute(); executedOperations.push(i); console.log(` ✅ 操作 ${i + 1} 完了`); } // 全ての操作が成功 this.status = 'committed'; console.log(`✅ トランザクション ${this.id} コミット`); } catch (error) { console.log(`❌ エラー発生: ${error.message}`); // 実行済みの操作を逆順で補償 console.log(`🔄 補償処理開始...`); for (let i = executedOperations.length - 1; i >= 0; i--) { const operationIndex = executedOperations[i]; const compensation = this.compensations[operationIndex]; try { console.log(` 🔄 補償 ${operationIndex + 1}: ${compensation.description}`); await compensation.execute(); console.log(` ✅ 補償 ${operationIndex + 1} 完了`); } catch (compensationError) { console.log(` ❌ 補償失敗: ${compensationError.message}`); // 補償失敗は深刻な問題 } } this.status = 'aborted'; console.log(`🔄 トランザクション ${this.id} ロールバック`); throw error; } }}
// 各種サービスの実装class InventoryService { constructor() { this.inventory = new Map([ ['product_1', 100], ['product_2', 50] ]); } async reserveStock(productId, quantity) { if (!this.inventory.has(productId)) { throw new Error(`商品 ${productId} が見つかりません`); } const current = this.inventory.get(productId); if (current < quantity) { throw new Error(`在庫不足: ${productId} (在庫: ${current}, 必要: ${quantity})`); } // 在庫を減らす this.inventory.set(productId, current - quantity); console.log(` 📦 在庫確保: ${productId} × ${quantity} (残り: ${current - quantity})`); } async releaseStock(productId, quantity) { const current = this.inventory.get(productId) || 0; this.inventory.set(productId, current + quantity); console.log(` 📦 在庫復旧: ${productId} × ${quantity} (合計: ${current + quantity})`); } getStock(productId) { return this.inventory.get(productId) || 0; }}
class PaymentService { constructor() { this.accounts = new Map([ ['user_1', 50000], ['user_2', 30000] ]); this.payments = new Map(); } async processPayment(userId, amount) { if (!this.accounts.has(userId)) { throw new Error(`ユーザー ${userId} が見つかりません`); } const balance = this.accounts.get(userId); if (balance < amount) { throw new Error(`残高不足: ${userId} (残高: ${balance}, 必要: ${amount})`); } // 支払い処理 const paymentId = `pay_${Date.now()}`; this.accounts.set(userId, balance - amount); this.payments.set(paymentId, { userId, amount, status: 'completed' }); console.log(` 💳 支払い処理: ${userId} から ${amount}円 (ID: ${paymentId})`); return paymentId; } async refundPayment(paymentId) { if (!this.payments.has(paymentId)) { console.log(` ⚠️ 支払い ${paymentId} が見つかりません(既に取消済み?)`); return; } const payment = this.payments.get(paymentId); const currentBalance = this.accounts.get(payment.userId); this.accounts.set(payment.userId, currentBalance + payment.amount); this.payments.delete(paymentId); console.log(` 💳 返金処理: ${payment.userId} に ${payment.amount}円返金 (ID: ${paymentId})`); }}
class OrderService { constructor() { this.orders = new Map(); } async createOrder(orderId, userId, items) { if (this.orders.has(orderId)) { throw new Error(`注文 ${orderId} は既に存在します`); } const order = { id: orderId, userId: userId, items: items, status: 'created', createdAt: new Date() }; this.orders.set(orderId, order); console.log(` 📋 注文作成: ${orderId} (ユーザー: ${userId})`); return order; } async cancelOrder(orderId) { if (this.orders.has(orderId)) { this.orders.delete(orderId); console.log(` 📋 注文取消: ${orderId}`); } }}
// 統合注文処理システムclass OrderProcessingSystem { constructor() { this.inventoryService = new InventoryService(); this.paymentService = new PaymentService(); this.orderService = new OrderService(); } async processOrder(userId, items, totalAmount) { const orderId = `order_${Date.now()}`; console.log(`🛒 注文処理開始: ${orderId}`); const transaction = new ApplicationTransaction(); let paymentId = null; // 1. 在庫確保操作 for (const item of items) { transaction.addOperation( { description: `在庫確保: ${item.productId} × ${item.quantity}`, execute: async () => { await this.inventoryService.reserveStock(item.productId, item.quantity); } }, { description: `在庫復旧: ${item.productId} × ${item.quantity}`, execute: async () => { await this.inventoryService.releaseStock(item.productId, item.quantity); } } ); } // 2. 支払い処理操作 transaction.addOperation( { description: `支払い処理: ${userId} - ${totalAmount}円`, execute: async () => { paymentId = await this.paymentService.processPayment(userId, totalAmount); } }, { description: `返金処理: ${paymentId}`, execute: async () => { if (paymentId) { await this.paymentService.refundPayment(paymentId); } } } ); // 3. 注文作成操作 transaction.addOperation( { description: `注文作成: ${orderId}`, execute: async () => { await this.orderService.createOrder(orderId, userId, items); } }, { description: `注文取消: ${orderId}`, execute: async () => { await this.orderService.cancelOrder(orderId); } } ); // トランザクション実行 await transaction.execute(); return orderId; } showSystemState() { console.log('=== システム状態 ==='); console.log('在庫:'); for (const [productId, stock] of this.inventoryService.inventory) { console.log(` ${productId}: ${stock}個`); } console.log('アカウント残高:'); for (const [userId, balance] of this.paymentService.accounts) { console.log(` ${userId}: ${balance}円`); } console.log(`注文数: ${this.orderService.orders.size}件`); console.log('=================='); }}
// アプリケーション層トランザクションのデモasync function runApplicationTransactionDemo() { console.log('=== アプリケーション層トランザクションデモ ==='); const orderSystem = new OrderProcessingSystem(); orderSystem.showSystemState(); // 成功ケース try { const orderId = await orderSystem.processOrder('user_1', [ { productId: 'product_1', quantity: 5 }, { productId: 'product_2', quantity: 2 } ], 15000); console.log(`🎉 注文成功: ${orderId}`); } catch (error) { console.log(`❌ 注文失敗: ${error.message}`); } orderSystem.showSystemState(); // 失敗ケース(在庫不足) try { const orderId = await orderSystem.processOrder('user_2', [ { productId: 'product_1', quantity: 200 } // 在庫不足 ], 25000); console.log(`🎉 注文成功: ${orderId}`); } catch (error) { console.log(`❌ 注文失敗: ${error.message}`); } orderSystem.showSystemState();}
// デモ実行runApplicationTransactionDemo().catch(console.error);
このようにアプリケーション層でトランザクションを実装することで、複数のサービスにまたがる処理の整合性を保つことができます。
まとめ
トランザクションは、プログラミングにおいてデータの整合性と信頼性を保つための重要な仕組みです。 特に金融系やECサイトなど、データの正確性が要求されるシステムでは必須の技術です。
重要なポイントを整理しましょう。
- 基本概念: 「All or Nothing」の原則でデータ整合性を保つ
- ACID特性: 原子性、一貫性、分離性、永続性の4つの性質
- 実装レベル: データベース層とアプリケーション層での異なるアプローチ
- エラーハンドリング: 適切な例外処理とロールバック機能
- 並行処理: 複数のトランザクションが同時実行される際の制御
トランザクションの概念を理解することで、より安全で信頼性の高いプログラムを作ることができるようになります。 初心者のうちから意識することで、将来的により高品質なシステムを設計できるエンジニアになれるでしょう。
ぜひ、この記事で学んだ知識を実際のプロジェクトで活用し、データの整合性を重視したプログラミングを心がけてください。