refactor: second-round code review quality improvements

- Extract cron handler retry logic into an executeWithRetry helper (DRY)
- Add configuration-change detection to singletons (CacheService, QueryService)
- Use the SYNC_BATCH_SIZE environment variable consistently (default 100)
- Parallelize GPU/G8/VPU instance fetches with Promise.all
- Handle empty strings in parsePositiveNumber
- Add a 5-second timeout to the health check DB query

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
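For orientation, both cron handlers now call the shared helper with the same shape (taken from the src/index.ts diff below); only the wrapped operation and its name differ:

    const report = await executeWithRetry(
      async () => orchestrator.syncAll(['linode', 'vultr', 'aws']),
      { maxRetries: MAX_RETRIES, operationName: 'full sync', logger }
    );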
src/index.ts
@@ -9,6 +9,61 @@ import app from './app';
import { createLogger } from './utils/logger';
import { SyncOrchestrator } from './services/sync';

/**
 * Generic retry helper with exponential backoff
 * Executes an operation with automatic retry on failure
 */
async function executeWithRetry<T>(
  operation: () => Promise<T>,
  options: {
    maxRetries: number;
    operationName: string;
    logger: ReturnType<typeof createLogger>;
  }
): Promise<T> {
  const { maxRetries, operationName, logger } = options;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      logger.info(`Starting ${operationName} attempt`, {
        attempt_number: attempt,
        max_retries: maxRetries
      });

      const result = await operation();
      return result;

    } catch (error) {
      const willRetry = attempt < maxRetries;
      const retryDelayMs = willRetry ? Math.min(Math.pow(2, attempt - 1) * 1000, 10000) : 0;
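      // Backoff schedule: 1s after the first failed attempt, then 2s, 4s, 8s, capped at 10s.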

      logger.error(`${operationName} attempt failed`, {
        attempt_number: attempt,
        max_retries: maxRetries,
        will_retry: willRetry,
        retry_delay_ms: retryDelayMs,
        error: error instanceof Error ? error.message : String(error),
        stack: error instanceof Error ? error.stack : undefined
      });

      if (willRetry) {
        // Wait before retry with exponential backoff
        await new Promise(resolve => setTimeout(resolve, retryDelayMs));
      } else {
        // Final failure - re-throw to make cron failure visible
        logger.error(`${operationName} failed after all retries`, {
          total_attempts: maxRetries,
          error: error instanceof Error ? error.message : String(error)
        });
        throw error;
      }
    }
  }

  // TypeScript exhaustiveness check - should never reach here
  throw new Error(`${operationName}: Unexpected retry loop exit`);
}

export default {
  /**
   * HTTP Request Handler (delegated to Hono)
@@ -45,68 +100,37 @@ export default {
    const executeSyncWithRetry = async (): Promise<void> => {
      const orchestrator = new SyncOrchestrator(env.DB, env);

      for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
          logger.info('Starting full sync attempt', {
            attempt_number: attempt,
            max_retries: MAX_RETRIES
          });

          const report = await orchestrator.syncAll(['linode', 'vultr', 'aws']);

          logger.info('Daily full sync complete', {
            attempt_number: attempt,
            total_regions: report.summary.total_regions,
            total_instances: report.summary.total_instances,
            total_pricing: report.summary.total_pricing,
            successful_providers: report.summary.successful_providers,
            failed_providers: report.summary.failed_providers,
            duration_ms: report.total_duration_ms
          });

          // Alert on partial failures
          if (report.summary.failed_providers > 0) {
            const failedProviders = report.providers
              .filter(p => !p.success)
              .map(p => p.provider);

            logger.warn('Some providers failed during sync', {
              failed_count: report.summary.failed_providers,
              failed_providers: failedProviders,
              errors: report.providers
                .filter(p => !p.success)
                .map(p => ({ provider: p.provider, error: p.error }))
            });
          }

          // Success - exit retry loop
          return;

        } catch (error) {
          const willRetry = attempt < MAX_RETRIES;
          const retryDelayMs = willRetry ? Math.min(Math.pow(2, attempt - 1) * 1000, 10000) : 0;

          logger.error('Sync attempt failed', {
            attempt_number: attempt,
            max_retries: MAX_RETRIES,
            will_retry: willRetry,
            retry_delay_ms: retryDelayMs,
            error: error instanceof Error ? error.message : String(error),
            stack: error instanceof Error ? error.stack : undefined
          });

          if (willRetry) {
            // Wait before retry with exponential backoff
            await new Promise(resolve => setTimeout(resolve, retryDelayMs));
          } else {
            // Final failure - re-throw to make cron failure visible
            logger.error('Daily sync failed after all retries', {
              total_attempts: MAX_RETRIES,
              error: error instanceof Error ? error.message : String(error)
            });
            throw error;
          }
      const report = await executeWithRetry(
        async () => orchestrator.syncAll(['linode', 'vultr', 'aws']),
        {
          maxRetries: MAX_RETRIES,
          operationName: 'full sync',
          logger
        }
      );

      logger.info('Daily full sync complete', {
        total_regions: report.summary.total_regions,
        total_instances: report.summary.total_instances,
        total_pricing: report.summary.total_pricing,
        successful_providers: report.summary.successful_providers,
        failed_providers: report.summary.failed_providers,
        duration_ms: report.total_duration_ms
      });

      // Alert on partial failures
      if (report.summary.failed_providers > 0) {
        const failedProviders = report.providers
          .filter(p => !p.success)
          .map(p => p.provider);

        logger.warn('Some providers failed during sync', {
          failed_count: report.summary.failed_providers,
          failed_providers: failedProviders,
          errors: report.providers
            .filter(p => !p.success)
            .map(p => ({ provider: p.provider, error: p.error }))
        });
      }
    };

@@ -119,66 +143,35 @@ export default {
    const executePricingSyncWithRetry = async (): Promise<void> => {
      const orchestrator = new SyncOrchestrator(env.DB, env);

      for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
          logger.info('Starting pricing sync attempt', {
            attempt_number: attempt,
            max_retries: MAX_RETRIES
          });

          const report = await orchestrator.syncAllPricingOnly(['linode', 'vultr', 'aws']);

          logger.info('Pricing sync complete', {
            attempt_number: attempt,
            total_pricing: report.summary.total_pricing,
            successful_providers: report.summary.successful_providers,
            failed_providers: report.summary.failed_providers,
            duration_ms: report.total_duration_ms
          });

          // Alert on partial failures
          if (report.summary.failed_providers > 0) {
            const failedProviders = report.providers
              .filter(p => !p.success)
              .map(p => p.provider);

            logger.warn('Some providers failed during pricing sync', {
              failed_count: report.summary.failed_providers,
              failed_providers: failedProviders,
              errors: report.providers
                .filter(p => !p.success)
                .map(p => ({ provider: p.provider, error: p.error }))
            });
          }

          // Success - exit retry loop
          return;

        } catch (error) {
          const willRetry = attempt < MAX_RETRIES;
          const retryDelayMs = willRetry ? Math.min(Math.pow(2, attempt - 1) * 1000, 10000) : 0;

          logger.error('Pricing sync attempt failed', {
            attempt_number: attempt,
            max_retries: MAX_RETRIES,
            will_retry: willRetry,
            retry_delay_ms: retryDelayMs,
            error: error instanceof Error ? error.message : String(error),
            stack: error instanceof Error ? error.stack : undefined
          });

          if (willRetry) {
            // Wait before retry with exponential backoff
            await new Promise(resolve => setTimeout(resolve, retryDelayMs));
          } else {
            // Final failure - re-throw to make cron failure visible
            logger.error('Pricing sync failed after all retries', {
              total_attempts: MAX_RETRIES,
              error: error instanceof Error ? error.message : String(error)
            });
            throw error;
          }
      const report = await executeWithRetry(
        async () => orchestrator.syncAllPricingOnly(['linode', 'vultr', 'aws']),
        {
          maxRetries: MAX_RETRIES,
          operationName: 'pricing sync',
          logger
        }
      );

      logger.info('Pricing sync complete', {
        total_pricing: report.summary.total_pricing,
        successful_providers: report.summary.successful_providers,
        failed_providers: report.summary.failed_providers,
        duration_ms: report.total_duration_ms
      });

      // Alert on partial failures
      if (report.summary.failed_providers > 0) {
        const failedProviders = report.providers
          .filter(p => !p.success)
          .map(p => p.provider);

        logger.warn('Some providers failed during pricing sync', {
          failed_count: report.summary.failed_providers,
          failed_providers: failedProviders,
          errors: report.providers
            .filter(p => !p.success)
            .map(p => ({ provider: p.provider, error: p.error }))
        });
      }
    };

@@ -61,6 +61,27 @@ interface DetailedHealthResponse extends PublicHealthResponse {
  };
}

/**
 * Wraps a promise with a timeout
 * @param promise - The promise to wrap
 * @param ms - Timeout in milliseconds
 * @param operation - Operation name for error message
 * @returns Promise result if completed within timeout
 * @throws Error if operation times out
 */
async function withTimeout<T>(promise: Promise<T>, ms: number, operation: string): Promise<T> {
  let timeoutId: ReturnType<typeof setTimeout>;
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutId = setTimeout(() => reject(new Error(`${operation} timed out after ${ms}ms`)), ms);
  });

  try {
    return await Promise.race([promise, timeoutPromise]);
  } finally {
    clearTimeout(timeoutId!);
  }
}
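// Note: losing the race only rejects the wait; the underlying operation itself is not cancelled.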

/**
 * Check database connectivity and measure latency
 */
@@ -68,8 +89,12 @@ async function checkDatabaseHealth(db: D1Database): Promise<DatabaseHealth> {
  try {
    const startTime = Date.now();

    // Simple connectivity check
    await db.prepare('SELECT 1').first();
    // Simple connectivity check with 5-second timeout
    await withTimeout(
      db.prepare('SELECT 1').first(),
      5000,
      'Database health check'
    );

    const latency = Date.now() - startTime;

@@ -33,17 +33,19 @@ import {
 */
let cachedQueryService: QueryService | null = null;
let cachedDb: D1Database | null = null;
let cachedEnv: Env | null = null;

/**
 * Get or create QueryService singleton
 * Lazy initialization on first request, then reused for subsequent requests
 * Invalidates cache if database binding changes (rolling deploy scenario)
 * Invalidates cache if database or environment binding changes (rolling deploy scenario)
 */
function getQueryService(db: D1Database, env: Env): QueryService {
  // Invalidate cache if db binding changed (rolling deploy scenario)
  if (!cachedQueryService || cachedDb !== db) {
  // Invalidate cache if db or env binding changed (rolling deploy scenario)
  if (!cachedQueryService || cachedDb !== db || cachedEnv !== env) {
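    // A plain reference check suffices here: a different db or env object signals refreshed
    // bindings, so the QueryService is rebuilt instead of reusing one wired to stale bindings.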
    cachedQueryService = new QueryService(db, env);
    cachedDb = db;
    cachedEnv = env;
    logger.debug('[Instances] QueryService singleton initialized/refreshed');
  }
  return cachedQueryService;
@@ -128,7 +130,7 @@ function parseQueryParams(url: URL): {
  name: string,
  value: string | null
): number | undefined | { error: { code: string; message: string; parameter: string } } {
  if (value === null) return undefined;
  if (value === null || value === '') return undefined;
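  // Number('') evaluates to 0, so an empty query value used to act as a filter of 0;
  // it is now ignored like an absent parameter.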

  const parsed = Number(value);
  if (isNaN(parsed) || parsed < 0) {

@@ -48,10 +48,13 @@ export interface CacheResult<T> {
 * Prevents race conditions from multiple route-level singletons
 */
let globalCacheService: CacheService | null = null;
let cachedTtl: number | null = null;
let cachedKv: KVNamespace | null = null;

/**
 * Get or create global CacheService singleton
 * Thread-safe factory function that ensures only one CacheService instance exists
 * Detects configuration changes (TTL or KV namespace) and refreshes singleton
 *
 * @param ttl - TTL in seconds for cache entries
 * @param kv - KV namespace for cache index (enables pattern invalidation)
@@ -61,9 +64,11 @@ let globalCacheService: CacheService | null = null;
 * const cache = getGlobalCacheService(CACHE_TTL.INSTANCES, env.RATE_LIMIT_KV);
 */
export function getGlobalCacheService(ttl: number, kv: KVNamespace | null): CacheService {
  if (!globalCacheService) {
  if (!globalCacheService || cachedTtl !== ttl || cachedKv !== kv) {
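    // A caller passing a different TTL or KV namespace now gets a refreshed service
    // instead of one silently built with the earlier configuration.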
    globalCacheService = new CacheService(ttl, kv);
    logger.debug('[CacheService] Global singleton initialized');
    cachedTtl = ttl;
    cachedKv = kv;
    logger.debug('[CacheService] Global singleton initialized/refreshed');
  }
  return globalCacheService;
}

@@ -189,37 +189,39 @@ export class SyncOrchestrator {
    let vpuInstancesCount = 0;

    if (provider.toLowerCase() === 'linode') {
      // GPU instances
      if (connector.getGpuInstances) {
        const gpuInstances = await withTimeout(connector.getGpuInstances(), 15000, `${provider} fetch GPU instances`);
        if (gpuInstances && gpuInstances.length > 0) {
          gpuInstancesCount = await this.repos.gpuInstances.upsertMany(
            providerRecord.id,
            gpuInstances
          );
        }
      // Parallel fetch all specialized instances for Linode
      const [gpuInstances, g8Instances, vpuInstances] = await Promise.all([
        connector.getGpuInstances
          ? withTimeout(connector.getGpuInstances(), 15000, `${provider} fetch GPU instances`)
          : Promise.resolve([]),
        connector.getG8Instances
          ? withTimeout(connector.getG8Instances(), 15000, `${provider} fetch G8 instances`)
          : Promise.resolve([]),
        connector.getVpuInstances
          ? withTimeout(connector.getVpuInstances(), 15000, `${provider} fetch VPU instances`)
          : Promise.resolve([])
      ]);
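      // The three fetches run concurrently, so the worst case is bounded by the slowest single
      // 15s timeout rather than the sum of three sequential calls. Promise.all is fail-fast:
      // if any fetch rejects (including on timeout), the whole specialized-instance step throws.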

      // Sequential upsert (database operations)
      if (gpuInstances.length > 0) {
        gpuInstancesCount = await this.repos.gpuInstances.upsertMany(
          providerRecord.id,
          gpuInstances
        );
      }

      // G8 instances
      if (connector.getG8Instances) {
        const g8Instances = await withTimeout(connector.getG8Instances(), 15000, `${provider} fetch G8 instances`);
        if (g8Instances && g8Instances.length > 0) {
          g8InstancesCount = await this.repos.g8Instances.upsertMany(
            providerRecord.id,
            g8Instances
          );
        }
      if (g8Instances.length > 0) {
        g8InstancesCount = await this.repos.g8Instances.upsertMany(
          providerRecord.id,
          g8Instances
        );
      }

      // VPU instances
      if (connector.getVpuInstances) {
        const vpuInstances = await withTimeout(connector.getVpuInstances(), 15000, `${provider} fetch VPU instances`);
        if (vpuInstances && vpuInstances.length > 0) {
          vpuInstancesCount = await this.repos.vpuInstances.upsertMany(
            providerRecord.id,
            vpuInstances
          );
        }
      if (vpuInstances.length > 0) {
        vpuInstancesCount = await this.repos.vpuInstances.upsertMany(
          providerRecord.id,
          vpuInstances
        );
      }
    }

@@ -813,24 +815,29 @@ export class SyncOrchestrator {

  /**
   * Generate AWS pricing records in batches using Generator pattern
   * Minimizes memory usage by yielding batches of 100 records at a time
   * Minimizes memory usage by yielding records in batches of configurable size
   *
   * @param instanceTypeIds - Array of database instance type IDs
   * @param regionIds - Array of database region IDs
   * @param dbInstanceMap - Map of instance type ID to DB instance data
   * @param rawInstanceMap - Map of instance_id (API ID) to raw AWS data
   * @yields Batches of PricingInput records (100 per batch)
   * @param env - Environment configuration for SYNC_BATCH_SIZE
   * @yields Batches of PricingInput records (configurable batch size)
   *
   * Manual Test:
   * Generator yields ~252 batches for ~25,230 total records (870 instances × 29 regions)
   * Generator yields batches for ~25,230 total records (870 instances × 29 regions)
   */
  private *generateAWSPricingBatches(
    instanceTypeIds: number[],
    regionIds: number[],
    dbInstanceMap: Map<number, { instance_id: string }>,
    rawInstanceMap: Map<string, { Cost: number; MonthlyPrice: number }>
    rawInstanceMap: Map<string, { Cost: number; MonthlyPrice: number }>,
    env?: Env
  ): Generator<PricingInput[], void, void> {
    const BATCH_SIZE = 500;
    const BATCH_SIZE = Math.min(
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '100', 10) || 100, 1),
      1000
    );
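    // Clamped to [1, 1000]; defaults to 100 when SYNC_BATCH_SIZE is unset or not a valid number.
    // At the default of 100, the ~25,230 AWS records noted above yield roughly 253 batches.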
    let batch: PricingInput[] = [];

    for (const regionId of regionIds) {

@@ -894,7 +901,7 @@ export class SyncOrchestrator {
    env?: Env
  ): Generator<PricingInput[], void, void> {
    const BATCH_SIZE = Math.min(
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '100', 10) || 100, 1),
      1000
    );
    let batch: PricingInput[] = [];
@@ -960,7 +967,7 @@ export class SyncOrchestrator {
    env?: Env
  ): Generator<PricingInput[], void, void> {
    const BATCH_SIZE = Math.min(
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '100', 10) || 100, 1),
      1000
    );
    let batch: PricingInput[] = [];
@@ -1029,7 +1036,7 @@ export class SyncOrchestrator {
    env?: Env
  ): Generator<GpuPricingInput[], void, void> {
    const BATCH_SIZE = Math.min(
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '100', 10) || 100, 1),
      1000
    );
    let batch: GpuPricingInput[] = [];
@@ -1095,7 +1102,7 @@ export class SyncOrchestrator {
    env?: Env
  ): Generator<GpuPricingInput[], void, void> {
    const BATCH_SIZE = Math.min(
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '100', 10) || 100, 1),
      1000
    );
    let batch: GpuPricingInput[] = [];
@@ -1151,7 +1158,7 @@ export class SyncOrchestrator {
    env?: Env
  ): Generator<G8PricingInput[], void, void> {
    const BATCH_SIZE = Math.min(
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '100', 10) || 100, 1),
      1000
    );
    let batch: G8PricingInput[] = [];
@@ -1204,7 +1211,7 @@ export class SyncOrchestrator {
    env?: Env
  ): Generator<VpuPricingInput[], void, void> {
    const BATCH_SIZE = Math.min(
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
      Math.max(parseInt(env?.SYNC_BATCH_SIZE || '100', 10) || 100, 1),
      1000
    );
    let batch: VpuPricingInput[] = [];
@@ -1903,7 +1910,8 @@ export class SyncOrchestrator {
      instanceTypeIds,
      regionIds,
      dbInstanceMap,
      rawInstanceMap
      rawInstanceMap,
      this.env
    );

    // Process batches incrementally