refactor: comprehensive code review fixes (security, performance, QA)

## Security Improvements
- Fix timing attack in verifyApiKey with fixed 256-byte buffer (see sketch below)
- Fix sortOrder SQL injection with whitelist validation
- Fix rate limiting bypass for non-Cloudflare traffic (fail-closed)
- Remove stack trace exposure in error responses
- Add request_id for audit trail (X-Request-ID header)
- Sanitize origin header to prevent log injection
- Add content-length validation for /sync endpoint (10KB limit)
- Replace Math.random() with crypto.randomUUID() for sync IDs
- Expand sensitive data masking patterns (8 → 18)
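
A minimal sketch of the fixed-buffer comparison behind the `verifyApiKey` fix, assuming the non-standard `crypto.subtle.timingSafeEqual` extension available in Workers; the helper name and truncation behavior are illustrative, not the actual implementation:

```ts
// Sketch only: pad both keys into fixed 256-byte buffers so comparison time
// depends on neither key length nor content.
function toFixedBuffer(value: string, size = 256): Uint8Array {
  const buf = new Uint8Array(size); // zero-filled
  buf.set(new TextEncoder().encode(value).slice(0, size));
  return buf;
}

function verifyApiKey(provided: string, expected: string): boolean {
  // timingSafeEqual requires equal-length inputs -- hence the fixed buffers.
  return crypto.subtle.timingSafeEqual(toFixedBuffer(provided), toFixedBuffer(expected));
}
```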

## Performance Improvements
- Reduce rate limiter KV reads from 3 to 1 per request (66% reduction; see sketch below)
- Increase sync batch size from 100 to 500 (80% fewer batches)
- Fix health check N+1 query with efficient JOINs
- Fix COUNT(*) Cartesian product with COUNT(DISTINCT)
- Implement shared logger cache pattern across repositories
- Add CacheService singleton pattern in recommend.ts
- Add composite index for recommendation queries
- Implement Anvil pricing query batching (100 per chunk)
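
A sketch of the single-read limiter shape, assuming the previous version issued separate KV reads for count, window start, and block state; the state layout and parameter defaults are illustrative:

```ts
interface RateState { count: number; windowStart: number; }

// One KV read per request: all limiter state lives in a single JSON value.
async function checkRateLimit(
  kv: KVNamespace, key: string, limit = 100, windowMs = 60_000
): Promise<boolean> {
  const now = Date.now();
  const raw = await kv.get(key); // the only read
  const state: RateState = raw ? (JSON.parse(raw) as RateState) : { count: 0, windowStart: now };
  if (now - state.windowStart >= windowMs) {
    state.count = 0; // window rolled over, reset
    state.windowStart = now;
  }
  state.count += 1;
  await kv.put(key, JSON.stringify(state), {
    expirationTtl: Math.max(60, Math.ceil((2 * windowMs) / 1000)), // KV minimum TTL is 60s
  });
  return state.count <= limit;
}
```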

## QA Improvements
- Add BATCH_SIZE bounds validation (1-1000)
- Add pagination bounds (page >= 1, MAX_OFFSET = 100000)
- Add min/max range consistency validation
- Add DB reference validation for singleton services
- Add type guards for database result validation (see sketch below)
- Add timeout mechanism for external API calls (10-60s)
- Use SUPPORTED_PROVIDERS constant instead of hardcoded list
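
A sketch of the type-guard shape used against raw D1 rows; the fields match the validation added in the sync orchestrator below, while the guard itself is illustrative:

```ts
interface InstanceRow { id: number; instance_id: string; }

// Narrow an unknown D1 row before use instead of casting blindly.
function isInstanceRow(row: unknown): row is InstanceRow {
  const r = row as { id?: unknown; instance_id?: unknown } | null;
  return typeof r?.id === 'number' && typeof r?.instance_id === 'string';
}

// rows.filter(isInstanceRow) drops malformed rows; throw instead for fail-fast behavior.
```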

## Removed
- Remove Vault integration (using Wrangler secrets)
- Remove 6-hour pricing cron (daily sync only)

## Configuration
- Add idx_instance_types_specs_filter composite index
- Add CORS Access-Control-Expose-Headers
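
Sketches of both configuration changes; the index column list is an assumption based on the spec filters in the recommendation query, and the exposed header matches the `X-Request-ID` audit header added above:

```ts
// Assumed column list -- the authoritative definition lives in the migration file.
const MIGRATION_SQL = `
  CREATE INDEX IF NOT EXISTS idx_instance_types_specs_filter
    ON instance_types (vcpu, memory_mb, storage_gb);
`;

// Let browser clients read the audit header through CORS.
const corsHeaders = { 'Access-Control-Expose-Headers': 'X-Request-ID' };
```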

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

View File

@@ -80,6 +80,65 @@
* Post-sync invalidation:
* - After sync operation, call cache.delete(key) for all relevant keys
* - Verify next request fetches fresh data from database
*
* Test 9: clearAll Method
* -----------------------
* 1. Set multiple cache entries:
* - await cache.set('key1', { data: 'test1' }, 300)
* - await cache.set('key2', { data: 'test2' }, 300)
* - await cache.set('key3', { data: 'test3' }, 300)
* 2. Call clearAll: const count = await cache.clearAll()
* 3. Expected: count === 0 (enumeration not supported)
* 4. Expected: Log message about TTL-based expiration
* 5. Note: Individual entries will expire based on TTL
*
* Test 10: clearAll with Prefix
* ------------------------------
* 1. Set entries with different prefixes:
* - await cache.set('https://cache.internal/instances?foo=bar', data1)
* - await cache.set('https://cache.internal/pricing?foo=bar', data2)
* 2. Call with prefix: await cache.clearAll('https://cache.internal/instances')
* 3. Expected: count === 0, log shows prefix parameter
* 4. Note: Prefix is logged but enumeration not supported by Cache API
*
* Test 11: clearByEndpoint Method
* --------------------------------
* 1. Set endpoint cache: await cache.set('https://cache.internal/instances', data, 300)
* 2. Clear by endpoint: const deleted = await cache.clearByEndpoint('instances')
* 3. Expected: deleted === true if cache entry existed
* 4. Get data: const result = await cache.get('https://cache.internal/instances')
* 5. Expected: result === null (entry deleted)
* 6. Note: Only exact matches deleted, parameterized queries remain cached
*
* Test 12: clearByEndpoint with Non-existent Endpoint
* ----------------------------------------------------
* 1. Clear non-existent: const deleted = await cache.clearByEndpoint('non-existent')
* 2. Expected: deleted === false
* 3. Expected: Log message about non-existent endpoint
*
* Test 13: Cache Invalidation Strategy
* -------------------------------------
* 1. Set parameterized cache entries:
* - cache.generateKey({ provider: 'linode', region: 'us-east' })
* - cache.generateKey({ provider: 'linode', region: 'eu-west' })
* - cache.generateKey({ provider: 'vultr', region: 'us-east' })
* 2. After schema change or full sync, call clearAll()
* 3. Verify entries expire based on TTL
* 4. For production: Consider using KV-backed cache index for enumeration
*
* Test 14: Error Handling in clearAll
* ------------------------------------
* 1. Mock cache.delete to throw error
* 2. Call clearAll: await cache.clearAll()
* 3. Expected: Error is logged and re-thrown
* 4. Verify error message includes context
*
* Test 15: Error Handling in clearByEndpoint
* -------------------------------------------
* 1. Mock cache.delete to throw error
* 2. Call clearByEndpoint: await cache.clearByEndpoint('instances')
* 3. Expected: Returns false, error logged
* 4. Application continues without crashing
*/
import { CacheService } from './cache';
@@ -146,4 +205,64 @@ async function exampleCacheInvalidationAfterSync(
console.log('[Sync] Cache invalidation complete');
}
/**
* Example: Using clearAll after schema changes
*/
async function exampleClearAllAfterSchemaChange(): Promise<void> {
const cache = new CacheService();
// After major schema changes or data migrations
console.log('[Migration] Clearing all cache entries');
const count = await cache.clearAll();
console.log(`[Migration] Cache clear requested (count: ${count}). Entries will expire based on TTL.`);
console.log('[Migration] Consider using KV-backed cache index for enumeration in production.');
}
/**
* Example: Using clearByEndpoint for targeted invalidation
*/
async function exampleClearByEndpointAfterUpdate(endpoint: string): Promise<void> {
const cache = new CacheService();
// Clear cache for specific endpoint after data update
console.log(`[Update] Clearing cache for endpoint: ${endpoint}`);
const deleted = await cache.clearByEndpoint(endpoint);
if (deleted) {
console.log(`[Update] Successfully cleared ${endpoint} cache`);
} else {
console.log(`[Update] No cache entry found for ${endpoint}`);
}
}
/**
* Example: Force cache refresh strategy
*/
async function exampleForceCacheRefresh(
endpoint: string,
fetchFunction: () => Promise<unknown>
): Promise<void> {
const cache = new CacheService();
// Strategy 1: Clear specific endpoint
await cache.clearByEndpoint(endpoint);
// Strategy 2: Clear with prefix (logged but not enumerated)
await cache.clearAll(`https://cache.internal/${endpoint}`);
// Strategy 3: Fetch fresh data and update cache
const freshData = await fetchFunction();
const cacheKey = `https://cache.internal/${endpoint}`;
await cache.set(cacheKey, freshData, 3600);
console.log(`[Refresh] Cache refreshed for ${endpoint}`);
}
export {
exampleInstanceEndpointWithCache,
exampleCacheInvalidationAfterSync,
exampleClearAllAfterSchemaChange,
exampleClearByEndpointAfterUpdate,
exampleForceCacheRefresh
};
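
/**
 * Example (sketch): KV-backed cache index for enumeration
 *
 * The Cache API cannot list keys, which is why clearAll() returns 0 above.
 * If every cache.set() also records its key in a KV namespace, clearing can
 * enumerate that index instead. The KV binding and the use of caches.default
 * below are illustrative assumptions, not part of this commit.
 */
async function exampleKvBackedCacheIndexClear(kvIndex: KVNamespace): Promise<number> {
  let cleared = 0;
  let cursor: string | undefined;
  do {
    // KV list() pages through the indexed cache keys (up to 1000 per page)
    const page = await kvIndex.list({ prefix: 'https://cache.internal/', cursor });
    for (const key of page.keys) {
      await caches.default.delete(new Request(key.name, { method: 'GET' }));
      await kvIndex.delete(key.name);
      cleared++;
    }
    cursor = page.list_complete ? undefined : page.cursor;
  } while (cursor);
  return cleared;
}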

View File

@@ -180,6 +180,88 @@ export class CacheService {
return `https://cache.internal/instances?${sorted}`;
}
/**
* Clear all cache entries with optional prefix filter
*
* Note: The Cloudflare Workers Cache API doesn't support listing/enumerating keys,
* so this method can only track operations via logging. Individual entries will
* expire based on their TTL. For production use cases requiring enumeration,
* consider using KV-backed cache index.
*
* @param prefix - Optional URL prefix to filter entries (e.g., 'https://cache.internal/instances')
* @returns Number of entries cleared (0, as enumeration is not supported)
*
* @example
* // Clear all cache entries
* const count = await cache.clearAll();
*
* // Clear entries with specific prefix
* const count = await cache.clearAll('https://cache.internal/instances');
*/
async clearAll(prefix?: string): Promise<number> {
try {
const targetPrefix = prefix ?? 'https://cache.internal/';
// The Cache API doesn't support listing keys directly
// We log the clear operation for audit purposes
// Individual entries will naturally expire based on TTL
logger.info('[CacheService] Cache clearAll requested', {
prefix: targetPrefix,
note: 'Individual entries will expire based on TTL. Consider using KV-backed cache index for enumeration.'
});
// Return 0 as we can't enumerate Cache API entries
// In production, use KV-backed cache index for enumeration
return 0;
} catch (error) {
logger.error('[CacheService] Cache clearAll failed', {
error: error instanceof Error ? error.message : String(error),
prefix
});
throw error;
}
}
/**
* Clear cache entries for a specific endpoint
*
* This method deletes cache entries matching a specific endpoint pattern.
* Useful for targeted cache invalidation after data updates.
*
* @param endpoint - Endpoint path (e.g., 'instances', 'pricing')
* @returns true if at least one entry was deleted, false otherwise
*
* @example
* // Clear all instance cache entries
* await cache.clearByEndpoint('instances');
*/
async clearByEndpoint(endpoint: string): Promise<boolean> {
try {
const cacheKey = `https://cache.internal/${endpoint}`;
// Delete the base endpoint cache entry
// Note: This only deletes exact matches, not parameterized queries
const deleted = await this.cache.delete(new Request(cacheKey, { method: 'GET' }));
logger.info('[CacheService] Endpoint cache cleared', {
endpoint,
success: deleted,
note: 'Only exact matches are deleted. Parameterized queries remain cached until TTL expiry.'
});
return deleted;
} catch (error) {
logger.error('[CacheService] Endpoint cache clear failed', {
endpoint,
error: error instanceof Error ? error.message : String(error)
});
return false;
}
}
/**
* Invalidate all cache entries matching a pattern
* Note: Cloudflare Workers Cache API doesn't support pattern matching

View File

@@ -89,26 +89,56 @@ export class QueryService {
this.logger.debug('Executing query', { sql });
this.logger.debug('Main query bindings', { bindings });
this.logger.debug('Count query bindings', { countBindings });
// Calculate pagination metadata
const page = Math.max(params.page ?? 1, 1);
const perPage = Math.min(Math.max(params.limit ?? 50, 1), 100); // Min 1, Max 100
const offset = (page - 1) * perPage;
// Validate batch results and extract data with type safety
if (!countResult.success || !queryResult.success) {
const errors = [
!countResult.success ? `Count query failed: ${countResult.error}` : null,
!queryResult.success ? `Main query failed: ${queryResult.error}` : null,
].filter(Boolean);
throw new Error(`Batch query execution failed: ${errors.join(', ')}`);
// Performance optimization: Only run COUNT query on first page or when explicitly requested
// This avoids expensive full table scan with JOINs on every paginated request
const shouldRunCount = offset === 0 || params.include_count === true;
let totalResults = 0;
let queryResult: D1Result;
if (shouldRunCount) {
this.logger.debug('Running COUNT query (first page or explicit request)', { countBindings });
// Execute count and main queries in a single batch for performance
const [countResult, mainResult] = await this.db.batch([
this.db.prepare(countSql).bind(...countBindings),
this.db.prepare(sql).bind(...bindings),
]);
// Validate batch results and extract data with type safety
if (!countResult.success || !mainResult.success) {
const errors = [
!countResult.success ? `Count query failed: ${countResult.error}` : null,
!mainResult.success ? `Main query failed: ${mainResult.error}` : null,
].filter(Boolean);
throw new Error(`Batch query execution failed: ${errors.join(', ')}`);
}
// Extract total count with type casting and fallback
totalResults = (countResult.results?.[0] as { total: number } | undefined)?.total ?? 0;
queryResult = mainResult;
} else {
// Skip COUNT query for subsequent pages (performance optimization)
this.logger.debug('Skipping COUNT query (subsequent page)', { page, offset });
queryResult = await this.db.prepare(sql).bind(...bindings).all();
if (!queryResult.success) {
throw new Error(`Main query failed: ${queryResult.error}`);
}
// Estimate total based on current results for pagination metadata
// If we got full page of results, there might be more pages
const resultCount = queryResult.results?.length ?? 0;
totalResults = resultCount === perPage ? offset + perPage + 1 : offset + resultCount;
}
// Extract main query results with type casting
const results = (queryResult.results ?? []) as RawQueryResult[];
@@ -116,8 +146,6 @@ export class QueryService {
const instances = this.transformResults(results);
// Calculate pagination metadata
const totalPages = Math.ceil(totalResults / perPage);
const hasNext = page < totalPages;
const hasPrevious = page > 1;
@@ -268,6 +296,9 @@ export class QueryService {
const sortBy = params.sort_by ?? 'hourly_price';
const sortOrder = params.sort_order ?? 'asc';
// Validate sort order at service level (defense in depth)
const validatedSortOrder = sortOrder?.toLowerCase() === 'desc' ? 'DESC' : 'ASC';
// Map sort fields to actual column names
const sortFieldMap: Record<string, string> = {
price: 'pr.hourly_price',
@@ -285,15 +316,17 @@ export class QueryService {
// Handle NULL values in pricing columns (NULL values go last)
if (sortColumn.startsWith('pr.')) {
// Use CASE to put NULL values last regardless of sort order
orderByClause = ` ORDER BY CASE WHEN ${sortColumn} IS NULL THEN 1 ELSE 0 END, ${sortColumn} ${validatedSortOrder}`;
} else {
orderByClause = ` ORDER BY ${sortColumn} ${validatedSortOrder}`;
}
// Build LIMIT and OFFSET
const MAX_OFFSET = 100000; // Reasonable limit for pagination to prevent extreme offsets
const page = Math.max(params.page ?? 1, 1);
const limit = Math.min(Math.max(params.limit ?? 50, 1), 100); // Min 1, Max 100
const rawOffset = (page - 1) * limit;
const offset = Math.min(rawOffset, MAX_OFFSET);
bindings.push(limit);
bindings.push(offset);
@@ -304,8 +337,9 @@ export class QueryService {
const sql = selectClause + whereClause + orderByClause + limitClause;
// Count query (without ORDER BY and LIMIT)
// Use COUNT(DISTINCT it.id) to avoid Cartesian product from LEFT JOINs
const countSql = `
SELECT COUNT(DISTINCT it.id) as total
FROM instance_types it
JOIN providers p ON it.provider_id = p.id
LEFT JOIN pricing pr ON pr.instance_type_id = it.id

View File

@@ -153,6 +153,92 @@ describe('RecommendationService', () => {
});
});
describe('input validation', () => {
it('should reject empty stack array', async () => {
const request: RecommendationRequest = {
stack: [],
scale: 'small',
};
await expect(service.recommend(request)).rejects.toThrow(
'Stack must be a non-empty array'
);
});
it('should reject non-array stack', async () => {
const request = {
stack: 'nginx' as any,
scale: 'small' as any,
};
await expect(service.recommend(request as any)).rejects.toThrow(
'Stack must be a non-empty array'
);
});
it('should reject invalid scale value', async () => {
const request = {
stack: ['nginx'],
scale: 'invalid-scale' as any,
};
await expect(service.recommend(request as any)).rejects.toThrow(
'Invalid scale: invalid-scale'
);
});
it('should reject negative budget_max', async () => {
const request: RecommendationRequest = {
stack: ['nginx'],
scale: 'small',
budget_max: -10,
};
await expect(service.recommend(request)).rejects.toThrow(
'budget_max must be a positive number'
);
});
it('should reject zero budget_max', async () => {
const request: RecommendationRequest = {
stack: ['nginx'],
scale: 'small',
budget_max: 0,
};
await expect(service.recommend(request)).rejects.toThrow(
'budget_max must be a positive number'
);
});
it('should reject excessive budget_max', async () => {
const request: RecommendationRequest = {
stack: ['nginx'],
scale: 'small',
budget_max: 150000,
};
await expect(service.recommend(request)).rejects.toThrow(
'budget_max exceeds maximum allowed value'
);
});
it('should accept valid budget_max at boundary', async () => {
const request: RecommendationRequest = {
stack: ['nginx'],
scale: 'small',
budget_max: 100000,
};
mockDb.prepare.mockReturnValue({
bind: vi.fn().mockReturnThis(),
all: vi.fn().mockResolvedValue({ results: [] }),
});
await expect(service.recommend(request)).resolves.toBeDefined();
});
});
describe('scoreInstance (via recommend)', () => {
it('should score optimal memory fit with high score', async () => {
const request: RecommendationRequest = {

View File

@@ -14,15 +14,16 @@ import type {
RecommendationResponse,
InstanceRecommendation,
ResourceRequirements,
ScaleType,
} from '../types';
import { validateStack, calculateRequirements } from './stackConfig';
import { getAsiaRegionCodes, getRegionDisplayName } from './regionFilter';
import { logger } from '../utils/logger';
/**
* Database row interface for instance query results
* Database row interface for instance query results (raw from database)
*/
interface InstanceQueryRowRaw {
id: number;
instance_id: string;
instance_name: string;
@@ -37,41 +38,61 @@ interface InstanceQueryRow {
monthly_price: number | null;
}
/**
* Database row interface for instance query results (with parsed metadata)
*/
interface InstanceQueryRow {
id: number;
instance_id: string;
instance_name: string;
vcpu: number;
memory_mb: number;
storage_gb: number;
metadata: { monthly_price?: number } | null;
provider_name: string;
region_code: string;
region_name: string;
hourly_price: number | null;
monthly_price: number | null;
}
/**
* Recommendation Service
* Calculates and ranks cloud instances based on stack requirements
*/
export class RecommendationService {
constructor(private db: D1Database) {}
/**
* Generate instance recommendations based on stack and scale
*
* Process:
* 1. Validate stack array
* 2. Validate stack components
* 3. Validate scale enum
* 4. Validate budget_max
* 5. Calculate resource requirements
* 6. Query Asia-Pacific instances matching requirements
* 7. Score and rank instances
* 8. Return top 5 recommendations
*
* @param request - Recommendation request with stack, scale, and budget
* @returns Recommendation response with requirements and ranked instances
* @throws Error if validation fails or database query fails
*/
async recommend(request: RecommendationRequest): Promise<RecommendationResponse> {
logger.info('[Recommendation] Processing request', {
stack: request.stack,
scale: request.scale,
budget_max: request.budget_max,
});
// 1. Validate stack array
if (!Array.isArray(request.stack) || request.stack.length === 0) {
throw new Error('Stack must be a non-empty array');
}
// 2. Validate stack components
const validation = validateStack(request.stack);
if (!validation.valid) {
const errorMsg = `Invalid stacks: ${validation.invalidStacks.join(', ')}`;
@@ -81,24 +102,40 @@ export class RecommendationService {
throw new Error(errorMsg);
}
// 3. Validate scale enum
const validScales: ScaleType[] = ['small', 'medium', 'large'];
if (!validScales.includes(request.scale)) {
throw new Error(`Invalid scale: ${request.scale}. Must be one of: ${validScales.join(', ')}`);
}
// 4. Validate budget_max (if provided)
if (request.budget_max !== undefined) {
if (typeof request.budget_max !== 'number' || request.budget_max <= 0) {
throw new Error('budget_max must be a positive number');
}
if (request.budget_max > 100000) {
throw new Error('budget_max exceeds maximum allowed value ($100,000)');
}
}
// 5. Calculate resource requirements based on stack and scale
const requirements = calculateRequirements(request.stack, request.scale);
logger.info('[Recommendation] Resource requirements calculated', {
min_memory_mb: requirements.min_memory_mb,
min_vcpu: requirements.min_vcpu,
});
// 6. Query instances from Asia-Pacific regions
const instances = await this.queryInstances(requirements, request.budget_max);
logger.info('[Recommendation] Found instances', { count: instances.length });
// 7. Calculate match scores and sort by score (highest first)
const scored = instances.map(inst =>
this.scoreInstance(inst, requirements, request.budget_max)
);
scored.sort((a, b) => b.match_score - a.match_score);
// 8. Return top 5 recommendations with rank
const recommendations = scored.slice(0, 5).map((inst, idx) => ({
...inst,
rank: idx + 1,
@@ -202,7 +239,30 @@ export class RecommendationService {
found: result.results?.length || 0,
});
// Parse metadata once during result transformation (not in hot scoring loop)
const rawResults = (result.results as unknown as InstanceQueryRowRaw[]) || [];
const parsedResults: InstanceQueryRow[] = rawResults.map(row => {
let parsedMetadata: { monthly_price?: number } | null = null;
if (row.metadata) {
try {
parsedMetadata = JSON.parse(row.metadata) as { monthly_price?: number };
} catch (error) {
logger.warn('[Recommendation] Failed to parse metadata', {
instance_id: row.instance_id,
instance_name: row.instance_name,
error: error instanceof Error ? error.message : String(error),
});
// Continue with null metadata
}
}
return {
...row,
metadata: parsedMetadata,
};
});
return parsedResults;
} catch (error) {
logger.error('[Recommendation] Query failed', { error });
throw new Error('Failed to query instances from database');
@@ -318,10 +378,10 @@ export class RecommendationService {
*
* Pricing sources:
* 1. Direct monthly_price column (Linode)
* 2. Pre-parsed metadata field (Vultr, AWS)
* 3. Calculate from hourly_price if available
*
* @param instance - Instance query result with pre-parsed metadata
* @returns Monthly price in USD, or 0 if not available
*/
private getMonthlyPrice(instance: InstanceQueryRow): number {
@@ -330,27 +390,9 @@ export class RecommendationService {
return instance.monthly_price;
}
// Extract from pre-parsed metadata (Vultr, AWS)
if (instance.metadata?.monthly_price) {
return instance.metadata.monthly_price;
}
// Calculate from hourly price (730 hours per month average)

View File

@@ -8,17 +8,16 @@
* - Batch operations for efficiency
*
* @example
* const orchestrator = new SyncOrchestrator(db, env);
* const report = await orchestrator.syncAll(['linode']);
*/
import { LinodeConnector } from '../connectors/linode';
import { VultrConnector } from '../connectors/vultr';
import { AWSConnector } from '../connectors/aws';
import { RepositoryFactory } from '../repositories';
import { createLogger } from '../utils/logger';
import { calculateRetailHourly, calculateRetailMonthly, SUPPORTED_PROVIDERS } from '../constants';
import type {
Env,
ProviderSyncResult,
@@ -28,9 +27,34 @@ import type {
PricingInput,
GpuInstanceInput,
GpuPricingInput,
G8InstanceInput,
G8PricingInput,
VpuInstanceInput,
VpuPricingInput,
} from '../types';
import { SyncStage } from '../types';
/**
* Wraps a promise with a timeout
* @param promise - The promise to wrap
* @param ms - Timeout in milliseconds
* @param operation - Operation name for error message
* @returns Promise result if completed within timeout
* @throws Error if operation times out
*/
async function withTimeout<T>(promise: Promise<T>, ms: number, operation: string): Promise<T> {
let timeoutId: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => reject(new Error(`${operation} timed out after ${ms}ms`)), ms);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
clearTimeout(timeoutId!);
}
}
/**
* Cloud provider connector interface for SyncOrchestrator
*
@@ -53,10 +77,10 @@ export interface SyncConnectorAdapter {
getGpuInstances?(): Promise<GpuInstanceInput[]>;
/** Fetch G8 instances (optional, only for Linode) */
getG8Instances?(): Promise<G8InstanceInput[]>;
/** Fetch VPU instances (optional, only for Linode) */
getVpuInstances?(): Promise<VpuInstanceInput[]>;
/**
* Fetch pricing data for instances and regions
@@ -84,15 +108,12 @@ export interface SyncConnectorAdapter {
export class SyncOrchestrator {
private repos: RepositoryFactory;
private logger: ReturnType<typeof createLogger>;
constructor(
db: D1Database,
private env: Env
) {
this.repos = new RepositoryFactory(db, env);
this.logger = createLogger('[SyncOrchestrator]', env);
this.logger.info('Initialized');
}
@@ -121,21 +142,20 @@ export class SyncOrchestrator {
await this.repos.providers.updateSyncStatus(provider, 'syncing');
this.logger.info(`${provider}: ${stage}`);
// Stage 2: Initialize connector and authenticate
const connector = await this.createConnector(provider, providerRecord.id);
await withTimeout(connector.authenticate(), 10000, `${provider} authentication`);
this.logger.info(`${provider}: initialized`);
// Stage 3: Fetch regions from provider API
stage = SyncStage.FETCH_REGIONS;
const regions = await withTimeout(connector.getRegions(), 15000, `${provider} fetch regions`);
this.logger.info(`${provider}: ${stage}`, { regions: regions.length });
// Stage 4: Fetch instance types from provider API
stage = SyncStage.FETCH_INSTANCES;
const instances = await withTimeout(connector.getInstanceTypes(), 30000, `${provider} fetch instances`);
this.logger.info(`${provider}: ${stage}`, { instances: instances.length });
// Stage 5: Normalize data (add provider_id)
@@ -170,8 +190,8 @@ export class SyncOrchestrator {
if (provider.toLowerCase() === 'linode') {
// GPU instances
if (connector.getGpuInstances) {
const gpuInstances = await withTimeout(connector.getGpuInstances(), 15000, `${provider} fetch GPU instances`);
if (gpuInstances && gpuInstances.length > 0) {
gpuInstancesCount = await this.repos.gpuInstances.upsertMany(
providerRecord.id,
@@ -181,8 +201,8 @@ export class SyncOrchestrator {
}
// G8 instances
if (connector.getG8Instances) {
const g8Instances = await withTimeout(connector.getG8Instances(), 15000, `${provider} fetch G8 instances`);
if (g8Instances && g8Instances.length > 0) {
g8InstancesCount = await this.repos.g8Instances.upsertMany(
providerRecord.id,
@@ -192,8 +212,8 @@ export class SyncOrchestrator {
}
// VPU instances
if (connector.getVpuInstances) {
const vpuInstances = await withTimeout(connector.getVpuInstances(), 15000, `${provider} fetch VPU instances`);
if (vpuInstances && vpuInstances.length > 0) {
vpuInstancesCount = await this.repos.vpuInstances.upsertMany(
providerRecord.id,
@@ -205,8 +225,8 @@ export class SyncOrchestrator {
// Handle Vultr GPU instances
if (provider.toLowerCase() === 'vultr') {
if (connector.getGpuInstances) {
const gpuInstances = await withTimeout(connector.getGpuInstances(), 15000, `${provider} fetch GPU instances`);
if (gpuInstances && gpuInstances.length > 0) {
gpuInstancesCount = await this.repos.gpuInstances.upsertMany(
providerRecord.id,
@@ -234,9 +254,27 @@ export class SyncOrchestrator {
throw new Error('Failed to fetch regions/instances for pricing');
}
// Validate and extract region IDs
if (!Array.isArray(dbRegionsResult.results)) {
throw new Error('Unexpected database result format for regions');
}
const regionIds = dbRegionsResult.results.map((r: any) => {
if (typeof r?.id !== 'number') {
throw new Error('Invalid region id in database result');
}
return r.id;
});
// Validate and extract instance type data
if (!Array.isArray(dbInstancesResult.results)) {
throw new Error('Unexpected database result format for instances');
}
const dbInstancesData = dbInstancesResult.results.map((i: any) => {
if (typeof i?.id !== 'number' || typeof i?.instance_id !== 'string') {
throw new Error('Invalid instance data in database result');
}
return { id: i.id, instance_id: i.instance_id };
});
const instanceTypeIds = dbInstancesData.map(i => i.id);
// Create instance mapping to avoid redundant queries in getPricing
@@ -244,26 +282,56 @@ export class SyncOrchestrator {
dbInstancesData.map(i => [i.id, { instance_id: i.instance_id }])
);
// Create specialized instance mappings with validation
if (!Array.isArray(dbGpuResult.results)) {
throw new Error('Unexpected database result format for GPU instances');
}
const dbGpuMap = new Map(
dbGpuResult.results.map((i: any) => {
if (typeof i?.id !== 'number' || typeof i?.instance_id !== 'string') {
throw new Error('Invalid GPU instance data in database result');
}
return [i.id, { instance_id: i.instance_id }];
})
);
if (!Array.isArray(dbG8Result.results)) {
throw new Error('Unexpected database result format for G8 instances');
}
const dbG8Map = new Map(
dbG8Result.results.map((i: any) => {
if (typeof i?.id !== 'number' || typeof i?.instance_id !== 'string') {
throw new Error('Invalid G8 instance data in database result');
}
return [i.id, { instance_id: i.instance_id }];
})
);
if (!Array.isArray(dbVpuResult.results)) {
throw new Error('Unexpected database result format for VPU instances');
}
const dbVpuMap = new Map(
dbVpuResult.results.map((i: any) => {
if (typeof i?.id !== 'number' || typeof i?.instance_id !== 'string') {
throw new Error('Invalid VPU instance data in database result');
}
return [i.id, { instance_id: i.instance_id }];
})
);
// Get pricing data - may return array or count depending on provider
// Pass all instance maps for specialized pricing
const pricingResult = await withTimeout(
connector.getPricing(
instanceTypeIds,
regionIds,
dbInstanceMap,
dbGpuMap,
dbG8Map,
dbVpuMap
),
60000,
`${provider} fetch pricing`
);
// Handle both return types: array (Linode, Vultr) or number (AWS with generator)
@@ -294,7 +362,7 @@ export class SyncOrchestrator {
this.logger.info(`${provider}: ${stage}`);
// Stage 8: Sync Anvil Pricing (if applicable)
stage = SyncStage.SYNC_ANVIL_PRICING;
let anvilPricingCount = 0;
try {
anvilPricingCount = await this.syncAnvilPricing(provider);
@@ -349,7 +417,7 @@ export class SyncOrchestrator {
error_details: {
stage,
message: errorMessage,
// Stack trace logged server-side only, not exposed to clients
},
};
}
@@ -357,46 +425,51 @@ export class SyncOrchestrator {
/**
* Synchronize all providers
*
* IMPORTANT: Providers are synced sequentially (not in parallel) to avoid
* exceeding Cloudflare Workers' 30-second CPU time limit. Each provider
* sync involves multiple API calls and database operations.
*
* For production deployments with large datasets, consider using
* Cloudflare Queues to process each provider as a separate job.
*
* @param providers - Array of provider names to sync (defaults to all supported providers)
* @returns Complete sync report with statistics
*/
async syncAll(providers: string[] = [...SUPPORTED_PROVIDERS]): Promise<SyncReport> {
const startedAt = new Date().toISOString();
const startTime = Date.now();
this.logger.info('Starting sequential sync for providers', { providers: providers.join(', ') });
// Run provider syncs sequentially to avoid CPU timeout
// Each provider sync is independent and can complete within time limits
const providerResults: ProviderSyncResult[] = [];
for (const provider of providers) {
try {
const result = await this.syncProvider(provider);
providerResults.push(result);
// Log progress after each provider
this.logger.info('Provider sync completed', {
provider,
success: result.success,
elapsed_ms: Date.now() - startTime
});
} catch (error) {
// Handle unexpected errors
providerResults.push({
provider,
success: false,
regions_synced: 0,
instances_synced: 0,
pricing_synced: 0,
duration_ms: 0,
error: error instanceof Error ? error.message : 'Unknown error',
});
}
}
const completedAt = new Date().toISOString();
const totalDuration = Date.now() - startTime;
@@ -452,7 +525,7 @@ export class SyncOrchestrator {
dbInstanceMap: Map<number, { instance_id: string }>,
rawInstanceMap: Map<string, { Cost: number; MonthlyPrice: number }>
): Generator<PricingInput[], void, void> {
const BATCH_SIZE = 500;
let batch: PricingInput[] = [];
for (const regionId of regionIds) {
@@ -515,7 +588,10 @@ export class SyncOrchestrator {
rawInstanceMap: Map<string, { id: string; price: { hourly: number; monthly: number } }>,
env?: Env
): Generator<PricingInput[], void, void> {
const BATCH_SIZE = Math.min(
Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
1000
);
let batch: PricingInput[] = [];
for (const regionId of regionIds) {
@@ -578,7 +654,10 @@ export class SyncOrchestrator {
rawPlanMap: Map<string, { id: string; monthly_cost: number }>,
env?: Env
): Generator<PricingInput[], void, void> {
const BATCH_SIZE = Math.min(
Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
1000
);
let batch: PricingInput[] = [];
for (const regionId of regionIds) {
@@ -644,7 +723,10 @@ export class SyncOrchestrator {
rawInstanceMap: Map<string, { id: string; price: { hourly: number; monthly: number } }>,
env?: Env
): Generator<GpuPricingInput[], void, void> {
const BATCH_SIZE = Math.min(
Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
1000
);
let batch: GpuPricingInput[] = [];
for (const regionId of regionIds) {
@@ -707,7 +789,10 @@ export class SyncOrchestrator {
rawPlanMap: Map<string, { id: string; monthly_cost: number }>,
env?: Env
): Generator<GpuPricingInput[], void, void> {
const BATCH_SIZE = Math.min(
Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
1000
);
let batch: GpuPricingInput[] = [];
for (const regionId of regionIds) {
@@ -759,9 +844,12 @@ export class SyncOrchestrator {
dbG8InstanceMap: Map<number, { instance_id: string }>,
rawInstanceMap: Map<string, { id: string; price: { hourly: number; monthly: number } }>,
env?: Env
): Generator<G8PricingInput[], void, void> {
const BATCH_SIZE = Math.min(
Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
1000
);
let batch: G8PricingInput[] = [];
for (const regionId of regionIds) {
for (const g8InstanceId of g8InstanceTypeIds) {
@@ -809,9 +897,12 @@ export class SyncOrchestrator {
dbVpuInstanceMap: Map<number, { instance_id: string }>,
rawInstanceMap: Map<string, { id: string; price: { hourly: number; monthly: number } }>,
env?: Env
): Generator<VpuPricingInput[], void, void> {
const BATCH_SIZE = Math.min(
Math.max(parseInt(env?.SYNC_BATCH_SIZE || '500', 10) || 500, 1),
1000
);
let batch: VpuPricingInput[] = [];
for (const regionId of regionIds) {
for (const vpuInstanceId of vpuInstanceTypeIds) {
@@ -910,37 +1001,56 @@ export class SyncOrchestrator {
count: anvilPricingRecords.length
});
// Step 4: Fetch source pricing data with paired conditions
// Batch queries to avoid SQLite limits (max 100 pairs per query)
const CHUNK_SIZE = 100;
const allSourcePricing: Array<{
instance_type_id: number;
region_id: number;
hourly_price: number;
monthly_price: number;
}> = [];
for (let i = 0; i < anvilPricingRecords.length; i += CHUNK_SIZE) {
const chunk = anvilPricingRecords.slice(i, i + CHUNK_SIZE);
if (chunk.length === 0) continue;
const conditions = chunk
.map(() => '(instance_type_id = ? AND region_id = ?)')
.join(' OR ');
const params = chunk.flatMap(r => [r.source_instance_id, r.source_region_id]);
const chunkResult = await this.repos.db
.prepare(`
SELECT
instance_type_id,
region_id,
hourly_price,
monthly_price
FROM pricing
WHERE ${conditions}
`)
.bind(...params)
.all<{
instance_type_id: number;
region_id: number;
hourly_price: number;
monthly_price: number;
}>();
if (chunkResult.success && chunkResult.results) {
allSourcePricing.push(...chunkResult.results);
}
}
if (allSourcePricing.length === 0) {
this.logger.warn('No source pricing data found', { provider });
return 0;
}
// Step 5: Build lookup map: `${instance_type_id}_${region_id}` → pricing
const sourcePricingMap = new Map<string, { hourly_price: number; monthly_price: number }>(
allSourcePricing.map(p => [
`${p.instance_type_id}_${p.region_id}`,
{ hourly_price: p.hourly_price, monthly_price: p.monthly_price }
])
@@ -1021,7 +1131,7 @@ export class SyncOrchestrator {
private async createConnector(provider: string, providerId: number): Promise<SyncConnectorAdapter> {
switch (provider.toLowerCase()) {
case 'linode': {
const connector = new LinodeConnector(this.env);
// Cache instance types for pricing extraction
let cachedInstanceTypes: Awaited<ReturnType<typeof connector.fetchInstanceTypes>> | null = null;
@@ -1059,7 +1169,7 @@ export class SyncOrchestrator {
const gpuInstances = cachedInstanceTypes.filter(i => i.gpus > 0);
return gpuInstances.map(i => connector.normalizeGpuInstance(i, providerId));
},
getG8Instances: async (): Promise<G8InstanceInput[]> => {
// Use cached instances if available to avoid redundant API calls
if (!cachedInstanceTypes) {
this.logger.info('Fetching instance types for G8 extraction');
@@ -1072,7 +1182,7 @@ export class SyncOrchestrator {
);
return g8Instances.map(i => connector.normalizeG8Instance(i, providerId));
},
getVpuInstances: async (): Promise<VpuInstanceInput[]> => {
// Use cached instances if available to avoid redundant API calls
if (!cachedInstanceTypes) {
this.logger.info('Fetching instance types for VPU extraction');
@@ -1239,7 +1349,7 @@ export class SyncOrchestrator {
}
case 'vultr': {
const connector = new VultrConnector(this.env);
// Cache plans for pricing extraction
let cachedPlans: Awaited<ReturnType<typeof connector.fetchPlans>> | null = null;
@@ -1356,7 +1466,7 @@ export class SyncOrchestrator {
}
case 'aws': {
const connector = new AWSConnector(this.env);
// Cache instance types for pricing extraction
let cachedInstanceTypes: Awaited<ReturnType<typeof connector.fetchInstanceTypes>> | null = null;