receiver.php 파일 생성
This commit is contained in:
100
worker.php
100
worker.php
@@ -1,89 +1,35 @@
|
||||
<?php
|
||||
// worker.php
|
||||
set_time_limit(0);
|
||||
$redisHost = getenv('REDIS_HOST') ?: 'redis';
|
||||
$redisPort = getenv('REDIS_PORT') ?: 6379;
|
||||
|
||||
// --- 설정값 ---
|
||||
$stream = 'vrfc_requests';
|
||||
$group = 'vrfc_group';
|
||||
$consumer = 'php_worker_' . bin2hex(random_bytes(3));
|
||||
$batchSize = 500;
|
||||
// 저장할 기본 경로 (필수 환경 변수 추가 필요)
|
||||
$storagePath = getenv('STORAGE_PATH') ?: '/var/www/html/storage/requests';
|
||||
// ---------------
|
||||
$storagePath = getenv('STORAGE_PATH') ?: '/var/www/html/storage/requests';
|
||||
|
||||
$redis = new Redis();
|
||||
$redis->connect($redisHost, $redisPort);
|
||||
|
||||
// 그룹 생성 (이미 있으면 예외 발생하므로 무시)
|
||||
try {
|
||||
$redis->xGroup('CREATE', $stream, $group, '$');
|
||||
} catch (Exception $e) {}
|
||||
|
||||
// 저장 경로가 없으면 생성 (권한 문제 주의)
|
||||
if (!is_dir($storagePath)) {
|
||||
mkdir($storagePath, 0777, true);
|
||||
$redis->connect($redisHost, 6379);
|
||||
$redis->select(10);
|
||||
echo "[*] Connected to Redis. Waiting for data...\n";
|
||||
} catch (Exception $e) {
|
||||
die("Redis Connection Failed: " . $e->getMessage());
|
||||
}
|
||||
|
||||
|
||||
while (true) {
|
||||
// 1. Stream에서 메시지 읽기
|
||||
$res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000);
|
||||
if (!$res) {
|
||||
continue;
|
||||
}
|
||||
// 1. 입구 리스트에서 데이터 꺼내기
|
||||
$item = $redis->brPop(['naver:queue'], 5);
|
||||
if (!$item) continue;
|
||||
|
||||
$jsonStr = $item[1];
|
||||
$data = json_decode($jsonStr, true);
|
||||
|
||||
$entries = $res[$stream];
|
||||
$ids = [];
|
||||
$processedCount = 0;
|
||||
|
||||
foreach ($entries as $id => $fields) {
|
||||
$ids[] = $id;
|
||||
$articleNumbr = $fields['articleNumbr'] ?? '';
|
||||
// 2. 파일 저장
|
||||
$fileName = sprintf('%s_%s.json', $data['articleNumber'], $data['requestDatetime']);
|
||||
$filePath = $storagePath . '/' . $fileName;
|
||||
|
||||
if (file_put_contents($filePath, $jsonStr) !== false) {
|
||||
echo "✅ [SUCCESS] File Saved: $fileName\n";
|
||||
|
||||
if (empty($articleNumbr)) {
|
||||
// 매물 번호가 없으면 실패 스트림으로 이동 후 다음 루프로
|
||||
$fields['fail_reason'] = 'Missing articleNumbr';
|
||||
$redis->xAdd('vrfc_failures', '*', $fields);
|
||||
$redis->xAck($stream, $group, $id);
|
||||
$redis->xDel($stream, $id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 2. 파일 저장 로직 실행
|
||||
|
||||
// 파일 이름: 매물번호_요청시간.json
|
||||
$fileName = sprintf('%s_%s.json', $articleNumbr, $fields['requestDatetime']);
|
||||
$filePath = $storagePath . '/' . $fileName;
|
||||
|
||||
// JSON 데이터 (stream fields를 그대로 사용)
|
||||
$jsonData = json_encode($fields, JSON_PRETTY_PRINT);
|
||||
|
||||
try {
|
||||
// 파일에 JSON 데이터 쓰기
|
||||
$result = file_put_contents($filePath, $jsonData);
|
||||
|
||||
if ($result === false) {
|
||||
throw new Exception("File write failed for: {$filePath}");
|
||||
}
|
||||
|
||||
error_log("SUCCESS: Saved request for {$articleNumbr} to {$fileName}");
|
||||
$processedCount++;
|
||||
|
||||
} catch (Exception $e) {
|
||||
error_log("File save error: " . $e->getMessage());
|
||||
|
||||
// 3. 파일 저장 실패 시 실패 스트림으로 이동 (DLQ)111
|
||||
$fields['fail_reason'] = $e->getMessage();
|
||||
$redis->xAdd('vrfc_failures', '*', $fields);
|
||||
|
||||
// 잠깐 대기
|
||||
sleep(2);
|
||||
}
|
||||
|
||||
// 4. 성공 및 실패 처리 후 ACK 및 삭제
|
||||
$redis->xAck($stream, $group, $id);
|
||||
$redis->xDel($stream, $id);
|
||||
// 3. 파일 저장 성공 시에만 전송 큐로 넘김 (중요!)
|
||||
$redis->lPush('naver:worker_queue', $jsonStr);
|
||||
} else {
|
||||
echo "❌ [ERROR] Failed to save file: $fileName\n";
|
||||
// 실패 시 다시 큐에 넣거나 로그 처리
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user