一个基于Node.js的视频流处理器库,支持HTTP/HTTPS和WebSocket视频流处理,以及视频去水印(delog)功能,提供智能任务队列管理和可扩展的任务获取中间件系统。
npm install
系统的配置文件默认位于以下位置:
config/app.json - 包含应用的主要配置config/iavs/config.js - 包含IAVS平台的登录信息和摄像头配置config/video-clip/config.json - 包含VideoClip平台的任务获取配置系统支持配置文件自动重载功能,当配置文件发生变化时,系统会自动检测并应用新的配置,无需重启应用。
fs.watch API 监听配置文件所在目录的变化config/app.json - 主配置文件config/iavs/config.js - IAVS平台配置文件config/video-clip/config.json - VideoClip平台配置文件主配置文件 config/app.json 的结构如下:
{
"taskQueue": {
"maxConcurrency": null,
"minConcurrency": 1,
"maxBandwidth": null,
"maxCPUUsage": 90,
"maxMemoryUsage": 80,
"monitorInterval": 1000,
"enableDynamicConcurrency": true,
"concurrencyAdjustmentInterval": 2000,
"concurrencyAdjustmentStep": 2,
"lowLoadThreshold": 40,
"highLoadThreshold": 80
},
"fetchers": {
"http": {
"enabled": false,
"port": 3000,
"endpoint": "/api/tasks",
"interval": 1000
},
"dummy": {
"enabled": false,
"interval": 5000,
"taskCount": 5
},
"file": {
"enabled": false,
"watchDir": "./tasks",
"filePattern": "*.json"
},
"iavs": {
"enabled": false,
"configPath": "",
"maxRetryCount": 3,
"deleteOriginalAfterDelog": true,
"heartbeatFile": "./heartbeat"
},
"video-clip": {
"enabled": true,
"configPath": "",
"useGrpcConfig": true
}
},
"grpc": {
"enabled": true,
"host": "localhost",
"port": 50056,
"interval": 60000
},
"logger": {
"level": "info",
"format": "json"
}
}
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
maxConcurrency | Number | null | 最大并发数,null表示根据CPU核心数自动计算 |
minConcurrency | Number | 1 | 最小并发数 |
maxBandwidth | Number | null | 最大总带宽(字节/秒),null表示无限制 |
maxCPUUsage | Number | 80 | 最大CPU使用率(%) |
maxMemoryUsage | Number | 80 | 最大内存使用率(%) |
monitorInterval | Number | 1000 | 监控间隔(毫秒) |
enableDynamicConcurrency | Boolean | true | 是否启用动态并发调节 |
concurrencyAdjustmentInterval | Number | 5000 | 并发调节间隔(毫秒) |
concurrencyAdjustmentStep | Number | 1 | 并发数调节步长 |
lowLoadThreshold | Number | 40 | 低负载阈值(%),低于此值时增加并发数 |
highLoadThreshold | Number | 70 | 高负载阈值(%),高于此值时减少并发数 |
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
http.enabled | Boolean | false | 是否启用HTTP API任务获取 |
http.port | Number | 3000 | HTTP API端口 |
http.endpoint | String | "/api/tasks" | HTTP API端点 |
http.interval | Number | 1000 | 轮询间隔(毫秒) |
dummy.enabled | Boolean | false | 是否启用示例任务获取 |
dummy.interval | Number | 5000 | 任务生成间隔(毫秒) |
dummy.taskCount | Number | 5 | 生成的任务数量 |
file.enabled | Boolean | false | 是否启用文件监控任务获取 |
file.watchDir | String | "./tasks" | 监控目录 |
file.filePattern | String | "*.json" | 文件匹配模式 |
iavs.enabled | Boolean | false | 是否启用IAVS平台任务获取 |
iavs.configPath | String | "" | IAVS配置文件路径 |
iavs.maxRetryCount | Number | 3 | 最大重试次数 |
iavs.deleteOriginalAfterDelog | Boolean | true | delog完成后是否删除原始文件 |
iavs.heartbeatFile | String | "./heartbeat" | 心跳文件路径 |
video-clip.enabled | Boolean | true | 是否启用VideoClip平台任务获取 |
video-clip.configPath | String | "" | VideoClip配置文件路径 |
video-clip.useGrpcConfig | Boolean | true | 是否使用gRPC配置 |
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
grpc.enabled | Boolean | true | 是否启用gRPC客户端 |
grpc.host | String | "localhost" | gRPC服务器地址 |
grpc.port | Number | 50056 | gRPC服务器端口 |
grpc.interval | Number | 60000 | 配置同步间隔(毫秒) |
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
logger.level | String | "info" | 日志级别,可选值:debug, info, warn, error |
logger.format | String | "json" | 日志格式,可选值:json, simple |
IAVS配置文件默认位于 config/iavs/config.js,包含平台登录信息和摄像头配置:
{
"name": "downloader1",
"delogCpuPercent": 20,
"threads": 2,
"startDate": "2025-10-01",
"downloadMode": "websocket", // http or websocket
"duration": 600, // 下载时长(秒)
"iavs": {
"url": "http://60.188.49.211:8082",
"username": "admin",
"password": "admin"
},
"cameraGroup": [
{
"name": "group1",
"storagePath": "./output",
"location": "own",
"cameras": [
{
"name": "camera1",
"avObjName": "av/camera1/1@192.168.1.1:8080",
"delogAreas": [{"left": 100, "top": 100, "width": 200, "height": 50}]
}
]
}
]
}
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
name | String | "downloader1" | 下载器名称 |
delogCpuPercent | Number | 20 | 去水印任务的CPU使用率限制(%) |
threads | Number | 2 | 线程数 |
startDate | String | "2025-10-01" | 开始日期 |
downloadMode | String | "websocket" | 下载模式,可选值:http, websocket |
duration | Number | 600 | 下载时长(秒) |
iavs.url | String | "http://60.188.49.211:8082" | IAVS平台URL |
iavs.username | String | "admin" | IAVS平台用户名 |
iavs.password | String | "admin" | IAVS平台密码 |
cameraGroup | Array | [] | 摄像头组配置 |
cameraGroup[].name | String | "group1" | 摄像头组名称 |
cameraGroup[].storagePath | String | "./output" | 存储路径 |
cameraGroup[].location | String | "own" | 位置 |
cameraGroup[].cameras | Array | [] | 摄像头配置 |
cameraGroup[].cameras[].name | String | "camera1" | 摄像头名称 |
cameraGroup[].cameras[].avObjName | String | "av/camera1/1@192.168.1.1:8080" | 摄像头对象名称 |
cameraGroup[].cameras[].delogAreas | Array | [] | 去水印区域配置 |
VideoClip配置文件默认位于 config/video-clip/config.json,包含VideoClip平台的任务获取配置:
{
"unitCode": "",
"api": {
"baseUrl": "http://172.16.3.70:8088",
"timeout": 10000,
"downloadTask": "/videocollect/api/v1/task/applyDownloadTask",
"downloadReceipt": "/videocollect/api/v1/task/receiptDownloadTask",
"transcodeTask": "/videocollect/api/v1/task/applyTransferTask",
"transcodeReceipt": "/videocollect/api/v1/task/receiptTransferTask"
},
"downloadMode": "ws",
"locationOrder": ["own", "dev", "server"],
"addressReplacements": {},
"addressCheckTimeout": 3000,
"fetch": {
"interval": 5000,
"retryDelay": 3000,
"maxRetries": 3
},
"progress": {
"reportInterval": 5000,
"initialDelay": 5000,
"retryCount": 3,
"retryDelay": 1000
},
"persistence": {
"enabled": true,
"filePath": "data/video-clip-pending-tasks.json"
},
"retry": {
"maxRetries": 3,
"minDurationRatio": 0.5
},
"useGrpcConfig": true
}
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
unitCode | String | "" | 单元代码 |
api.baseUrl | String | "http://172.16.3.70:8088" | API基础地址 |
api.timeout | Number | 10000 | API超时时间(毫秒) |
api.downloadTask | String | "/videocollect/api/v1/task/applyDownloadTask" | 下载任务API端点 |
api.downloadReceipt | String | "/videocollect/api/v1/task/receiptDownloadTask" | 下载回执API端点 |
api.transcodeTask | String | "/videocollect/api/v1/task/applyTransferTask" | 转码任务API端点 |
api.transcodeReceipt | String | "/videocollect/api/v1/task/receiptTransferTask" | 转码回执API端点 |
downloadMode | String | "ws" | 下载模式,可选值:ws, http |
locationOrder | Array | ["own", "dev", "server"] | Location优先级 |
addressReplacements | Object | {} | 地址替换配置 |
addressCheckTimeout | Number | 3000 | 地址检测超时(毫秒) |
fetch.interval | Number | 5000 | 任务获取间隔(毫秒) |
fetch.retryDelay | Number | 3000 | 获取重试延迟(毫秒) |
fetch.maxRetries | Number | 3 | 获取最大重试次数 |
progress.reportInterval | Number | 5000 | 进度上报间隔(毫秒) |
progress.initialDelay | Number | 5000 | 首次上报延迟(毫秒) |
progress.retryCount | Number | 3 | 进度上报重试次数 |
progress.retryDelay | Number | 1000 | 上报重试延迟(毫秒) |
persistence.enabled | Boolean | true | 是否启用持久化 |
persistence.filePath | String | "data/video-clip-pending-tasks.json" | 持久化文件路径 |
retry.maxRetries | Number | 3 | 任务最大重试次数 |
retry.minDurationRatio | Number | 0.5 | 最小视频时长比例 |
useGrpcConfig | Boolean | true | 是否使用gRPC配置 |
gRPC配置用于从外部gRPC服务获取配置,支持动态更新:
{
"grpc": {
"enabled": true,
"host": "localhost",
"port": 50056,
"interval": 60000
}
}
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enabled | Boolean | true | 是否启用gRPC客户端 |
host | String | "localhost" | gRPC服务器地址 |
port | Number | 50056 | gRPC服务器端口 |
interval | Number | 60000 | 配置同步间隔(毫秒) |
gRPC配置更新时会触发以下事件:
grpc-config-updated: 当配置更新时触发,包含 unitId 和 videoPathimport { TaskQueue } from './src/index.js';
// 创建任务队列实例
const queue = new TaskQueue({
maxConcurrency: 4, // 最大并发数
minConcurrency: 1, // 最小并发数
maxBandwidth: 10 * 1024 * 1024, // 最大总带宽10MB/s
monitorInterval: 1000, // 监控间隔1秒
// 并发调整相关配置
enableDynamicConcurrency: true, // 启用动态并发调节
concurrencyAdjustmentInterval: 2000, // 并发调节间隔(毫秒)
concurrencyAdjustmentStep: 2, // 并发数调节步长
lowLoadThreshold: 20, // 低负载阈值(%),低于此值时增加并发数
highLoadThreshold: 90, // 高负载阈值(%),高于此值时减少并发数
maxSystemLoad: 90, // 系统最大负载阈值,超过此值拒绝新任务
loadRecoveryThreshold: 70, // 负载恢复阈值,低于此值恢复接受任务
statusPrintInterval: 10000 // 状态打印间隔(毫秒)
});
// 启动队列
queue.start();
// 定义自定义回调函数
const callbacks = {
'download-speed': (data) => {
console.log(`下载速度: ${(data.bytesPerSecond / 1024 / 1024).toFixed(2)} MB/s, 任务ID: ${data.taskId}`);
},
'progress': (data) => {
console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒, 任务ID: ${data.taskId}`);
},
'end': (data) => {
console.log(`任务完成, 总时长: ${data.totalTime.toFixed(2)}秒, 总字节: ${(data.totalBytes / 1024 / 1024).toFixed(2)} MB, 任务ID: ${data.taskId}`);
},
'completed': (data) => {
console.log(`任务${data.taskId}完成, 结果: ${JSON.stringify(data.result)}`);
},
'failed': (data) => {
console.error(`任务${data.taskId}失败, 错误: ${data.error.message}`);
}
};
// 提交HTTP任务
const result1 = await queue.submitTask('http', {
httpUrl: 'https://vjs.zencdn.net/v/oceans.mp4',
duration: 20,
outputPath: 'output/http_test_1.mp4'
}, callbacks);
// 提交WebSocket任务
const result2 = await queue.submitTask('ws', {
wsUrl: 'ws://example.com/stream.flv',
duration: 20,
outputPath: 'output/ws_test_1.mp4'
}, callbacks);
// 提交视频去水印任务
const result3 = await queue.submitTask('delog', {
inputPath: 'input/video.mp4',
delogAreas: [
{ x: 100, y: 100, width: 200, height: 100 },
{ x: 500, y: 200, width: 150, height: 150 }
],
outputPath: 'output/delog_test_1.mp4',
outputResolution: '1280:720'
}, callbacks);
// 获取队列状态
const queueStatus = queue.getQueueStatus();
console.log('队列状态:', queueStatus);
// 停止队列
queue.stop();
import { HTTPStreamProcessor } from './src/index.js';
const processor = new HTTPStreamProcessor();
// 添加事件监听
processor.on('download-speed', (data) => {
console.log(`下载速度: ${(data.bytesPerSecond / 1024 / 1024).toFixed(2)} MB/s`);
});
processor.on('progress', (data) => {
console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒`);
});
// 处理HTTP视频流
await processor.processStream(
'http://example.com/stream.flv', // HTTP/HTTPS视频地址
60, // 输出视频时长(秒)
'output.mp4', // 输出文件路径
'flv' // 输入格式(可选,ffmpeg会自动检测)
);
import { WSFLVProcessor } from './src/index.js';
const processor = new WSFLVProcessor();
// 添加事件监听
processor.on('download-speed', (data) => {
console.log(`下载速度: ${(data.bytesPerSecond / 1024 / 1024).toFixed(2)} MB/s`);
});
processor.on('progress', (data) => {
console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒`);
});
// 处理WebSocket FLV流
await processor.processFLVStream(
'ws://example.com/stream.flv', // WebSocket FLV地址
60, // 输出视频时长(秒)
'output.mp4' // 输出文件路径
);
import { DelogProcessor } from './src/index.js';
const processor = new DelogProcessor();
// 添加事件监听
processor.on('progress', (data) => {
console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒`);
});
processor.on('start', (data) => {
console.log(`开始处理视频: ${data.inputPath}`);
console.log(`去水印区域: ${JSON.stringify(data.delogAreas)}`);
console.log(`输出分辨率: ${data.outputResolution}`);
});
processor.on('end', (data) => {
console.log(`处理完成: ${data.outputPath}`);
console.log(`总时长: ${data.totalTime.toFixed(2)}秒`);
});
// 处理视频去水印
await processor.processFile(
'input/video.mp4', // 输入视频文件路径
[
{ x: 100, y: 100, width: 200, height: 100 }, // 去水印区域1
{ x: 500, y: 200, width: 150, height: 150 } // 去水印区域2
],
'output/delogged_video.mp4', // 输出视频路径
'1280:720' // 输出分辨率(可选,默认1280:720)
);
# 启动常驻守护进程
npm start
# 或使用daemon命令
npm run daemon
curl -X POST -H "Content-Type: application/json" -d '{
"taskType": "http",
"params": {
"httpUrl": "https://vjs.zencdn.net/v/oceans.mp4",
"duration": 20,
"outputPath": "output/api_test.mp4"
},
"callbacks": {
"completed": "function(data) { console.log(\"Task completed: \" + data.taskId); }"
}
}' http://localhost:3000/api/tasks
在 config/app.json 中添加或修改以下配置:
{
"fetchers": {
"iavs": {
"enabled": true,
"configPath": "config/iavs/config.js",
"maxRetryCount": 3
}
}
}
IAVS配置文件默认位于 config/iavs/config.js,包含平台登录信息和摄像头配置。详细配置选项请参考 配置管理 章节。
{
"name": "downloader1",
"delogCpuPercent": 20,
"threads": 2,
"startDate": "2025-10-01",
"downloadMode": "websocket", // http or websocket
"iavs": {
"url": "http://60.188.49.211:8082",
"username": "admin",
"password": "admin"
},
"cameraGroup": [
{
"name": "group1",
"storagePath": "/path/to/storage",
"location": "own",
"cameras": [
{
"name": "camera1",
"avObjName": "av/camera1/1@192.168.1.1:8080",
"delogAreas": [{"left": 100, "top": 100, "width": 200, "height": 50}]
}
]
}
]
}
IAVS任务获取中间件现在支持delog任务队列机制,具体实现如下:
delog任务队列:当下载任务完成时,delog任务会被添加到一个先进先出的队列中,而不是直接提交。
优先调度delog任务:在调度摄像头任务之前,会优先处理delog任务队列中的任务,确保delog任务能够及时处理。
任务提交状态跟踪:只有当delog任务提交成功时,才会从队列中移除,否则会留在队列中等待下次调度。
系统过载处理:当系统过载时,会停止调度新的任务,包括delog任务,直到系统负载恢复。
错误处理和重试:当任务提交失败时,会记录错误信息,并在系统负载恢复后重新尝试提交。
这种机制确保了delog任务能够按照正确的顺序处理,并且在系统负载过高时不会导致系统崩溃。
IAVS任务获取中间件现在支持智能下载重试机制,确保视频时长准确,具体实现如下:
视频长度验证:下载完成后,系统会使用FFmpeg检查视频的实际时长,并与请求的时长进行比较。
任务重试队列:如果视频时长不足(低于要求的90%),任务会被添加到重试队列中,等待重新下载。
可配置的重试次数:通过 maxRetryCount 配置项可以设置最大重试次数,默认值为3次。
失败处理:当达到最大重试次数后,系统会生成一个 .failed 文件,包含失败原因、下载URL和视频长度信息。
系统负载感知:任务重试会考虑系统负载情况,当系统过载时会暂停重试,直到负载恢复。
配置示例:
在 config/app.json 中添加或修改以下配置:
{
"fetchers": {
"iavs": {
"enabled": true,
"configPath": "config/iavs/config.js",
"maxRetryCount": 3
}
}
}
失败文件格式:
当任务失败时,系统会生成一个与下载文件同名但扩展名为 .failed 的文件,包含以下信息:
{
"reason": "Failed after 3 attempts: Video duration insufficient (50s < 60s)",
"downloadUrl": "ws://example.com/stream.flv",
"requestedDuration": 60,
"actualDuration": 50
}
这种机制确保了系统能够自动处理视频长度不足的情况,提高了下载任务的成功率和可靠性。
VideoClip任务获取中间件从 video-auto-clip-system API 获取下载和转码任务。
在 config/app.json 中添加或修改以下配置:
{
"fetchers": {
"video-clip": {
"enabled": true,
"configPath": "config/video-clip/config.json",
"useGrpcConfig": true
}
},
"grpc": {
"enabled": true,
"host": "localhost",
"port": 50056,
"interval": 60000
}
}
VideoClip配置文件默认位于 config/video-clip/config.json,包含API配置和任务获取配置。详细配置选项请参考 配置管理 章节。
{
"unitCode": "",
"api": {
"baseUrl": "http://172.16.3.70:8088",
"timeout": 10000,
"downloadTask": "/videocollect/api/v1/task/applyDownloadTask",
"downloadReceipt": "/videocollect/api/v1/task/receiptDownloadTask",
"transcodeTask": "/videocollect/api/v1/task/applyTransferTask",
"transcodeReceipt": "/videocollect/api/v1/task/receiptTransferTask"
},
"downloadMode": "ws",
"locationOrder": ["own", "dev", "server"],
"fetch": {
"interval": 5000,
"retryDelay": 3000,
"maxRetries": 3
},
"progress": {
"reportInterval": 5000,
"initialDelay": 5000
},
"persistence": {
"enabled": true,
"filePath": "data/video-clip-pending-tasks.json"
},
"retry": {
"maxRetries": 3,
"minDurationRatio": 0.5
},
"useGrpcConfig": true
}
任务获取:从VideoClip API获取下载和转码任务
下载任务处理:
转码任务处理:
持久化机制:
重试机制:
gRPC配置集成:
new TaskQueue(options = {})
参数:
options:配置选项
maxConcurrency:最大并发数,null表示根据CPU核心数自动计算,默认nullminConcurrency:最小并发数,默认1maxBandwidth:最大总带宽(字节/秒),null表示无限制,默认nullmaxCPUUsage:最大CPU使用率(%),默认80maxMemoryUsage:最大内存使用率(%),默认80monitorInterval:监控间隔(毫秒),默认1000enableDynamicConcurrency:是否启用动态并发调节,默认trueconcurrencyAdjustmentInterval:并发调节间隔(毫秒),默认2000concurrencyAdjustmentStep:并发数调节步长,默认2lowLoadThreshold:低负载阈值(%),低于此值时增加并发数,默认20highLoadThreshold:高负载阈值(%),高于此值时减少并发数,默认90maxSystemLoad:系统最大负载阈值,超过此值拒绝新任务,默认90loadRecoveryThreshold:负载恢复阈值,低于此值恢复接受任务,默认70statusPrintInterval:状态打印间隔,默认10000start()
功能:启动任务队列
stop(cancelRunningTasks = false)
功能:停止任务队列
参数:
cancelRunningTasks:是否取消正在执行的任务,默认falseasync submitTask(taskType, params, callbacks = {})
功能:提交任务到队列
参数:
taskType:任务类型,'http'、'ws'或'delog'params:任务参数
{ httpUrl, duration, outputPath, inputFormat }{ wsUrl, duration, outputPath }{ inputPath, delogAreas, outputPath, outputResolution }callbacks:自定义回调函数,键为事件类型,值为回调函数返回值:
getQueueStatus()
功能:获取当前队列状态
返回值:
getStats()
功能:获取队列统计信息
返回值:
new HTTPStreamProcessor(options = {})
参数:
options:配置选项
chunkSize:数据块大小,默认65536(64KB)timeout:超时时间,默认30000(30秒)async processStream(httpUrl, duration, outputPath, inputFormat = null, taskId = null)
参数:
httpUrl:HTTP/HTTPS视频地址duration:输出视频时长(秒)outputPath:输出文件路径inputFormat:输入格式(可选,ffmpeg会自动检测)taskId:任务ID(可选)返回值:
outputPath:输出文件路径totalTime:总处理时间(秒)totalBytes:总下载字节数taskId:任务IDnew WSFLVProcessor(options = {})
参数:
options:配置选项
chunkSize:数据块大小,默认65536(64KB)timeout:超时时间,默认30000(30秒)reconnectAttempts:重连次数,默认0async processFLVStream(wsUrl, duration, outputPath, taskId = null)
参数:
wsUrl:WebSocket FLV地址duration:输出视频时长(秒)outputPath:输出文件路径taskId:任务ID(可选)返回值:
outputPath:输出文件路径totalTime:总处理时间(秒)totalBytes:总下载字节数taskId:任务IDrequestedDuration:请求的视频时长(秒)actualDuration:实际的视频时长(秒)WSFLVProcessor使用滑动窗口方法计算实时下载速度,具体实现如下:
滑动窗口数据存储:使用downloadData数组存储最近1秒内的下载数据,每个元素包含时间戳和字节数。
数据清理:每次接收到数据时,移除超过1秒的数据,确保窗口大小保持在1秒内。
速度计算:计算窗口内所有数据的总字节数,作为过去1秒的下载速度。
事件发射:每1秒发射一次download-speed事件,包含实时下载速度和其他相关信息。
// 存储最近1秒内的下载数据
const downloadData = [];
ws.on('message', (data) => {
const now = Date.now();
const byteLength = data.byteLength || data.length;
// 更新统计信息
totalBytes += byteLength;
lastByteTime = now;
// 添加当前时间和字节数到下载数据数组
downloadData.push({ time: now, bytes: byteLength });
// 移除超过1秒的数据
const oneSecondAgo = now - 1000;
while (downloadData.length > 0 && downloadData[0].time < oneSecondAgo) {
downloadData.shift();
}
// 计算过去1秒内的总字节数
const bytesInLastSecond = downloadData.reduce((sum, item) => sum + item.bytes, 0);
// 计算实时下载速度(每秒字节数)
const downloadSpeed = bytesInLastSecond;
// 限制download-speed事件发射频率
if (now - lastSpeedEventTime >= SPEED_EVENT_INTERVAL) {
// 触发下载速度事件
this.emit('download-speed', {
bytesPerSecond: downloadSpeed,
bytesReceived: totalBytes,
elapsedTime: elapsedSeconds,
currentTime: now,
taskId
});
lastSpeedEventTime = now;
}
});
这种实现方式确保了下载速度的实时性,避免了使用平均速度导致的延迟和不准确问题。
new DelogProcessor(options = {})
参数:
options:配置选项
chunkSize:数据块大小,默认65536(64KB)timeout:超时时间,默认30000(30秒)async processFile(inputPath, delogAreas, outputPath, outputResolution = '1280:720', taskId = null)
参数:
inputPath:输入视频文件路径delogAreas:去水印区域数组,每个区域包含 { left, top, width, height }outputPath:输出视频文件路径outputResolution:输出视频分辨率,格式为 width:height,默认 '1280:720'taskId:任务ID(可选)返回值:
outputPath:输出文件路径totalTime:总处理时间(秒)totalBytes:总处理字节数taskId:任务IDduration:视频总时长(秒)new Task(taskType, params, callbacks = {})
参数:
taskType:任务类型,'http'、'ws'或'delog'params:任务参数callbacks:自定义回调函数execute()
功能:执行任务
返回值:
cancel()
功能:取消任务
getStatus()
功能:获取任务状态
返回值:
getStats()
功能:获取任务统计信息
返回值:
new SystemMonitor(options = {})
参数:
options:配置选项
monitorInterval:监控间隔(毫秒),默认1000startMonitoring()
功能:开始系统监控
stopMonitoring()
功能:停止系统监控
getCPUUsage()
功能:获取CPU使用率
返回值:
getMemoryUsage()
功能:获取内存使用率
返回值:
getSystemLoad()
功能:获取系统负载
返回值:
getSystemStats()
功能:获取系统统计信息
返回值:
new BandwidthController(options = {})
参数:
options:配置选项
maxBandwidth:最大总带宽(字节/秒),null表示无限制,默认nullmonitorInterval:监控间隔(毫秒),默认1000startMonitoring()
功能:开始带宽监控
stopMonitoring()
功能:停止带宽监控
canAddTask()
功能:检查是否可以添加新任务
返回值:
updateTaskBandwidth(taskId, bytesReceived, interval)
功能:更新任务带宽使用情况
参数:
taskId:任务IDbytesReceived:接收的字节数interval:时间间隔(毫秒)releaseBandwidth(taskId)
功能:释放任务带宽
参数:
taskId:任务IDgetCurrentBandwidthStats()
功能:获取当前带宽统计信息
返回值:
| 事件名 | 触发时机 | 事件数据 |
|---|---|---|
started | 队列启动时 | - |
stopped | 队列停止时 | - |
task-submitted | 任务提交时 | { taskId, taskType, status } |
task-status-changed | 任务状态变化时 | 任务状态对象,包含id、taskType、status等详细信息 |
task-completed | 任务完成时 | { taskId, taskType, result, duration } |
task-failed | 任务失败时 | { taskId, taskType, error } |
task-cancelled | 任务取消时 | { taskId, taskType } |
task-bandwidth-updated | 任务带宽更新时 | { taskId, bytesPerSecond, bytesReceived, maxBandwidth } |
task-progress | 任务进度更新时 | { taskId, timemark, elapsedTime, duration } |
system-load-changed | 系统负载变化时 | { load, isOverloaded, maxLoad, runningTasks, maxConcurrency, recoveryThreshold } |
bandwidth-changed | 带宽变化时 | 带宽统计数据,包含currentBandwidth、maxBandwidth、avgBandwidth等 |
queue-status-changed | 队列状态变化时 | 完整的队列状态对象,包含运行中的任务详情 |
concurrency-adjusted | 并发数调整时 | { oldConcurrency, newConcurrency, cpuUsage, memoryUsage, systemLoad } |
- oldConcurrency:调整前的并发数 | ||
- newConcurrency:调整后的并发数 | ||
- cpuUsage:调整时的CPU使用率(%) | ||
- memoryUsage:调整时的内存使用率(%) | ||
- systemLoad:调整时的系统负载(%) | ||
system-overloaded | 系统过载时 | { currentLoad, maxLoad, recoveryThreshold, runningTasks, maxConcurrency } |
system-load-recovered | 系统负载恢复时 | { currentLoad, maxLoad, recoveryThreshold, runningTasks, maxConcurrency } |
| 事件名 | 触发时机 | 事件数据 |
|---|---|---|
start | 处理开始时 | { httpUrl, duration, outputPath, inputFormat, startTime, taskId } |
download-speed | 每500毫秒 | { bytesPerSecond, bytesReceived, elapsedTime, currentTime, taskId } |
progress | FFmpeg进度更新时 | { timemark, currentTime, elapsedTime, duration, taskId } |
ffmpeg-start | FFmpeg开始处理时 | { commandLine, taskId } |
end | 处理完成时 | { outputPath, totalTime, totalBytes, taskId } |
error | 发生错误时 | Error对象,包含taskId |
| 事件名 | 触发时机 | 事件数据 |
|---|---|---|
start | 处理开始时 | { wsUrl, duration, outputPath, startTime, taskId } |
connect | 连接到WebSocket服务器时 | { wsUrl, taskId } |
download-speed | 每1秒 | { bytesPerSecond, bytesReceived, elapsedTime, currentTime, taskId } |
progress | FFmpeg进度更新时 | { timemark, currentTime, elapsedTime, duration, taskId } |
ffmpeg-start | FFmpeg开始处理时 | { commandLine, taskId } |
end | 处理完成时 | { outputPath, totalTime, totalBytes, taskId } |
error | 发生错误时 | Error对象,包含taskId |
disconnect | 与WebSocket服务器断开连接时 | { code, reason, taskId } |
reconnect-attempt | 尝试重新连接时 | { attempt, maxAttempts, taskId } |
| 事件名 | 触发时机 | 事件数据 |
|---|---|---|
start | 处理开始时 | { inputPath, delogAreas, outputPath, outputResolution, startTime, taskId } |
progress | FFmpeg进度更新时 | { timemark, currentTime, elapsedTime, duration, taskId } |
ffmpeg-start | FFmpeg开始处理时 | { commandLine, taskId } |
end | 处理完成时 | { outputPath, totalTime, totalBytes, taskId } |
error | 发生错误时 | Error对象,包含taskId |
| 事件名 | 触发时机 | 事件数据 |
|---|---|---|
status-changed | 任务状态变化时 | 任务状态对象,包含id、taskType、status、createdAt、startedAt、completedAt、duration、bytesReceived、currentBandwidth、maxBandwidth、error |
completed | 任务完成时 | 任务执行结果对象,包含outputPath、totalTime、totalBytes、taskId |
failed | 任务失败时 | Error对象,包含错误信息 |
cancelled | 任务取消时 | - |
bandwidth-updated | 任务带宽更新时 | { taskId, bytesPerSecond, bytesReceived, maxBandwidth } |
progress | 任务进度更新时 | { taskId, timemark, elapsedTime, duration } |
started | 任务开始时 | { taskId, startTime } |
download-speed | 下载速度更新时 | 转发处理器的download-speed事件,包含taskId |
ffmpeg-start | FFmpeg开始处理时 | 转发处理器的ffmpeg-start事件,包含taskId |
start | 处理器开始处理时 | 转发处理器的start事件,包含taskId |
end | 处理器处理完成时 | 转发处理器的end事件,包含taskId |
connect | WebSocket连接成功时 | 转发WSFLVProcessor的connect事件,包含taskId |
disconnect | WebSocket断开连接时 | 转发WSFLVProcessor的disconnect事件,包含taskId |
reconnect-attempt | WebSocket尝试重连时 | 转发WSFLVProcessor的reconnect-attempt事件,包含taskId |
error | 处理器发生错误时 | 转发处理器的error事件,包含taskId |
| 事件名 | 触发时机 | 事件数据 |
|---|---|---|
system-load | 系统负载变化时 | 系统负载数据 |
| 事件名 | 触发时机 | 事件数据 |
|---|---|---|
bandwidth-changed | 带宽变化时 | 带宽统计数据 |
node test/http-stream.test.js
node test/ws-flv.test.js
node test/queue.test.js
node test/delog.test.js
node test/mixed-tasks.test.js
node test/iavs-task-fetcher.test.js
node test/grpc-config.test.js
node test/load-test.js
以下是系统的事件驱动架构图,展示了组件间的事件传递关系:
应用启动流程:
任务处理流程:
系统监控流程:
事件类型:
事件传递机制:
├── src/ # 核心源代码目录 │ ├── index.js # 项目入口文件 │ ├── app/ # 应用主目录 │ │ ├── app.js # 应用主类 │ │ ├── config.js # 配置管理 │ │ ├── task-fetcher.js # 任务获取中间件基类 │ │ └── task-fetcher-manager.js # 任务获取管理器 │ ├── grpc/ # gRPC客户端目录 │ │ ├── client.js # gRPC客户端 │ │ └── config_pb.mjs # gRPC配置协议缓冲区 │ ├── processors/ # 处理器目录 │ │ ├── http-stream-processor.js # HTTP/HTTPS视频流处理器 │ │ ├── ws-flv-processor.js # WebSocket FLV流处理器 │ │ └── delog-processor.js # 视频去水印处理器 │ ├── middleware/ # 任务获取中间件目录 │ │ ├── http-task-fetcher.js # HTTP API任务获取中间件 │ │ ├── dummy-task-fetcher.js # 示例任务获取中间件 │ │ ├── iavs-task-fetcher.js # IAVS平台任务获取中间件 │ │ └── video-clip-task-fetcher.js # VideoClip平台任务获取中间件 │ ├── utils/ # 工具目录 │ │ ├── logger.js # 日志管理模块 │ │ ├── task.js # 任务类 │ │ ├── task-queue.js # 智能任务队列 │ │ ├── system-monitor.js # 系统监控模块 │ │ └── bandwidth-controller.js # 带宽控制器 │ ├── daemon.js # 常驻进程入口 ├── config/ # 配置目录 │ ├── app.json # 应用主配置文件 │ ├── app.example.json # 配置文件示例 │ ├── iavs/ # IAVS平台配置目录 │ │ └── config.js # IAVS平台配置文件 │ └── video-clip/ # VideoClip平台配置目录 │ └── config.json # VideoClip平台配置文件 ├── examples/ # 示例代码目录 │ └── custom-callbacks.js # 自定义回调示例 ├── test/ # 测试文件目录 │ ├── http-stream.test.js # HTTP流处理器测试 │ ├── ws-flv.test.js # WebSocket FLV流处理器测试 │ ├── delog.test.js # 视频去水印处理器测试 │ ├── queue.test.js # 智能任务队列测试 │ ├── mixed-tasks.test.js # 混合任务类型测试 │ ├── iavs-task-fetcher.test.js # IAVS任务获取中间件测试 │ ├── grpc-config.test.js # gRPC配置测试 │ └── load-test.js # 负载测试 ├── logs/ # 日志文件目录 ├── output/ # 输出文件目录 ├── package.json # 项目配置文件 └── README.md # 项目说明文档
ws:WebSocket客户端库fluent-ffmpeg:FFmpeg处理库winston:日志记录库systeminformation:系统信息获取库axios:HTTP客户端库,用于IAVS API调用digest-fetch:Digest认证客户端,用于IAVS登录json5:JSON5解析库,用于读取IAVS配置文件uuid:UUID生成库,用于生成任务ID在项目开发过程中,我们遇到了logger.js和config.js之间的循环依赖问题:
问题描述:
logger.js需要从config.js导入日志级别配置config.js需要从logger.js导入createLogger函数来记录日志ReferenceError: Cannot access 'createLogger' before initialization错误解决方案:
延迟日志级别初始化:
logger.js中使用默认日志级别('info'),避免从config.js导入loggers数组存储所有创建的日志记录器实例updateLogLevel函数,用于在应用启动后动态更新所有日志记录器的日志级别在应用启动后更新日志级别:
daemon.js中导入updateLogLevel函数config.get('logger.level')获取配置的日志级别updateLogLevel函数更新所有日志记录器的日志级别相关代码:
// logger.js 中的 updateLogLevel 函数
const updateLogLevel = (level) => {
loggers.forEach(logger => {
logger.level = level;
});
console.log(`Updated all loggers to level: ${level}`);
};
// 导出 updateLogLevel 函数
export default createLogger;
export { updateLogLevel };
// daemon.js 中的使用
import { updateLogLevel } from './utils/logger.js';
async function main() {
// 启动应用
await app.start();
// 应用启动后更新日志级别
const logLevel = config.get('logger.level', 'info');
updateLogLevel(logLevel);
logger.info(`Updated log level to: ${logLevel}`);
}
这种解决方案避免了循环依赖问题,同时保持了从配置文件读取日志级别的灵活性。
系统实现了智能的并发数据自动调整机制,根据系统负载动态调整任务并发度,具体实现如下:
核心配置参数:
enableDynamicConcurrency:是否启用动态并发调节,默认trueconcurrencyAdjustmentInterval:并发调节间隔(毫秒),默认2000concurrencyAdjustmentStep:并发数调节步长,默认2lowLoadThreshold:低负载阈值(%),低于此值时增加并发数,默认20highLoadThreshold:高负载阈值(%),高于此值时减少并发数,默认90minConcurrency:最小并发数,默认1maxConcurrency:最大并发数,默认根据CPU核心数自动计算(CPU核心数 * 0.8)实现逻辑:
算法细节:
newConcurrency = Math.min(currentMaxConcurrency + concurrencyAdjustmentStep, maxConcurrency)newConcurrency = minConcurrency事件通知:
concurrency-adjusted事件系统过载处理:
这种实现方式确保了系统能够根据实际负载智能地调整并发度,在保证系统稳定性的同时,最大化利用系统资源。
系统将任务分为两种类型:
IO密集型任务:
CPU密集型任务:
系统会分别管理这两种类型的任务队列,确保系统资源得到合理分配,避免某一种类型的任务占用过多系统资源。
MIT