connect($redisHost, $redisPort);
// NOTE(review): the statement above looks like the tail of `$redis->connect(...)`.
// The receiver and the definitions of $redis, $redisHost, $redisPort, $stream,
// $group, $consumer and $batchSize are not visible in this part of the file;
// confirm they are set before this point.

// Purpose: consume entries from the Redis stream $stream as consumer $consumer
// of group $group, bulk-insert each batch into the `vrfc_requests` table and
// then ACK + delete the processed entries. Batches whose insert fails are
// copied to the `vrfc_failures` stream instead.
//
// The Redis calls below use the phpredis argument shapes: xAck()/xDel() take an
// ARRAY of IDs and xAdd() takes an associative array of field => value.

// Create the consumer group. "Group already exists" (BUSYGROUP) is expected on
// every restart and is ignored; any other failure is logged instead of being
// silently swallowed. Creation stays best-effort: it never aborts the worker.
try {
    $redis->xGroup('CREATE', $stream, $group, '$');
} catch (Exception $e) {
    if (strpos($e->getMessage(), 'BUSYGROUP') === false) {
        error_log("Consumer group create error: " . $e->getMessage());
    }
}

// PDO connection: exceptions on error, native (non-emulated) prepared statements.
$dsn = sprintf(
    'mysql:host=%s;dbname=%s;charset=utf8mb4',
    getenv('DB_HOST'),
    getenv('DB_NAME')
);
$pdo = new PDO($dsn, getenv('DB_USER'), getenv('DB_PASS'), [
    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    PDO::ATTR_EMULATE_PREPARES => false,
]);

while (true) {
    // Read up to $batchSize never-delivered entries ('>'), blocking up to 5000 ms.
    $res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000);

    if ($res === false) {
        // A false result comes back immediately (no blocking), so back off
        // here to avoid a hot loop while the error persists.
        error_log("xReadGroup error: " . ($redis->getLastError() ?? 'unknown'));
        sleep(1);
        continue;
    }
    if (!$res) {
        // Nothing arrived within the block window; poll again.
        continue;
    }

    $entries = $res[$stream] ?? [];

    // Build the multi-row INSERT: one "(?, ?, ?)" group per stream entry.
    // Missing fields are stored as empty strings.
    // NOTE(review): the field/column spellings ('articleNumbr', 'reqeustType')
    // are kept exactly as they were; presumably they match the producer and
    // the table schema - confirm before renaming anything.
    $ids = [];
    $placeholders = [];
    $params = [];
    foreach ($entries as $id => $fields) {
        $ids[] = $id;
        $placeholders[] = "(?, ?, ?)";
        $params[] = $fields['articleNumbr'] ?? '';
        $params[] = $fields['reqeustType'] ?? '';
        $params[] = $fields['requestDatetime'] ?? '';
    }

    if ($ids === []) {
        continue;
    }

    $sql = "INSERT INTO vrfc_requests (articleNumbr, reqeustType, requestDatetime) VALUES "
        . implode(',', $placeholders);

    // Step 1: database write. Only DB work lives in this try block, so a Redis
    // error can never be mistaken for a failed insert.
    try {
        $pdo->beginTransaction();
        $stmt = $pdo->prepare($sql);
        $stmt->execute($params);
        $pdo->commit();
    } catch (Exception $e) {
        // rollBack() throws when no transaction is active (e.g. when
        // beginTransaction() itself failed), so guard it.
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }
        error_log("DB insert error: " . $e->getMessage());

        // Move the failed entries to the failure stream. An entry is acked and
        // deleted only after its copy was stored, so it is never lost.
        foreach ($entries as $id => $fields) {
            $copied = $redis->xAdd('vrfc_failures', '*', [
                'id' => $id,
                'payload' => json_encode($fields),
            ]);
            if ($copied === false) {
                error_log("Failure-stream write error for entry " . $id);
                continue;
            }
            $redis->xAck($stream, $group, [$id]);
            $redis->xDel($stream, [$id]);
        }

        // Brief pause before the next batch.
        sleep(2);
        continue;
    }

    // Step 2: the rows are committed; acknowledge and delete the whole batch
    // with one call each. A failure here is only logged: the data is already in
    // the database, so the batch must not be routed to the failure stream.
    try {
        $redis->xAck($stream, $group, $ids);
        $redis->xDel($stream, $ids);
    } catch (Exception $e) {
        error_log("Redis ack error after commit: " . $e->getMessage());
    }
}