Qwen2API-A / src /utils /redis.js
github-actions[bot]
Sync from GitHub Viciy2023/Qwen2API-A@ae093476e9bc5b0a599620b5925df3a20057038e
f120063
const Redis = require('ioredis')
const config = require('../config/index.js')
const { logger } = require('./logger')
/**
* Redis 连接管理器
* 实现按需连接机制,仅在读写操作时建立连接
*/
// 连接配置
const REDIS_CONFIG = {
maxRetries: 3,
connectTimeout: 10000,
commandTimeout: 5000,
retryDelayOnFailover: 200,
maxRetriesPerRequest: 3,
enableOfflineQueue: false,
lazyConnect: true,
keepAlive: 30000,
connectionName: 'qwen2api_on_demand'
}
// 连接状态
let redis = null
let isConnecting = false
let connectionPromise = null
let lastActivity = 0
let idleTimer = null
// 空闲超时时间 (5分钟)
const IDLE_TIMEOUT = 5 * 60 * 1000
/**
* 判断是否需要TLS
*/
const isTLS = config.redisURL && (config.redisURL.startsWith('rediss://') || config.redisURL.includes('--tls'))
/**
* 创建Redis连接配置
*/
const createRedisConfig = () => ({
...REDIS_CONFIG,
// TLS配置
...(isTLS ? {
tls: {
rejectUnauthorized: true
}
} : {}),
// 重试策略
retryStrategy(times) {
if (times > REDIS_CONFIG.maxRetries) {
logger.error(`Redis连接重试次数超限: ${times}`, 'REDIS')
return null
}
const delay = Math.min(100 * Math.pow(2, times), 3000)
logger.info(`Redis重试连接: ${times}, 延迟: ${delay}ms`, 'REDIS', '🔄')
return delay
},
// 错误重连策略
reconnectOnError(err) {
const targetErrors = ['READONLY', 'ETIMEDOUT', 'ECONNRESET', 'EPIPE']
return targetErrors.some(e => err.message.includes(e))
}
})
/**
* 更新活动时间并重置空闲定时器
*/
const updateActivity = () => {
lastActivity = Date.now()
// 清除现有定时器
if (idleTimer) {
clearTimeout(idleTimer)
}
// 设置新的空闲定时器
idleTimer = setTimeout(() => {
if (redis && Date.now() - lastActivity > IDLE_TIMEOUT) {
logger.info('Redis连接空闲超时,断开连接', 'REDIS', '🔌')
disconnectRedis()
}
}, IDLE_TIMEOUT)
}
/**
* 建立Redis连接
*/
const connectRedis = async () => {
if (redis && redis.status === 'ready') {
updateActivity()
return redis
}
if (isConnecting && connectionPromise) {
return connectionPromise
}
isConnecting = true
connectionPromise = new Promise(async (resolve, reject) => {
try {
logger.info('建立Redis连接...', 'REDIS', '🔌')
const newRedis = new Redis(config.redisURL, createRedisConfig())
// 设置事件监听器
newRedis.on('connect', () => {
logger.success('Redis连接建立', 'REDIS')
})
newRedis.on('ready', () => {
logger.success('Redis准备就绪', 'REDIS')
redis = newRedis
isConnecting = false
updateActivity()
resolve(redis)
})
newRedis.on('error', (err) => {
logger.error('Redis连接错误', 'REDIS', '', err)
if (isConnecting) {
isConnecting = false
reject(err)
}
})
newRedis.on('close', () => {
logger.info('Redis连接关闭', 'REDIS', '🔌')
redis = null
})
newRedis.on('reconnecting', (delay) => {
logger.info(`Redis重新连接中...延迟: ${delay}ms`, 'REDIS', '🔄')
})
// 等待连接就绪
await newRedis.connect()
} catch (error) {
isConnecting = false
logger.error('Redis连接失败', 'REDIS', '', error)
reject(error)
}
})
return connectionPromise
}
/**
* 断开Redis连接
*/
const disconnectRedis = async () => {
if (idleTimer) {
clearTimeout(idleTimer)
idleTimer = null
}
if (redis) {
try {
await redis.disconnect()
logger.info('Redis连接已断开', 'REDIS', '🔌')
} catch (error) {
logger.error('断开Redis连接时出错', 'REDIS', '', error)
} finally {
redis = null
isConnecting = false
connectionPromise = null
}
}
}
/**
* 确保Redis连接可用
*/
const ensureConnection = async () => {
if (config.dataSaveMode !== 'redis') {
logger.error('当前数据保存模式不是Redis', 'REDIS')
throw new Error('当前数据保存模式不是Redis')
}
if (!redis || redis.status !== 'ready') {
return await connectRedis()
}
updateActivity()
return redis
}
/**
* 获取所有账户
* @returns {Promise<Array>} 所有账户信息数组
*/
const getAllAccounts = async () => {
try {
const client = await ensureConnection()
// 使用SCAN命令替代KEYS命令,避免阻塞Redis服务器
const keys = []
let cursor = '0'
do {
const result = await client.scan(cursor, 'MATCH', 'user:*', 'COUNT', 100)
cursor = result[0]
keys.push(...result[1])
} while (cursor !== '0')
if (!keys.length) {
logger.info('没有找到任何账户', 'REDIS', '✅')
return []
}
// 使用pipeline一次性获取所有账户数据
const pipeline = client.pipeline()
keys.forEach(key => {
pipeline.hgetall(key)
})
const results = await pipeline.exec()
if (!results) {
logger.error('获取账户数据失败', 'REDIS')
return []
}
const accounts = results.map((result, index) => {
// result格式为[err, value]
const [err, accountData] = result
if (err) {
logger.error(`获取账户 ${keys[index]} 数据失败`, 'REDIS', '', err)
return null
}
if (!accountData || Object.keys(accountData).length === 0) {
logger.error(`账户 ${keys[index]} 数据为空`, 'REDIS')
return null
}
return {
email: keys[index].replace('user:', ''),
password: accountData.password || '',
token: accountData.token || '',
expires: accountData.expires || ''
}
}).filter(Boolean) // 过滤掉null值
logger.success(`获取所有账户成功,共 ${accounts.length} 个账户`, 'REDIS')
return accounts
} catch (err) {
logger.error('获取账户时出错', 'REDIS', '', err)
return []
}
}
/**
* 设置账户
* @param {string} key - 键名(邮箱)
* @param {Object} value - 账户信息
* @returns {Promise<boolean>} 设置是否成功
*/
const setAccount = async (key, value) => {
try {
const client = await ensureConnection()
const { password, token, expires } = value
await client.hset(`user:${key}`, {
password: password || '',
token: token || '',
expires: expires || ''
})
logger.success(`账户 ${key} 设置成功`, 'REDIS')
return true
} catch (err) {
logger.error(`设置账户 ${key} 失败`, 'REDIS', '', err)
return false
}
}
/**
* 删除账户
* @param {string} key - 键名(邮箱)
* @returns {Promise<boolean>} 删除是否成功
*/
const deleteAccount = async (key) => {
try {
const client = await ensureConnection()
const result = await client.del(`user:${key}`)
if (result > 0) {
logger.success(`账户 ${key} 删除成功`, 'REDIS')
return true
} else {
logger.warn(`账户 ${key} 不存在`, 'REDIS')
return false
}
} catch (err) {
logger.error(`删除账户 ${key} 失败`, 'REDIS', '', err)
return false
}
}
/**
* 检查键是否存在
* @param {string} key - 键名
* @returns {Promise<boolean>} 键是否存在
*/
const checkKeyExists = async (key = 'headers') => {
try {
const client = await ensureConnection()
const exists = await client.exists(key)
const result = exists === 1
logger.info(`键 "${key}" ${result ? '存在' : '不存在'}`, 'REDIS', result ? '✅' : '❌')
return result
} catch (err) {
logger.error(`检查键 "${key}" 时出错`, 'REDIS', '', err)
return false
}
}
/**
* 获取连接状态
* @returns {Object} 连接状态信息
*/
const getConnectionStatus = () => {
return {
connected: redis && redis.status === 'ready',
status: redis ? redis.status : 'disconnected',
lastActivity: lastActivity,
idleTimeout: IDLE_TIMEOUT,
config: REDIS_CONFIG
}
}
/**
* 手动断开连接(用于应用关闭时清理)
*/
const cleanup = async () => {
logger.info('清理Redis连接...', 'REDIS', '🧹')
await disconnectRedis()
}
// 创建兼容的Redis客户端对象
const redisClient = {
getAllAccounts,
setAccount,
deleteAccount,
checkKeyExists,
getConnectionStatus,
cleanup,
// 直接Redis命令的代理方法(按需连接)
async hset(key, ...args) {
const client = await ensureConnection()
return client.hset(key, ...args)
},
async hget(key, field) {
const client = await ensureConnection()
return client.hget(key, field)
},
async hgetall(key) {
const client = await ensureConnection()
return client.hgetall(key)
},
async exists(key) {
const client = await ensureConnection()
return client.exists(key)
},
async keys(pattern) {
const client = await ensureConnection()
// 使用SCAN命令替代KEYS命令,避免阻塞Redis服务器
const keys = []
let cursor = '0'
do {
const result = await client.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
cursor = result[0]
keys.push(...result[1])
} while (cursor !== '0')
return keys
},
async del(key) {
const client = await ensureConnection()
return client.del(key)
}
}
// 进程退出时清理连接
process.on('exit', cleanup)
process.on('SIGINT', cleanup)
process.on('SIGTERM', cleanup)
// 根据配置决定是否导出Redis客户端
module.exports = config.dataSaveMode === 'redis' ? redisClient : null