Initial commit: Cloud Instances API
Multi-cloud VM instance database with Cloudflare Workers - Linode, Vultr, AWS connector integration - D1 database with regions, instances, pricing - Query API with filtering, caching, pagination - Cron-based auto-sync (daily + 6-hourly) - Health monitoring endpoint Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
362
src/services/sync.ts
Normal file
362
src/services/sync.ts
Normal file
@@ -0,0 +1,362 @@
|
||||
/**
|
||||
* Sync Service - Orchestrates synchronization of cloud provider data
|
||||
*
|
||||
* Features:
|
||||
* - Multi-provider synchronization (Linode, Vultr, AWS)
|
||||
* - Stage-based sync process with error recovery
|
||||
* - Provider status tracking and reporting
|
||||
* - Batch operations for efficiency
|
||||
*
|
||||
* @example
|
||||
* const orchestrator = new SyncOrchestrator(db, vault);
|
||||
* const report = await orchestrator.syncAll(['linode']);
|
||||
*/
|
||||
|
||||
import { VaultClient } from '../connectors/vault';
|
||||
import { LinodeConnector } from '../connectors/linode';
|
||||
import { VultrConnector } from '../connectors/vultr';
|
||||
import { AWSConnector } from '../connectors/aws';
|
||||
import { RepositoryFactory } from '../repositories';
|
||||
import type {
|
||||
ProviderSyncResult,
|
||||
SyncReport,
|
||||
RegionInput,
|
||||
InstanceTypeInput,
|
||||
PricingInput,
|
||||
} from '../types';
|
||||
|
||||
/**
|
||||
* Synchronization stages
|
||||
*/
|
||||
export enum SyncStage {
|
||||
INIT = 'init',
|
||||
FETCH_CREDENTIALS = 'fetch_credentials',
|
||||
FETCH_REGIONS = 'fetch_regions',
|
||||
FETCH_INSTANCES = 'fetch_instances',
|
||||
NORMALIZE = 'normalize',
|
||||
PERSIST = 'persist',
|
||||
VALIDATE = 'validate',
|
||||
COMPLETE = 'complete',
|
||||
}
|
||||
|
||||
/**
|
||||
* Cloud provider connector interface
|
||||
* All provider connectors must implement this interface
|
||||
*/
|
||||
export interface CloudConnector {
|
||||
/** Authenticate and validate credentials */
|
||||
authenticate(): Promise<void>;
|
||||
|
||||
/** Fetch all available regions */
|
||||
getRegions(): Promise<RegionInput[]>;
|
||||
|
||||
/** Fetch all instance types */
|
||||
getInstanceTypes(): Promise<InstanceTypeInput[]>;
|
||||
|
||||
/** Fetch pricing data for instances and regions */
|
||||
getPricing(instanceTypeIds: number[], regionIds: number[]): Promise<PricingInput[]>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync orchestrator for managing provider synchronization
|
||||
*/
|
||||
export class SyncOrchestrator {
|
||||
private repos: RepositoryFactory;
|
||||
|
||||
constructor(
|
||||
db: D1Database,
|
||||
private vault: VaultClient
|
||||
) {
|
||||
this.repos = new RepositoryFactory(db);
|
||||
console.log('[SyncOrchestrator] Initialized');
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronize a single provider
|
||||
*
|
||||
* @param provider - Provider name (linode, vultr, aws)
|
||||
* @returns Sync result with statistics and error information
|
||||
*/
|
||||
async syncProvider(provider: string): Promise<ProviderSyncResult> {
|
||||
const startTime = Date.now();
|
||||
let stage = SyncStage.INIT;
|
||||
|
||||
console.log(`[SyncOrchestrator] Starting sync for provider: ${provider}`);
|
||||
|
||||
try {
|
||||
// Stage 1: Initialize - Update provider status to syncing
|
||||
stage = SyncStage.INIT;
|
||||
await this.repos.providers.updateSyncStatus(provider, 'syncing');
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage}`);
|
||||
|
||||
// Stage 2: Fetch credentials from Vault
|
||||
stage = SyncStage.FETCH_CREDENTIALS;
|
||||
const connector = await this.createConnector(provider);
|
||||
await connector.authenticate();
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage}`);
|
||||
|
||||
// Get provider record
|
||||
const providerRecord = await this.repos.providers.findByName(provider);
|
||||
if (!providerRecord) {
|
||||
throw new Error(`Provider not found in database: ${provider}`);
|
||||
}
|
||||
|
||||
// Stage 3: Fetch regions from provider API
|
||||
stage = SyncStage.FETCH_REGIONS;
|
||||
const regions = await connector.getRegions();
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage} (${regions.length} regions)`);
|
||||
|
||||
// Stage 4: Fetch instance types from provider API
|
||||
stage = SyncStage.FETCH_INSTANCES;
|
||||
const instances = await connector.getInstanceTypes();
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage} (${instances.length} instances)`);
|
||||
|
||||
// Stage 5: Normalize data (add provider_id)
|
||||
stage = SyncStage.NORMALIZE;
|
||||
const normalizedRegions = regions.map(r => ({
|
||||
...r,
|
||||
provider_id: providerRecord.id,
|
||||
}));
|
||||
const normalizedInstances = instances.map(i => ({
|
||||
...i,
|
||||
provider_id: providerRecord.id,
|
||||
}));
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage}`);
|
||||
|
||||
// Stage 6: Persist to database
|
||||
stage = SyncStage.PERSIST;
|
||||
const regionsCount = await this.repos.regions.upsertMany(
|
||||
providerRecord.id,
|
||||
normalizedRegions
|
||||
);
|
||||
const instancesCount = await this.repos.instances.upsertMany(
|
||||
providerRecord.id,
|
||||
normalizedInstances
|
||||
);
|
||||
|
||||
// Fetch pricing data - need instance and region IDs from DB
|
||||
const dbRegions = await this.repos.regions.findByProvider(providerRecord.id);
|
||||
const dbInstances = await this.repos.instances.findByProvider(providerRecord.id);
|
||||
|
||||
const regionIds = dbRegions.map(r => r.id);
|
||||
const instanceTypeIds = dbInstances.map(i => i.id);
|
||||
|
||||
const pricing = await connector.getPricing(instanceTypeIds, regionIds);
|
||||
const pricingCount = await this.repos.pricing.upsertMany(pricing);
|
||||
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage} (regions: ${regionsCount}, instances: ${instancesCount}, pricing: ${pricingCount})`);
|
||||
|
||||
// Stage 7: Validate
|
||||
stage = SyncStage.VALIDATE;
|
||||
if (regionsCount === 0 || instancesCount === 0) {
|
||||
throw new Error('No data was synced - possible API or parsing issue');
|
||||
}
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage}`);
|
||||
|
||||
// Stage 8: Complete - Update provider status to success
|
||||
stage = SyncStage.COMPLETE;
|
||||
await this.repos.providers.updateSyncStatus(provider, 'success');
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
console.log(`[SyncOrchestrator] ${provider} → ${stage} (${duration}ms)`);
|
||||
|
||||
return {
|
||||
provider,
|
||||
success: true,
|
||||
regions_synced: regionsCount,
|
||||
instances_synced: instancesCount,
|
||||
pricing_synced: pricingCount,
|
||||
duration_ms: duration,
|
||||
};
|
||||
|
||||
} catch (error) {
|
||||
const duration = Date.now() - startTime;
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
|
||||
console.error(`[SyncOrchestrator] ${provider} failed at ${stage}:`, error);
|
||||
|
||||
// Update provider status to error
|
||||
try {
|
||||
await this.repos.providers.updateSyncStatus(provider, 'error', errorMessage);
|
||||
} catch (statusError) {
|
||||
console.error(`[SyncOrchestrator] Failed to update provider status:`, statusError);
|
||||
}
|
||||
|
||||
return {
|
||||
provider,
|
||||
success: false,
|
||||
regions_synced: 0,
|
||||
instances_synced: 0,
|
||||
pricing_synced: 0,
|
||||
duration_ms: duration,
|
||||
error: errorMessage,
|
||||
error_details: {
|
||||
stage,
|
||||
message: errorMessage,
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronize all providers
|
||||
* Runs synchronizations in parallel for efficiency
|
||||
*
|
||||
* @param providers - Array of provider names to sync (defaults to all supported providers)
|
||||
* @returns Complete sync report with statistics
|
||||
*/
|
||||
async syncAll(providers = ['linode', 'vultr', 'aws']): Promise<SyncReport> {
|
||||
const startedAt = new Date().toISOString();
|
||||
const startTime = Date.now();
|
||||
|
||||
console.log(`[SyncOrchestrator] Starting sync for providers: ${providers.join(', ')}`);
|
||||
|
||||
// Run all provider syncs in parallel
|
||||
const results = await Promise.allSettled(
|
||||
providers.map(p => this.syncProvider(p))
|
||||
);
|
||||
|
||||
// Extract results
|
||||
const providerResults: ProviderSyncResult[] = results.map((result, index) => {
|
||||
if (result.status === 'fulfilled') {
|
||||
return result.value;
|
||||
} else {
|
||||
// Handle rejected promises
|
||||
const provider = providers[index];
|
||||
const errorMessage = result.reason instanceof Error
|
||||
? result.reason.message
|
||||
: 'Unknown error';
|
||||
|
||||
console.error(`[SyncOrchestrator] ${provider} promise rejected:`, result.reason);
|
||||
|
||||
return {
|
||||
provider,
|
||||
success: false,
|
||||
regions_synced: 0,
|
||||
instances_synced: 0,
|
||||
pricing_synced: 0,
|
||||
duration_ms: 0,
|
||||
error: errorMessage,
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
const completedAt = new Date().toISOString();
|
||||
const totalDuration = Date.now() - startTime;
|
||||
|
||||
// Calculate summary
|
||||
const successful = providerResults.filter(r => r.success);
|
||||
const failed = providerResults.filter(r => !r.success);
|
||||
|
||||
const summary = {
|
||||
total_providers: providers.length,
|
||||
successful_providers: successful.length,
|
||||
failed_providers: failed.length,
|
||||
total_regions: providerResults.reduce((sum, r) => sum + r.regions_synced, 0),
|
||||
total_instances: providerResults.reduce((sum, r) => sum + r.instances_synced, 0),
|
||||
total_pricing: providerResults.reduce((sum, r) => sum + r.pricing_synced, 0),
|
||||
};
|
||||
|
||||
const report: SyncReport = {
|
||||
success: failed.length === 0,
|
||||
started_at: startedAt,
|
||||
completed_at: completedAt,
|
||||
total_duration_ms: totalDuration,
|
||||
providers: providerResults,
|
||||
summary,
|
||||
};
|
||||
|
||||
console.log(`[SyncOrchestrator] Sync complete:`, {
|
||||
total: summary.total_providers,
|
||||
success: summary.successful_providers,
|
||||
failed: summary.failed_providers,
|
||||
duration: `${totalDuration}ms`,
|
||||
});
|
||||
|
||||
return report;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create connector for a specific provider
|
||||
*
|
||||
* @param provider - Provider name
|
||||
* @returns Connector instance for the provider
|
||||
* @throws Error if provider is not supported
|
||||
*/
|
||||
private async createConnector(provider: string): Promise<CloudConnector> {
|
||||
switch (provider.toLowerCase()) {
|
||||
case 'linode': {
|
||||
const connector = new LinodeConnector(this.vault);
|
||||
return {
|
||||
authenticate: () => connector.initialize(),
|
||||
getRegions: async () => {
|
||||
const regions = await connector.fetchRegions();
|
||||
const providerRecord = await this.repos.providers.findByName('linode');
|
||||
const providerId = providerRecord?.id ?? 0;
|
||||
return regions.map(r => connector.normalizeRegion(r, providerId));
|
||||
},
|
||||
getInstanceTypes: async () => {
|
||||
const instances = await connector.fetchInstanceTypes();
|
||||
const providerRecord = await this.repos.providers.findByName('linode');
|
||||
const providerId = providerRecord?.id ?? 0;
|
||||
return instances.map(i => connector.normalizeInstance(i, providerId));
|
||||
},
|
||||
getPricing: async () => {
|
||||
// Linode pricing is included in instance types
|
||||
return [];
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
case 'vultr': {
|
||||
const connector = new VultrConnector(this.vault);
|
||||
return {
|
||||
authenticate: () => connector.initialize(),
|
||||
getRegions: async () => {
|
||||
const regions = await connector.fetchRegions();
|
||||
const providerRecord = await this.repos.providers.findByName('vultr');
|
||||
const providerId = providerRecord?.id ?? 0;
|
||||
return regions.map(r => connector.normalizeRegion(r, providerId));
|
||||
},
|
||||
getInstanceTypes: async () => {
|
||||
const plans = await connector.fetchPlans();
|
||||
const providerRecord = await this.repos.providers.findByName('vultr');
|
||||
const providerId = providerRecord?.id ?? 0;
|
||||
return plans.map(p => connector.normalizeInstance(p, providerId));
|
||||
},
|
||||
getPricing: async () => {
|
||||
// Vultr pricing is included in plans
|
||||
return [];
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
case 'aws': {
|
||||
const connector = new AWSConnector(this.vault);
|
||||
return {
|
||||
authenticate: () => connector.initialize(),
|
||||
getRegions: async () => {
|
||||
const regions = await connector.fetchRegions();
|
||||
const providerRecord = await this.repos.providers.findByName('aws');
|
||||
const providerId = providerRecord?.id ?? 0;
|
||||
return regions.map(r => connector.normalizeRegion(r, providerId));
|
||||
},
|
||||
getInstanceTypes: async () => {
|
||||
const instances = await connector.fetchInstanceTypes();
|
||||
const providerRecord = await this.repos.providers.findByName('aws');
|
||||
const providerId = providerRecord?.id ?? 0;
|
||||
return instances.map(i => connector.normalizeInstance(i, providerId));
|
||||
},
|
||||
getPricing: async () => {
|
||||
// AWS pricing is included in instance types from ec2.shop
|
||||
return [];
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
default:
|
||||
throw new Error(`Unsupported provider: ${provider}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user