worker service 매물 등록 및 redis 다운시 처리
This commit is contained in:
@@ -23,136 +23,155 @@ class NaverWorker extends BaseCommand
|
||||
|
||||
public function run(array $params)
|
||||
{
|
||||
helper('log'); // 여기서 로드 완료!
|
||||
helper(['log', 'redis']); // redis helper 추가
|
||||
|
||||
$this->db = \Config\Database::connect();
|
||||
$logModel = model(NaverWorkerLogModel::class);
|
||||
$naverService = new \App\Services\NaverService(); // 서비스 생성
|
||||
|
||||
$redis = new \Redis();
|
||||
try {
|
||||
// 두 가지 환경 변수 형식 지원 (REDIS_HOST 또는 redis.default.host)
|
||||
$this->redisHost = getenv('REDIS_HOST') ?: (getenv('redis.default.host') ?: '127.0.0.1');
|
||||
$this->redisPort = getenv('REDIS_PORT') ?: (getenv('redis.default.port') ?: 6379);
|
||||
$this->redisDatabase = getenv('REDIS_DATABASE') ?: (getenv('redis.default.database') ?: 9);
|
||||
|
||||
$redis->connect($this->redisHost, (int)$this->redisPort);
|
||||
$redis->select((int)$this->redisDatabase);
|
||||
CLI::write(CLI::color('🟢 Naver Worker running... (Redis: ' . $this->redisHost . ':' . $this->redisPort . ' DB:' . $this->redisDatabase . ')', 'green'));
|
||||
} catch (\Exception $e) {
|
||||
CLI::error("Redis 연결 불가: " . $e->getMessage());
|
||||
return;
|
||||
// Redis 연결 (실패해도 계속 진행 - 파일 모드로 동작 가능)
|
||||
$redis = get_redis_connection('worker');
|
||||
$config = get_redis_config('worker');
|
||||
|
||||
if ($redis) {
|
||||
CLI::write(CLI::color('🟢 Naver Worker running... (Redis: ' . $config['host'] . ':' . $config['port'] . ' DB:' . $config['database'] . ')', 'green'));
|
||||
} else {
|
||||
CLI::write(CLI::color('⚠️ Naver Worker running in FILE-ONLY mode (Redis unavailable)', 'yellow'));
|
||||
}
|
||||
|
||||
|
||||
|
||||
while (true) {
|
||||
|
||||
// Redis brPop with retry logic
|
||||
$maxRetries = 3;
|
||||
$retryCount = 0;
|
||||
$result = null;
|
||||
|
||||
while ($retryCount < $maxRetries) {
|
||||
try {
|
||||
$result = $redis->brPop(['naver:raw_queue'], 30);
|
||||
break; // 성공하면 루프 탈출
|
||||
} catch (\RedisException $e) {
|
||||
$retryCount++;
|
||||
CLI::write(CLI::color("⚠️ Redis error (attempt {$retryCount}/{$maxRetries}): " . $e->getMessage(), 'yellow'));
|
||||
|
||||
if ($retryCount >= $maxRetries) {
|
||||
CLI::error("❌ Redis reconnection failed after {$maxRetries} attempts");
|
||||
break;
|
||||
}
|
||||
|
||||
// Redis 재연결 시도
|
||||
// Redis 또는 폴백 파일에서 데이터 읽기
|
||||
$rawData = null;
|
||||
$source = 'redis'; // 데이터 소스 추적
|
||||
|
||||
// 1. Redis에서 데이터 읽기 시도 (Redis가 있을 경우만)
|
||||
if ($redis) {
|
||||
$maxRetries = 2;
|
||||
$retryCount = 0;
|
||||
|
||||
while ($retryCount < $maxRetries) {
|
||||
try {
|
||||
CLI::write(CLI::color('🔄 Reconnecting to Redis...', 'yellow'));
|
||||
$redis->close();
|
||||
$redis->connect($this->redisHost, (int)$this->redisPort);
|
||||
$redis->select((int)$this->redisDatabase);
|
||||
CLI::write(CLI::color('✅ Redis reconnected', 'green'));
|
||||
} catch (\Exception $reconnectError) {
|
||||
CLI::error("Redis reconnection error: " . $reconnectError->getMessage());
|
||||
sleep(2); // 재연결 실패 시 잠시 대기
|
||||
$result = $redis->brPop(['naver:raw_queue'], 5); // 5초 타임아웃
|
||||
if ($result) {
|
||||
$rawData = $result[1];
|
||||
$source = 'redis';
|
||||
}
|
||||
break; // 성공하면 루프 탈출
|
||||
} catch (\Exception $e) {
|
||||
$retryCount++;
|
||||
CLI::write(CLI::color("⚠️ Redis error (attempt {$retryCount}/{$maxRetries}): " . $e->getMessage(), 'yellow'));
|
||||
|
||||
if ($retryCount >= $maxRetries) {
|
||||
CLI::write(CLI::color("⚠️ Redis unavailable, switching to file mode", 'yellow'));
|
||||
$redis = null; // Redis를 비활성화
|
||||
break;
|
||||
}
|
||||
|
||||
// Redis 재연결 시도
|
||||
try {
|
||||
CLI::write(CLI::color('🔄 Reconnecting to Redis...', 'yellow'));
|
||||
$redis->close();
|
||||
$redis = get_redis_connection('worker');
|
||||
if ($redis) {
|
||||
CLI::write(CLI::color('✅ Redis reconnected', 'green'));
|
||||
}
|
||||
} catch (\Exception $reconnectError) {
|
||||
CLI::error("Redis reconnection error: " . $reconnectError->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!$result) {
|
||||
// 데이터가 없어서 타임아웃 난 경우.
|
||||
// 굳이 sleep 안 해도 바로 다음 brPop이 다시 30초 대기를 시작함.
|
||||
// 2. Redis에서 데이터 없으면 폴백 파일 확인
|
||||
if (!$rawData) {
|
||||
$rawData = $this->readFromFallbackFile();
|
||||
if ($rawData) {
|
||||
$source = 'file';
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 데이터 없으면 다음 루프
|
||||
if (!$rawData) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($result) {
|
||||
$rawData = $result[1];
|
||||
// 4. 데이터 소스 로깅
|
||||
CLI::write(CLI::color("📥 Data received from: " . strtoupper($source), 'cyan'));
|
||||
|
||||
// [1] 꺼내자마자 DB에 원문 저장 (2차 임시 저장) - 실패 시 재시도
|
||||
try {
|
||||
$logId = $logModel->insert([
|
||||
'raw_payload' => $rawData,
|
||||
'status' => 'INIT'
|
||||
]);
|
||||
} catch (\CodeIgniter\Database\Exceptions\DatabaseException $e) {
|
||||
// MySQL gone away 에러 시 재연결 후 재시도
|
||||
if (strpos($e->getMessage(), 'MySQL server has gone away') !== false) {
|
||||
CLI::write(CLI::color('⚠️ MySQL gone away, reconnecting...', 'yellow'));
|
||||
$this->db->close();
|
||||
$this->db = \Config\Database::connect();
|
||||
$logModel = model(NaverWorkerLogModel::class);
|
||||
|
||||
// 재시도
|
||||
$logId = $logModel->insert([
|
||||
'raw_payload' => $rawData,
|
||||
'status' => 'INIT'
|
||||
]);
|
||||
} else {
|
||||
throw $e; // 다른 에러면 그대로 throw
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
$responseJson = json_decode($result[1], true);
|
||||
$payload = $responseJson['request_data'] ?? [];
|
||||
|
||||
if (empty($payload)) {
|
||||
throw new \Exception("빈 페이로드 데이터");
|
||||
}
|
||||
|
||||
// 서비스의 함수 하나로 모든 처리 완료
|
||||
$insertId = $naverService->processArticle($payload);
|
||||
|
||||
// [3] 성공 시 로그 업데이트 (재연결 처리 포함)
|
||||
$this->safeUpdateLog($logModel, $logId, [
|
||||
'atcl_no' => $payload['articleNumber'] ?? null,
|
||||
'status' => 'SUCCESS',
|
||||
'target_db_id' => $insertId
|
||||
]);
|
||||
// [1] 꺼내자마자 DB에 원문 저장 (2차 임시 저장) - 실패 시 재시도
|
||||
try {
|
||||
$logId = $logModel->insert([
|
||||
'raw_payload' => $rawData,
|
||||
'status' => 'INIT'
|
||||
]);
|
||||
} catch (\CodeIgniter\Database\Exceptions\DatabaseException $e) {
|
||||
// MySQL gone away 에러 시 재연결 후 재시도
|
||||
if (strpos($e->getMessage(), 'MySQL server has gone away') !== false) {
|
||||
CLI::write(CLI::color('⚠️ MySQL gone away, reconnecting...', 'yellow'));
|
||||
$this->db->close();
|
||||
$this->db = \Config\Database::connect();
|
||||
$logModel = model(NaverWorkerLogModel::class);
|
||||
|
||||
CLI::write("✅ Success! DB ID: $insertId", 'cyan');
|
||||
|
||||
} catch (\Exception $e) {
|
||||
CLI::error("❌ Task Failed: " . $e->getMessage());
|
||||
// 실패 로그는 여기서 남김
|
||||
// 1. DB 상태를 FAIL로 업데이트 (필수) (재연결 처리 포함)
|
||||
$this->safeUpdateLog($logModel, $logId, [
|
||||
'status' => 'FAIL',
|
||||
'error_msg' => $e->getMessage()
|
||||
// 재시도
|
||||
$logId = $logModel->insert([
|
||||
'raw_payload' => $rawData,
|
||||
'status' => 'INIT'
|
||||
]);
|
||||
|
||||
// 2. Redis 실패 큐에 백업 (선택 - 나중에 모아서 다시 던질 때 편함)
|
||||
$redis->lPush('naver:failed_queue', $rawData);
|
||||
helper('log');
|
||||
write_custom_log("FAILED_DATA | Error: " . $e->getMessage(), 'ERROR', 'failed');
|
||||
|
||||
// 루프 과부하 방지 (연속 에러 시)
|
||||
sleep(1);
|
||||
} else {
|
||||
throw $e; // 다른 에러면 그대로 throw
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
$responseJson = json_decode($rawData, true);
|
||||
$payload = $responseJson['request_data'] ?? [];
|
||||
|
||||
if (empty($payload)) {
|
||||
throw new \Exception("빈 페이로드 데이터");
|
||||
}
|
||||
|
||||
// 서비스의 함수 하나로 모든 처리 완료
|
||||
$insertId = $naverService->processArticle($payload);
|
||||
|
||||
// [3] 성공 시 로그 업데이트 (재연결 처리 포함)
|
||||
$this->safeUpdateLog($logModel, $logId, [
|
||||
'atcl_no' => $payload['articleNumber'] ?? null,
|
||||
'status' => 'SUCCESS',
|
||||
'target_db_id' => $insertId
|
||||
]);
|
||||
|
||||
CLI::write("✅ Success! DB ID: $insertId | Source: $source", 'cyan');
|
||||
|
||||
} catch (\Exception $e) {
|
||||
CLI::error("❌ Task Failed: " . $e->getMessage());
|
||||
// 실패 로그는 여기서 남김
|
||||
// 1. DB 상태를 FAIL로 업데이트 (필수) (재연결 처리 포함)
|
||||
$this->safeUpdateLog($logModel, $logId, [
|
||||
'status' => 'FAIL',
|
||||
'error_msg' => $e->getMessage()
|
||||
]);
|
||||
|
||||
// 2. Redis 실패 큐에 백업 (선택 - Redis가 있을 경우만)
|
||||
if ($redis) {
|
||||
try {
|
||||
$redis->lPush('naver:failed_queue', $rawData);
|
||||
} catch (\Exception $redisEx) {
|
||||
// Redis 실패 시에도 에러 처리하지 않음 (이미 DB에 FAIL 로그 남김)
|
||||
CLI::write(CLI::color('⚠️ Failed to push to failed_queue: ' . $redisEx->getMessage(), 'yellow'));
|
||||
}
|
||||
}
|
||||
|
||||
helper('log');
|
||||
write_custom_log("FAILED_DATA | Error: " . $e->getMessage(), 'ERROR', 'failed');
|
||||
|
||||
// 루프 과부하 방지 (연속 에러 시)
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* MySQL gone away 에러 발생 시 재연결 후 재시도하는 안전한 update
|
||||
@@ -176,5 +195,68 @@ class NaverWorker extends BaseCommand
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 폴백 파일에서 데이터 읽기 (Redis 장애 시 파일에서 직접 처리)
|
||||
*
|
||||
* @return string|null JSON 데이터 또는 null
|
||||
*/
|
||||
protected function readFromFallbackFile()
|
||||
{
|
||||
$fallbackDir = ROOTPATH . 'worker/fallback_queue';
|
||||
|
||||
// 폴백 디렉토리가 없으면 null 반환
|
||||
if (!is_dir($fallbackDir)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 폴백 파일 목록 가져오기 (오래된 순서대로)
|
||||
$files = glob($fallbackDir . '/*.json');
|
||||
|
||||
if (empty($files)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
sort($files); // 파일명(타임스탬프) 기준 정렬
|
||||
|
||||
// 가장 오래된 파일 하나 처리
|
||||
$filePath = $files[0];
|
||||
|
||||
try {
|
||||
// 파일 락을 사용하여 읽기 (동시 접근 방지)
|
||||
$fp = fopen($filePath, 'r');
|
||||
if (!$fp) {
|
||||
CLI::write(CLI::color("⚠️ Failed to open fallback file: " . basename($filePath), 'yellow'));
|
||||
return null;
|
||||
}
|
||||
|
||||
// 배타적 락 획득 시도
|
||||
if (!flock($fp, LOCK_EX | LOCK_NB)) {
|
||||
// 락 획득 실패 (다른 프로세스가 처리 중)
|
||||
fclose($fp);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 파일 내용 읽기
|
||||
$content = stream_get_contents($fp);
|
||||
|
||||
// 파일 삭제 (처리 완료로 간주)
|
||||
flock($fp, LOCK_UN);
|
||||
fclose($fp);
|
||||
unlink($filePath);
|
||||
|
||||
CLI::write(CLI::color("📂 Processing fallback file: " . basename($filePath), 'green'));
|
||||
|
||||
return $content;
|
||||
|
||||
} catch (\Exception $e) {
|
||||
CLI::write(CLI::color("❌ Error reading fallback file " . basename($filePath) . ": " . $e->getMessage(), 'red'));
|
||||
if (isset($fp) && is_resource($fp)) {
|
||||
flock($fp, LOCK_UN);
|
||||
fclose($fp);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user