| const axios = require('axios') |
| const ccrAccountService = require('./ccrAccountService') |
| const logger = require('../utils/logger') |
| const config = require('../../config/config') |
| const { parseVendorPrefixedModel } = require('../utils/modelHelper') |
|
|
| class CcrRelayService { |
| constructor() { |
| this.defaultUserAgent = 'claude-relay-service/1.0.0' |
| } |
|
|
| |
| async relayRequest( |
| requestBody, |
| apiKeyData, |
| clientRequest, |
| clientResponse, |
| clientHeaders, |
| accountId, |
| options = {} |
| ) { |
| let abortController = null |
| let account = null |
|
|
| try { |
| |
| account = await ccrAccountService.getAccount(accountId) |
| if (!account) { |
| throw new Error('CCR account not found') |
| } |
|
|
| logger.info( |
| `📤 Processing CCR API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})` |
| ) |
| logger.debug(`🌐 Account API URL: ${account.apiUrl}`) |
| logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`) |
| logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`) |
| logger.debug(`📝 Request model: ${requestBody.model}`) |
|
|
| |
| const { baseModel } = parseVendorPrefixedModel(requestBody.model) |
| logger.debug(`🔄 Parsed base model: ${baseModel} from original: ${requestBody.model}`) |
|
|
| let mappedModel = baseModel |
| if ( |
| account.supportedModels && |
| typeof account.supportedModels === 'object' && |
| !Array.isArray(account.supportedModels) |
| ) { |
| const newModel = ccrAccountService.getMappedModel(account.supportedModels, baseModel) |
| if (newModel !== baseModel) { |
| logger.info(`🔄 Mapping model from ${baseModel} to ${newModel}`) |
| mappedModel = newModel |
| } |
| } |
|
|
| |
| const modifiedRequestBody = { |
| ...requestBody, |
| model: mappedModel |
| } |
|
|
| |
| const proxyAgent = ccrAccountService._createProxyAgent(account.proxy) |
|
|
| |
| abortController = new AbortController() |
|
|
| |
| const handleClientDisconnect = () => { |
| logger.info('🔌 Client disconnected, aborting CCR request') |
| if (abortController && !abortController.signal.aborted) { |
| abortController.abort() |
| } |
| } |
|
|
| |
| if (clientRequest) { |
| clientRequest.once('close', handleClientDisconnect) |
| } |
| if (clientResponse) { |
| clientResponse.once('close', handleClientDisconnect) |
| } |
|
|
| |
| const cleanUrl = account.apiUrl.replace(/\/$/, '') |
| let apiEndpoint |
|
|
| if (options.customPath) { |
| |
| const baseUrl = cleanUrl.replace(/\/v1\/messages$/, '') |
| apiEndpoint = `${baseUrl}${options.customPath}` |
| } else { |
| |
| apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` |
| } |
|
|
| logger.debug(`🎯 Final API endpoint: ${apiEndpoint}`) |
| logger.debug(`[DEBUG] Options passed to relayRequest: ${JSON.stringify(options)}`) |
| logger.debug(`[DEBUG] Client headers received: ${JSON.stringify(clientHeaders)}`) |
|
|
| |
| const filteredHeaders = this._filterClientHeaders(clientHeaders) |
| logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`) |
|
|
| |
| const userAgent = |
| account.userAgent || |
| clientHeaders?.['user-agent'] || |
| clientHeaders?.['User-Agent'] || |
| this.defaultUserAgent |
|
|
| |
| const requestConfig = { |
| method: 'POST', |
| url: apiEndpoint, |
| data: modifiedRequestBody, |
| headers: { |
| 'Content-Type': 'application/json', |
| 'anthropic-version': '2023-06-01', |
| 'User-Agent': userAgent, |
| ...filteredHeaders |
| }, |
| timeout: config.requestTimeout || 600000, |
| signal: abortController.signal, |
| validateStatus: () => true |
| } |
|
|
| if (proxyAgent) { |
| requestConfig.httpAgent = proxyAgent |
| requestConfig.httpsAgent = proxyAgent |
| requestConfig.proxy = false |
| } |
|
|
| |
| if (account.apiKey && account.apiKey.startsWith('sk-ant-')) { |
| |
| requestConfig.headers['x-api-key'] = account.apiKey |
| logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key') |
| } else { |
| |
| requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}` |
| logger.debug('[DEBUG] Using Authorization Bearer authentication') |
| } |
|
|
| logger.debug( |
| `[DEBUG] Initial headers before beta: ${JSON.stringify(requestConfig.headers, null, 2)}` |
| ) |
|
|
| |
| if (options.betaHeader) { |
| logger.debug(`[DEBUG] Adding beta header: ${options.betaHeader}`) |
| requestConfig.headers['anthropic-beta'] = options.betaHeader |
| } else { |
| logger.debug('[DEBUG] No beta header to add') |
| } |
|
|
| |
| logger.debug( |
| '📤 Sending request to CCR API with headers:', |
| JSON.stringify(requestConfig.headers, null, 2) |
| ) |
| const response = await axios(requestConfig) |
|
|
| |
| if (clientRequest) { |
| clientRequest.removeListener('close', handleClientDisconnect) |
| } |
| if (clientResponse) { |
| clientResponse.removeListener('close', handleClientDisconnect) |
| } |
|
|
| logger.debug(`🔗 CCR API response: ${response.status}`) |
| logger.debug(`[DEBUG] Response headers: ${JSON.stringify(response.headers)}`) |
| logger.debug(`[DEBUG] Response data type: ${typeof response.data}`) |
| logger.debug( |
| `[DEBUG] Response data length: ${response.data ? (typeof response.data === 'string' ? response.data.length : JSON.stringify(response.data).length) : 0}` |
| ) |
| logger.debug( |
| `[DEBUG] Response data preview: ${typeof response.data === 'string' ? response.data.substring(0, 200) : JSON.stringify(response.data).substring(0, 200)}` |
| ) |
|
|
| |
| if (response.status === 401) { |
| logger.warn(`🚫 Unauthorized error detected for CCR account ${accountId}`) |
| await ccrAccountService.markAccountUnauthorized(accountId) |
| } else if (response.status === 429) { |
| logger.warn(`🚫 Rate limit detected for CCR account ${accountId}`) |
| |
| await ccrAccountService.checkQuotaUsage(accountId).catch((err) => { |
| logger.error('❌ Failed to check quota after 429 error:', err) |
| }) |
|
|
| await ccrAccountService.markAccountRateLimited(accountId) |
| } else if (response.status === 529) { |
| logger.warn(`🚫 Overload error detected for CCR account ${accountId}`) |
| await ccrAccountService.markAccountOverloaded(accountId) |
| } else if (response.status === 200 || response.status === 201) { |
| |
| const isRateLimited = await ccrAccountService.isAccountRateLimited(accountId) |
| if (isRateLimited) { |
| await ccrAccountService.removeAccountRateLimit(accountId) |
| } |
| const isOverloaded = await ccrAccountService.isAccountOverloaded(accountId) |
| if (isOverloaded) { |
| await ccrAccountService.removeAccountOverload(accountId) |
| } |
| } |
|
|
| |
| await this._updateLastUsedTime(accountId) |
|
|
| const responseBody = |
| typeof response.data === 'string' ? response.data : JSON.stringify(response.data) |
| logger.debug(`[DEBUG] Final response body to return: ${responseBody}`) |
|
|
| return { |
| statusCode: response.status, |
| headers: response.headers, |
| body: responseBody, |
| accountId |
| } |
| } catch (error) { |
| |
| if (error.name === 'AbortError' || error.code === 'ECONNABORTED') { |
| logger.info('Request aborted due to client disconnect') |
| throw new Error('Client disconnected') |
| } |
|
|
| logger.error( |
| `❌ CCR relay request failed (Account: ${account?.name || accountId}):`, |
| error.message |
| ) |
|
|
| throw error |
| } |
| } |
|
|
| |
| async relayStreamRequestWithUsageCapture( |
| requestBody, |
| apiKeyData, |
| responseStream, |
| clientHeaders, |
| usageCallback, |
| accountId, |
| streamTransformer = null, |
| options = {} |
| ) { |
| let account = null |
| try { |
| |
| account = await ccrAccountService.getAccount(accountId) |
| if (!account) { |
| throw new Error('CCR account not found') |
| } |
|
|
| logger.info( |
| `📡 Processing streaming CCR API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})` |
| ) |
| logger.debug(`🌐 Account API URL: ${account.apiUrl}`) |
|
|
| |
| const { baseModel } = parseVendorPrefixedModel(requestBody.model) |
| logger.debug(`🔄 Parsed base model: ${baseModel} from original: ${requestBody.model}`) |
|
|
| let mappedModel = baseModel |
| if ( |
| account.supportedModels && |
| typeof account.supportedModels === 'object' && |
| !Array.isArray(account.supportedModels) |
| ) { |
| const newModel = ccrAccountService.getMappedModel(account.supportedModels, baseModel) |
| if (newModel !== baseModel) { |
| logger.info(`🔄 [Stream] Mapping model from ${baseModel} to ${newModel}`) |
| mappedModel = newModel |
| } |
| } |
|
|
| |
| const modifiedRequestBody = { |
| ...requestBody, |
| model: mappedModel |
| } |
|
|
| |
| const proxyAgent = ccrAccountService._createProxyAgent(account.proxy) |
|
|
| |
| await this._makeCcrStreamRequest( |
| modifiedRequestBody, |
| account, |
| proxyAgent, |
| clientHeaders, |
| responseStream, |
| accountId, |
| usageCallback, |
| streamTransformer, |
| options |
| ) |
|
|
| |
| await this._updateLastUsedTime(accountId) |
| } catch (error) { |
| logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error) |
| throw error |
| } |
| } |
|
|
| |
| async _makeCcrStreamRequest( |
| body, |
| account, |
| proxyAgent, |
| clientHeaders, |
| responseStream, |
| accountId, |
| usageCallback, |
| streamTransformer = null, |
| requestOptions = {} |
| ) { |
| return new Promise((resolve, reject) => { |
| let aborted = false |
|
|
| |
| const cleanUrl = account.apiUrl.replace(/\/$/, '') |
| const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` |
|
|
| logger.debug(`🎯 Final API endpoint for stream: ${apiEndpoint}`) |
|
|
| |
| const filteredHeaders = this._filterClientHeaders(clientHeaders) |
| logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`) |
|
|
| |
| const userAgent = |
| account.userAgent || |
| clientHeaders?.['user-agent'] || |
| clientHeaders?.['User-Agent'] || |
| this.defaultUserAgent |
|
|
| |
| const requestConfig = { |
| method: 'POST', |
| url: apiEndpoint, |
| data: body, |
| headers: { |
| 'Content-Type': 'application/json', |
| 'anthropic-version': '2023-06-01', |
| 'User-Agent': userAgent, |
| ...filteredHeaders |
| }, |
| timeout: config.requestTimeout || 600000, |
| responseType: 'stream', |
| validateStatus: () => true |
| } |
|
|
| if (proxyAgent) { |
| requestConfig.httpAgent = proxyAgent |
| requestConfig.httpsAgent = proxyAgent |
| requestConfig.proxy = false |
| } |
|
|
| |
| if (account.apiKey && account.apiKey.startsWith('sk-ant-')) { |
| |
| requestConfig.headers['x-api-key'] = account.apiKey |
| logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key') |
| } else { |
| |
| requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}` |
| logger.debug('[DEBUG] Using Authorization Bearer authentication') |
| } |
|
|
| |
| if (requestOptions.betaHeader) { |
| requestConfig.headers['anthropic-beta'] = requestOptions.betaHeader |
| } |
|
|
| |
| const request = axios(requestConfig) |
|
|
| request |
| .then((response) => { |
| logger.debug(`🌊 CCR stream response status: ${response.status}`) |
|
|
| |
| if (response.status !== 200) { |
| logger.error( |
| `❌ CCR API returned error status: ${response.status} | Account: ${account?.name || accountId}` |
| ) |
|
|
| if (response.status === 401) { |
| ccrAccountService.markAccountUnauthorized(accountId) |
| } else if (response.status === 429) { |
| ccrAccountService.markAccountRateLimited(accountId) |
| |
| ccrAccountService.checkQuotaUsage(accountId).catch((err) => { |
| logger.error('❌ Failed to check quota after 429 error:', err) |
| }) |
| } else if (response.status === 529) { |
| ccrAccountService.markAccountOverloaded(accountId) |
| } |
|
|
| |
| if (!responseStream.headersSent) { |
| const errorHeaders = { |
| 'Content-Type': response.headers['content-type'] || 'application/json', |
| 'Cache-Control': 'no-cache', |
| Connection: 'keep-alive' |
| } |
| |
| delete errorHeaders['Transfer-Encoding'] |
| delete errorHeaders['Content-Length'] |
| responseStream.writeHead(response.status, errorHeaders) |
| } |
|
|
| |
| response.data.on('data', (chunk) => { |
| if (!responseStream.destroyed) { |
| responseStream.write(chunk) |
| } |
| }) |
|
|
| response.data.on('end', () => { |
| if (!responseStream.destroyed) { |
| responseStream.end() |
| } |
| resolve() |
| }) |
| return |
| } |
|
|
| |
| ccrAccountService.isAccountRateLimited(accountId).then((isRateLimited) => { |
| if (isRateLimited) { |
| ccrAccountService.removeAccountRateLimit(accountId) |
| } |
| }) |
| ccrAccountService.isAccountOverloaded(accountId).then((isOverloaded) => { |
| if (isOverloaded) { |
| ccrAccountService.removeAccountOverload(accountId) |
| } |
| }) |
|
|
| |
| if (!responseStream.headersSent) { |
| const headers = { |
| 'Content-Type': 'text/event-stream', |
| 'Cache-Control': 'no-cache', |
| Connection: 'keep-alive', |
| 'Access-Control-Allow-Origin': '*', |
| 'Access-Control-Allow-Headers': 'Cache-Control' |
| } |
| responseStream.writeHead(200, headers) |
| } |
|
|
| |
| let rawBuffer = '' |
| const collectedUsage = {} |
|
|
| response.data.on('data', (chunk) => { |
| if (aborted || responseStream.destroyed) { |
| return |
| } |
|
|
| try { |
| const chunkStr = chunk.toString('utf8') |
| rawBuffer += chunkStr |
|
|
| |
| const lines = rawBuffer.split('\n') |
| rawBuffer = lines.pop() |
|
|
| for (const line of lines) { |
| if (line.trim()) { |
| |
| const usageData = this._parseSSELineForUsage(line) |
| if (usageData) { |
| Object.assign(collectedUsage, usageData) |
| } |
|
|
| |
| let outputLine = line |
| if (streamTransformer && typeof streamTransformer === 'function') { |
| outputLine = streamTransformer(line) |
| } |
|
|
| |
| if (outputLine && !responseStream.destroyed) { |
| responseStream.write(`${outputLine}\n`) |
| } |
| } else { |
| |
| if (!responseStream.destroyed) { |
| responseStream.write('\n') |
| } |
| } |
| } |
| } catch (err) { |
| logger.error('❌ Error processing SSE chunk:', err) |
| } |
| }) |
|
|
| response.data.on('end', () => { |
| if (!responseStream.destroyed) { |
| responseStream.end() |
| } |
|
|
| |
| if (usageCallback && Object.keys(collectedUsage).length > 0) { |
| try { |
| logger.debug(`📊 Collected usage data: ${JSON.stringify(collectedUsage)}`) |
| |
| usageCallback({ ...collectedUsage, accountId, model: body.model }) |
| } catch (err) { |
| logger.error('❌ Error in usage callback:', err) |
| } |
| } |
|
|
| resolve() |
| }) |
|
|
| response.data.on('error', (err) => { |
| logger.error('❌ Stream data error:', err) |
| if (!responseStream.destroyed) { |
| responseStream.end() |
| } |
| reject(err) |
| }) |
|
|
| |
| responseStream.on('close', () => { |
| logger.info('🔌 Client disconnected from CCR stream') |
| aborted = true |
| if (response.data && typeof response.data.destroy === 'function') { |
| response.data.destroy() |
| } |
| }) |
|
|
| responseStream.on('error', (err) => { |
| logger.error('❌ Response stream error:', err) |
| aborted = true |
| }) |
| }) |
| .catch((error) => { |
| if (!responseStream.headersSent) { |
| responseStream.writeHead(500, { 'Content-Type': 'application/json' }) |
| } |
|
|
| const errorResponse = { |
| error: { |
| type: 'internal_error', |
| message: 'CCR API request failed' |
| } |
| } |
|
|
| if (!responseStream.destroyed) { |
| responseStream.write(`data: ${JSON.stringify(errorResponse)}\n\n`) |
| responseStream.end() |
| } |
|
|
| reject(error) |
| }) |
| }) |
| } |
|
|
| |
| _parseSSELineForUsage(line) { |
| try { |
| if (line.startsWith('data: ')) { |
| const data = line.substring(6).trim() |
| if (data === '[DONE]') { |
| return null |
| } |
|
|
| const jsonData = JSON.parse(data) |
|
|
| |
| if (jsonData.usage) { |
| return { |
| input_tokens: jsonData.usage.input_tokens || 0, |
| output_tokens: jsonData.usage.output_tokens || 0, |
| cache_creation_input_tokens: jsonData.usage.cache_creation_input_tokens || 0, |
| cache_read_input_tokens: jsonData.usage.cache_read_input_tokens || 0, |
| |
| cache_creation_input_tokens_ephemeral_5m: |
| jsonData.usage.cache_creation_input_tokens_ephemeral_5m || 0, |
| cache_creation_input_tokens_ephemeral_1h: |
| jsonData.usage.cache_creation_input_tokens_ephemeral_1h || 0 |
| } |
| } |
|
|
| |
| if (jsonData.type === 'message_delta' && jsonData.delta && jsonData.delta.usage) { |
| return { |
| input_tokens: jsonData.delta.usage.input_tokens || 0, |
| output_tokens: jsonData.delta.usage.output_tokens || 0, |
| cache_creation_input_tokens: jsonData.delta.usage.cache_creation_input_tokens || 0, |
| cache_read_input_tokens: jsonData.delta.usage.cache_read_input_tokens || 0, |
| cache_creation_input_tokens_ephemeral_5m: |
| jsonData.delta.usage.cache_creation_input_tokens_ephemeral_5m || 0, |
| cache_creation_input_tokens_ephemeral_1h: |
| jsonData.delta.usage.cache_creation_input_tokens_ephemeral_1h || 0 |
| } |
| } |
| } |
| } catch (err) { |
| |
| } |
|
|
| return null |
| } |
|
|
| |
| _filterClientHeaders(clientHeaders) { |
| if (!clientHeaders) { |
| return {} |
| } |
|
|
| const filteredHeaders = {} |
| const allowedHeaders = [ |
| 'accept-language', |
| 'anthropic-beta', |
| 'anthropic-dangerous-direct-browser-access' |
| ] |
|
|
| |
| for (const [key, value] of Object.entries(clientHeaders)) { |
| const lowerKey = key.toLowerCase() |
| if (allowedHeaders.includes(lowerKey)) { |
| filteredHeaders[key] = value |
| } |
| } |
|
|
| return filteredHeaders |
| } |
|
|
| |
| async _updateLastUsedTime(accountId) { |
| try { |
| const redis = require('../models/redis') |
| const client = redis.getClientSafe() |
| await client.hset(`ccr_account:${accountId}`, 'lastUsedAt', new Date().toISOString()) |
| } catch (error) { |
| logger.error(`❌ Failed to update last used time for CCR account ${accountId}:`, error) |
| } |
| } |
| } |
|
|
| module.exports = new CcrRelayService() |
|
|