From 515a981cd02e314eefaa497ef71f33c82e92a6d6 Mon Sep 17 00:00:00 2001 From: jjstyle Date: Tue, 16 Dec 2025 17:58:37 +0900 Subject: [PATCH] first commit2 --- .history/worker_20251205190214.php | 85 ++++++++++++++++++++++++++++ .history/worker_20251216162000.php | 89 ++++++++++++++++++++++++++++++ .history/worker_20251216162106.php | 89 ++++++++++++++++++++++++++++++ .history/worker_20251216162320.php | 89 ++++++++++++++++++++++++++++++ composer.json | 13 +++++ worker.php | 89 ++++++++++++++++++++++++++++++ 6 files changed, 454 insertions(+) create mode 100644 .history/worker_20251205190214.php create mode 100644 .history/worker_20251216162000.php create mode 100644 .history/worker_20251216162106.php create mode 100644 .history/worker_20251216162320.php create mode 100644 composer.json create mode 100644 worker.php diff --git a/.history/worker_20251205190214.php b/.history/worker_20251205190214.php new file mode 100644 index 0000000..de4733f --- /dev/null +++ b/.history/worker_20251205190214.php @@ -0,0 +1,85 @@ +connect($redisHost, $redisPort); + +// 그룹 생성 (이미 있으면 예외 발생하므로 무시) +try { + $redis->xGroup('CREATE', $stream, $group, '$'); +} catch (Exception $e) {} + +// PDO 연결 +$dsn = sprintf('mysql:host=%s;dbname=%s;charset=utf8mb4', getenv('DB_HOST'), getenv('DB_NAME')); +$pdo = new PDO($dsn, getenv('DB_USER'), getenv('DB_PASS'), [ + PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, + PDO::ATTR_EMULATE_PREPARES => false, +]); + +while (true) { + $res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000); + if (!$res) { + // 대기 후 루프 + continue; + } + $entries = $res[$stream]; + $ids = []; + $values = []; + foreach ($entries as $id => $fields) { + $ids[] = $id; + $articleNumbr = $fields['articleNumbr'] ?? ''; + $reqeustType = $fields['reqeustType'] ?? ''; + $requestDatetime = $fields['requestDatetime'] ?? ''; + $values[] = [ + 'articleNumbr' => $articleNumbr, + 'reqeustType' => $reqeustType, + 'requestDatetime' => $requestDatetime + ]; + } + + if (count($values) === 0) { + continue; + } + + // 벌크 인서트 준비 (prepared statements) + $placeholders = []; + $params = []; + foreach ($values as $v) { + $placeholders[] = "(?, ?, ?)"; + $params[] = $v['articleNumbr']; + $params[] = $v['reqeustType']; + $params[] = $v['requestDatetime']; + } + $sql = "INSERT INTO vrfc_requests (articleNumbr, reqeustType, requestDatetime) VALUES " . implode(',', $placeholders); + + try { + $pdo->beginTransaction(); + $stmt = $pdo->prepare($sql); + $stmt->execute($params); + $pdo->commit(); + + // ACK 및 삭제 + foreach ($ids as $id) { + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + } + } catch (Exception $e) { + $pdo->rollBack(); + error_log("DB insert error: " . $e->getMessage()); + // 실패 항목을 실패 스트림으로 이동 + foreach ($entries as $id => $fields) { + $redis->xAdd('vrfc_failures', '*', 'id', $id, 'payload', json_encode($fields)); + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + } + // 잠깐 대기 + sleep(2); + } +} diff --git a/.history/worker_20251216162000.php b/.history/worker_20251216162000.php new file mode 100644 index 0000000..1c94e63 --- /dev/null +++ b/.history/worker_20251216162000.php @@ -0,0 +1,89 @@ +connect($redisHost, $redisPort); + +// 그룹 생성 (이미 있으면 예외 발생하므로 무시) +try { + $redis->xGroup('CREATE', $stream, $group, '$'); +} catch (Exception $e) {} + +// 저장 경로가 없으면 생성 (권한 문제 주의) +if (!is_dir($storagePath)) { + mkdir($storagePath, 0777, true); +} + + +while (true) { + // 1. Stream에서 메시지 읽기 + $res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000); + if (!$res) { + continue; + } + + $entries = $res[$stream]; + $ids = []; + $processedCount = 0; + + foreach ($entries as $id => $fields) { + $ids[] = $id; + $articleNumbr = $fields['articleNumbr'] ?? ''; + + if (empty($articleNumbr)) { + // 매물 번호가 없으면 실패 스트림으로 이동 후 다음 루프로 + $fields['fail_reason'] = 'Missing articleNumbr'; + $redis->xAdd('vrfc_failures', '*', $fields); + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + continue; + } + + // 2. 파일 저장 로직 실행 + + // 파일 이름: 매물번호_요청시간.json + $fileName = sprintf('%s_%s.json', $articleNumbr, $fields['requestDatetime']); + $filePath = $storagePath . '/' . $fileName; + + // JSON 데이터 (stream fields를 그대로 사용) + $jsonData = json_encode($fields, JSON_PRETTY_PRINT); + + try { + // 파일에 JSON 데이터 쓰기 + $result = file_put_contents($filePath, $jsonData); + + if ($result === false) { + throw new Exception("File write failed for: {$filePath}"); + } + + error_log("SUCCESS: Saved request for {$articleNumbr} to {$fileName}"); + $processedCount++; + + } catch (Exception $e) { + error_log("File save error: " . $e->getMessage()); + + // 3. 파일 저장 실패 시 실패 스트림으로 이동 (DLQ) + $fields['fail_reason'] = $e->getMessage(); + $redis->xAdd('vrfc_failures', '*', $fields); + + // 잠깐 대기 + sleep(2); + } + + // 4. 성공 및 실패 처리 후 ACK 및 삭제 + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + } +} \ No newline at end of file diff --git a/.history/worker_20251216162106.php b/.history/worker_20251216162106.php new file mode 100644 index 0000000..1c94e63 --- /dev/null +++ b/.history/worker_20251216162106.php @@ -0,0 +1,89 @@ +connect($redisHost, $redisPort); + +// 그룹 생성 (이미 있으면 예외 발생하므로 무시) +try { + $redis->xGroup('CREATE', $stream, $group, '$'); +} catch (Exception $e) {} + +// 저장 경로가 없으면 생성 (권한 문제 주의) +if (!is_dir($storagePath)) { + mkdir($storagePath, 0777, true); +} + + +while (true) { + // 1. Stream에서 메시지 읽기 + $res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000); + if (!$res) { + continue; + } + + $entries = $res[$stream]; + $ids = []; + $processedCount = 0; + + foreach ($entries as $id => $fields) { + $ids[] = $id; + $articleNumbr = $fields['articleNumbr'] ?? ''; + + if (empty($articleNumbr)) { + // 매물 번호가 없으면 실패 스트림으로 이동 후 다음 루프로 + $fields['fail_reason'] = 'Missing articleNumbr'; + $redis->xAdd('vrfc_failures', '*', $fields); + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + continue; + } + + // 2. 파일 저장 로직 실행 + + // 파일 이름: 매물번호_요청시간.json + $fileName = sprintf('%s_%s.json', $articleNumbr, $fields['requestDatetime']); + $filePath = $storagePath . '/' . $fileName; + + // JSON 데이터 (stream fields를 그대로 사용) + $jsonData = json_encode($fields, JSON_PRETTY_PRINT); + + try { + // 파일에 JSON 데이터 쓰기 + $result = file_put_contents($filePath, $jsonData); + + if ($result === false) { + throw new Exception("File write failed for: {$filePath}"); + } + + error_log("SUCCESS: Saved request for {$articleNumbr} to {$fileName}"); + $processedCount++; + + } catch (Exception $e) { + error_log("File save error: " . $e->getMessage()); + + // 3. 파일 저장 실패 시 실패 스트림으로 이동 (DLQ) + $fields['fail_reason'] = $e->getMessage(); + $redis->xAdd('vrfc_failures', '*', $fields); + + // 잠깐 대기 + sleep(2); + } + + // 4. 성공 및 실패 처리 후 ACK 및 삭제 + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + } +} \ No newline at end of file diff --git a/.history/worker_20251216162320.php b/.history/worker_20251216162320.php new file mode 100644 index 0000000..1c94e63 --- /dev/null +++ b/.history/worker_20251216162320.php @@ -0,0 +1,89 @@ +connect($redisHost, $redisPort); + +// 그룹 생성 (이미 있으면 예외 발생하므로 무시) +try { + $redis->xGroup('CREATE', $stream, $group, '$'); +} catch (Exception $e) {} + +// 저장 경로가 없으면 생성 (권한 문제 주의) +if (!is_dir($storagePath)) { + mkdir($storagePath, 0777, true); +} + + +while (true) { + // 1. Stream에서 메시지 읽기 + $res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000); + if (!$res) { + continue; + } + + $entries = $res[$stream]; + $ids = []; + $processedCount = 0; + + foreach ($entries as $id => $fields) { + $ids[] = $id; + $articleNumbr = $fields['articleNumbr'] ?? ''; + + if (empty($articleNumbr)) { + // 매물 번호가 없으면 실패 스트림으로 이동 후 다음 루프로 + $fields['fail_reason'] = 'Missing articleNumbr'; + $redis->xAdd('vrfc_failures', '*', $fields); + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + continue; + } + + // 2. 파일 저장 로직 실행 + + // 파일 이름: 매물번호_요청시간.json + $fileName = sprintf('%s_%s.json', $articleNumbr, $fields['requestDatetime']); + $filePath = $storagePath . '/' . $fileName; + + // JSON 데이터 (stream fields를 그대로 사용) + $jsonData = json_encode($fields, JSON_PRETTY_PRINT); + + try { + // 파일에 JSON 데이터 쓰기 + $result = file_put_contents($filePath, $jsonData); + + if ($result === false) { + throw new Exception("File write failed for: {$filePath}"); + } + + error_log("SUCCESS: Saved request for {$articleNumbr} to {$fileName}"); + $processedCount++; + + } catch (Exception $e) { + error_log("File save error: " . $e->getMessage()); + + // 3. 파일 저장 실패 시 실패 스트림으로 이동 (DLQ) + $fields['fail_reason'] = $e->getMessage(); + $redis->xAdd('vrfc_failures', '*', $fields); + + // 잠깐 대기 + sleep(2); + } + + // 4. 성공 및 실패 처리 후 ACK 및 삭제 + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + } +} \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..9c46ea5 --- /dev/null +++ b/composer.json @@ -0,0 +1,13 @@ +{ + "name": "project/php-worker", + "description": "PHP worker for external API calls", + "type": "project", + "require": { + "guzzlehttp/guzzle": "^7.0" + }, + "autoload": { + "psr-4": { + "App\\": "src/" + } + } +} diff --git a/worker.php b/worker.php new file mode 100644 index 0000000..1c94e63 --- /dev/null +++ b/worker.php @@ -0,0 +1,89 @@ +connect($redisHost, $redisPort); + +// 그룹 생성 (이미 있으면 예외 발생하므로 무시) +try { + $redis->xGroup('CREATE', $stream, $group, '$'); +} catch (Exception $e) {} + +// 저장 경로가 없으면 생성 (권한 문제 주의) +if (!is_dir($storagePath)) { + mkdir($storagePath, 0777, true); +} + + +while (true) { + // 1. Stream에서 메시지 읽기 + $res = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batchSize, 5000); + if (!$res) { + continue; + } + + $entries = $res[$stream]; + $ids = []; + $processedCount = 0; + + foreach ($entries as $id => $fields) { + $ids[] = $id; + $articleNumbr = $fields['articleNumbr'] ?? ''; + + if (empty($articleNumbr)) { + // 매물 번호가 없으면 실패 스트림으로 이동 후 다음 루프로 + $fields['fail_reason'] = 'Missing articleNumbr'; + $redis->xAdd('vrfc_failures', '*', $fields); + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + continue; + } + + // 2. 파일 저장 로직 실행 + + // 파일 이름: 매물번호_요청시간.json + $fileName = sprintf('%s_%s.json', $articleNumbr, $fields['requestDatetime']); + $filePath = $storagePath . '/' . $fileName; + + // JSON 데이터 (stream fields를 그대로 사용) + $jsonData = json_encode($fields, JSON_PRETTY_PRINT); + + try { + // 파일에 JSON 데이터 쓰기 + $result = file_put_contents($filePath, $jsonData); + + if ($result === false) { + throw new Exception("File write failed for: {$filePath}"); + } + + error_log("SUCCESS: Saved request for {$articleNumbr} to {$fileName}"); + $processedCount++; + + } catch (Exception $e) { + error_log("File save error: " . $e->getMessage()); + + // 3. 파일 저장 실패 시 실패 스트림으로 이동 (DLQ) + $fields['fail_reason'] = $e->getMessage(); + $redis->xAdd('vrfc_failures', '*', $fields); + + // 잠깐 대기 + sleep(2); + } + + // 4. 성공 및 실패 처리 후 ACK 및 삭제 + $redis->xAck($stream, $group, $id); + $redis->xDel($stream, $id); + } +} \ No newline at end of file