connect($redisHost, $redisPort); // 그룹 생성 (이미 있으면 예외 발생하므로 무시) try { $redis->xGroup('CREATE', $stream, $group, '$'); } catch (Exception $e) {} // 저장 경로가 없으면 생성 (권한 문제 주의) if (!is_dir($storagePath)) { mkdir($storagePath, 0777, true); } while (true) { // 1. Stream에서 메시지 읽기 $res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000); if (!$res) { continue; } $entries = $res[$stream]; $ids = []; $processedCount = 0; foreach ($entries as $id => $fields) { $ids[] = $id; $articleNumbr = $fields['articleNumbr'] ?? ''; if (empty($articleNumbr)) { // 매물 번호가 없으면 실패 스트림으로 이동 후 다음 루프로 $fields['fail_reason'] = 'Missing articleNumbr'; $redis->xAdd('vrfc_failures', '*', $fields); $redis->xAck($stream, $group, $id); $redis->xDel($stream, $id); continue; } // 2. 파일 저장 로직 실행 // 파일 이름: 매물번호_요청시간.json $fileName = sprintf('%s_%s.json', $articleNumbr, $fields['requestDatetime']); $filePath = $storagePath . '/' . $fileName; // JSON 데이터 (stream fields를 그대로 사용) $jsonData = json_encode($fields, JSON_PRETTY_PRINT); try { // 파일에 JSON 데이터 쓰기 $result = file_put_contents($filePath, $jsonData); if ($result === false) { throw new Exception("File write failed for: {$filePath}"); } error_log("SUCCESS: Saved request for {$articleNumbr} to {$fileName}"); $processedCount++; } catch (Exception $e) { error_log("File save error: " . $e->getMessage()); // 3. 파일 저장 실패 시 실패 스트림으로 이동 (DLQ) $fields['fail_reason'] = $e->getMessage(); $redis->xAdd('vrfc_failures', '*', $fields); // 잠깐 대기 sleep(2); } // 4. 성공 및 실패 처리 후 ACK 및 삭제 $redis->xAck($stream, $group, $id); $redis->xDel($stream, $id); } }