logo
0
0
WeChat Login
Zhang Wenhu<wenhulove333@163.com>
feat: 排查当前代码,进行优化

视频流处理器库

一个基于Node.js的视频流处理器库,支持HTTP/HTTPS和WebSocket视频流处理,以及视频去水印(delog)功能,提供智能任务队列管理和可扩展的任务获取中间件系统。

功能特点

  • ✅ 支持HTTP/HTTPS视频流处理(flv, mp4, zmv等各种视频格式)
  • ✅ 支持WebSocket FLV流处理
  • ✅ 支持视频去水印(delog)处理
  • ✅ 智能任务队列,支持动态并发调节
  • ✅ 并发数据自动调整机制,根据系统负载智能调整并发度
  • ✅ 任务提交时直接执行,减少任务等待时间
  • ✅ 只copy视频流,不处理音频,提高处理效率
  • ✅ 支持指定输出视频时长,自动重试确保时长准确
  • ✅ 实时系统监控(CPU、内存、负载)
  • ✅ 智能带宽控制,防止网络拥塞
  • ✅ 支持自定义回调函数,灵活扩展
  • ✅ 实时事件发射(下载速度、处理时长等),所有事件包含taskId便于追踪
  • ✅ 完善的日志记录(使用winston库)
  • ✅ 支持ES Modules(import/export语法)
  • ✅ 异步接口设计,易于使用
  • ✅ 可扩展的任务获取中间件系统
  • ✅ 常驻守护进程,从外部获取任务
  • ✅ HTTP API任务获取中间件
  • ✅ 示例任务获取中间件
  • ✅ IAVS平台任务获取中间件
  • ✅ VideoClip平台任务获取中间件
  • ✅ 支持混合任务类型处理
  • ✅ 系统负载监控,自动拒绝过载请求
  • ✅ 自动恢复机制,负载降低后继续接受任务
  • ✅ IAVS平台任务获取中间件支持delog任务队列,优先处理delog任务
  • ✅ VideoClip任务获取中间件支持任务持久化和重启恢复
  • ✅ VideoClip任务获取中间件支持任务失败重试机制
  • ✅ 任务提交失败时的错误处理和重试机制
  • ✅ 系统过载时的任务调度暂停和恢复机制
  • ✅ 完善的任务状态追踪和事件处理
  • ✅ 实时下载速度计算(使用滑动窗口方法)
  • ✅ WebSocket连接无数据情况和超时处理
  • ✅ 循环依赖问题解决方案
  • ✅ 延迟日志级别初始化,提高系统启动可靠性
  • ✅ 任务提交失败回调机制,便于中间件处理失败情况
  • ✅ 系统状态定期监控和记录
  • ✅ 完善的资源清理机制,确保临时文件正确处理
  • ✅ 支持任务ID传递,便于全链路追踪
  • ✅ 中间件错误处理和事件通知机制
  • ✅ 配置文件自动重载,无需重启应用即可应用新配置
  • ✅ gRPC客户端支持,动态获取配置
  • ✅ 支持VideoClip和IAVS多平台任务获取
  • ✅ 任务执行状态详细统计和监控
  • ✅ 实时队列状态打印,便于监控系统运行情况
  • ✅ 支持任务类型分类(CPU密集型和IO密集型)
  • ✅ 智能系统负载检测和恢复机制

安装

npm install

配置管理

配置文件位置

系统的配置文件默认位于以下位置:

  • 主配置文件config/app.json - 包含应用的主要配置
  • IAVS配置文件config/iavs/config.js - 包含IAVS平台的登录信息和摄像头配置
  • VideoClip配置文件config/video-clip/config.json - 包含VideoClip平台的任务获取配置

配置文件自动重载

系统支持配置文件自动重载功能,当配置文件发生变化时,系统会自动检测并应用新的配置,无需重启应用。

工作原理

  1. 文件系统监听:系统使用 Node.js 的 fs.watch API 监听配置文件所在目录的变化
  2. 配置重载:当配置文件发生变化时,系统会自动重新加载配置文件
  3. 配置变更检测:系统会比较旧配置和新配置,只有当配置确实发生变化时才会应用新配置
  4. 组件更新:当配置发生变化时,系统会自动更新相关组件的配置,如任务队列、任务获取器等
  5. 防抖处理:系统会对配置文件变化事件进行防抖处理,避免频繁触发配置重载

支持的配置文件类型

  • config/app.json - 主配置文件
  • config/iavs/config.js - IAVS平台配置文件
  • config/video-clip/config.json - VideoClip平台配置文件
  • 其他在配置目录中的 JSON 和 JS 配置文件

主配置文件结构

主配置文件 config/app.json 的结构如下:

{ "taskQueue": { "maxConcurrency": null, "minConcurrency": 1, "maxBandwidth": null, "maxCPUUsage": 90, "maxMemoryUsage": 80, "monitorInterval": 1000, "enableDynamicConcurrency": true, "concurrencyAdjustmentInterval": 2000, "concurrencyAdjustmentStep": 2, "lowLoadThreshold": 40, "highLoadThreshold": 80 }, "fetchers": { "http": { "enabled": false, "port": 3000, "endpoint": "/api/tasks", "interval": 1000 }, "dummy": { "enabled": false, "interval": 5000, "taskCount": 5 }, "file": { "enabled": false, "watchDir": "./tasks", "filePattern": "*.json" }, "iavs": { "enabled": false, "configPath": "", "maxRetryCount": 3, "deleteOriginalAfterDelog": true, "heartbeatFile": "./heartbeat" }, "video-clip": { "enabled": true, "configPath": "", "useGrpcConfig": true } }, "grpc": { "enabled": true, "host": "localhost", "port": 50056, "interval": 60000 }, "logger": { "level": "info", "format": "json" } }

主要配置选项说明

任务队列配置
配置项类型默认值说明
maxConcurrencyNumbernull最大并发数,null表示根据CPU核心数自动计算
minConcurrencyNumber1最小并发数
maxBandwidthNumbernull最大总带宽(字节/秒),null表示无限制
maxCPUUsageNumber80最大CPU使用率(%)
maxMemoryUsageNumber80最大内存使用率(%)
monitorIntervalNumber1000监控间隔(毫秒)
enableDynamicConcurrencyBooleantrue是否启用动态并发调节
concurrencyAdjustmentIntervalNumber5000并发调节间隔(毫秒)
concurrencyAdjustmentStepNumber1并发数调节步长
lowLoadThresholdNumber40低负载阈值(%),低于此值时增加并发数
highLoadThresholdNumber70高负载阈值(%),高于此值时减少并发数
任务获取器配置
配置项类型默认值说明
http.enabledBooleanfalse是否启用HTTP API任务获取
http.portNumber3000HTTP API端口
http.endpointString"/api/tasks"HTTP API端点
http.intervalNumber1000轮询间隔(毫秒)
dummy.enabledBooleanfalse是否启用示例任务获取
dummy.intervalNumber5000任务生成间隔(毫秒)
dummy.taskCountNumber5生成的任务数量
file.enabledBooleanfalse是否启用文件监控任务获取
file.watchDirString"./tasks"监控目录
file.filePatternString"*.json"文件匹配模式
iavs.enabledBooleanfalse是否启用IAVS平台任务获取
iavs.configPathString""IAVS配置文件路径
iavs.maxRetryCountNumber3最大重试次数
iavs.deleteOriginalAfterDelogBooleantruedelog完成后是否删除原始文件
iavs.heartbeatFileString"./heartbeat"心跳文件路径
video-clip.enabledBooleantrue是否启用VideoClip平台任务获取
video-clip.configPathString""VideoClip配置文件路径
video-clip.useGrpcConfigBooleantrue是否使用gRPC配置
gRPC配置
配置项类型默认值说明
grpc.enabledBooleantrue是否启用gRPC客户端
grpc.hostString"localhost"gRPC服务器地址
grpc.portNumber50056gRPC服务器端口
grpc.intervalNumber60000配置同步间隔(毫秒)
日志配置
配置项类型默认值说明
logger.levelString"info"日志级别,可选值:debug, info, warn, error
logger.formatString"json"日志格式,可选值:json, simple

IAVS配置文件

IAVS配置文件默认位于 config/iavs/config.js,包含平台登录信息和摄像头配置:

{ "name": "downloader1", "delogCpuPercent": 20, "threads": 2, "startDate": "2025-10-01", "downloadMode": "websocket", // http or websocket "duration": 600, // 下载时长(秒) "iavs": { "url": "http://60.188.49.211:8082", "username": "admin", "password": "admin" }, "cameraGroup": [ { "name": "group1", "storagePath": "./output", "location": "own", "cameras": [ { "name": "camera1", "avObjName": "av/camera1/1@192.168.1.1:8080", "delogAreas": [{"left": 100, "top": 100, "width": 200, "height": 50}] } ] } ] }

IAVS配置选项说明

配置项类型默认值说明
nameString"downloader1"下载器名称
delogCpuPercentNumber20去水印任务的CPU使用率限制(%)
threadsNumber2线程数
startDateString"2025-10-01"开始日期
downloadModeString"websocket"下载模式,可选值:http, websocket
durationNumber600下载时长(秒)
iavs.urlString"http://60.188.49.211:8082"IAVS平台URL
iavs.usernameString"admin"IAVS平台用户名
iavs.passwordString"admin"IAVS平台密码
cameraGroupArray[]摄像头组配置
cameraGroup[].nameString"group1"摄像头组名称
cameraGroup[].storagePathString"./output"存储路径
cameraGroup[].locationString"own"位置
cameraGroup[].camerasArray[]摄像头配置
cameraGroup[].cameras[].nameString"camera1"摄像头名称
cameraGroup[].cameras[].avObjNameString"av/camera1/1@192.168.1.1:8080"摄像头对象名称
cameraGroup[].cameras[].delogAreasArray[]去水印区域配置

VideoClip配置文件

VideoClip配置文件默认位于 config/video-clip/config.json,包含VideoClip平台的任务获取配置:

{ "unitCode": "", "api": { "baseUrl": "http://172.16.3.70:8088", "timeout": 10000, "downloadTask": "/videocollect/api/v1/task/applyDownloadTask", "downloadReceipt": "/videocollect/api/v1/task/receiptDownloadTask", "transcodeTask": "/videocollect/api/v1/task/applyTransferTask", "transcodeReceipt": "/videocollect/api/v1/task/receiptTransferTask" }, "downloadMode": "ws", "locationOrder": ["own", "dev", "server"], "addressReplacements": {}, "addressCheckTimeout": 3000, "fetch": { "interval": 5000, "retryDelay": 3000, "maxRetries": 3 }, "progress": { "reportInterval": 5000, "initialDelay": 5000, "retryCount": 3, "retryDelay": 1000 }, "persistence": { "enabled": true, "filePath": "data/video-clip-pending-tasks.json" }, "retry": { "maxRetries": 3, "minDurationRatio": 0.5 }, "useGrpcConfig": true }

VideoClip配置选项说明

配置项类型默认值说明
unitCodeString""单元代码
api.baseUrlString"http://172.16.3.70:8088"API基础地址
api.timeoutNumber10000API超时时间(毫秒)
api.downloadTaskString"/videocollect/api/v1/task/applyDownloadTask"下载任务API端点
api.downloadReceiptString"/videocollect/api/v1/task/receiptDownloadTask"下载回执API端点
api.transcodeTaskString"/videocollect/api/v1/task/applyTransferTask"转码任务API端点
api.transcodeReceiptString"/videocollect/api/v1/task/receiptTransferTask"转码回执API端点
downloadModeString"ws"下载模式,可选值:ws, http
locationOrderArray["own", "dev", "server"]Location优先级
addressReplacementsObject{}地址替换配置
addressCheckTimeoutNumber3000地址检测超时(毫秒)
fetch.intervalNumber5000任务获取间隔(毫秒)
fetch.retryDelayNumber3000获取重试延迟(毫秒)
fetch.maxRetriesNumber3获取最大重试次数
progress.reportIntervalNumber5000进度上报间隔(毫秒)
progress.initialDelayNumber5000首次上报延迟(毫秒)
progress.retryCountNumber3进度上报重试次数
progress.retryDelayNumber1000上报重试延迟(毫秒)
persistence.enabledBooleantrue是否启用持久化
persistence.filePathString"data/video-clip-pending-tasks.json"持久化文件路径
retry.maxRetriesNumber3任务最大重试次数
retry.minDurationRatioNumber0.5最小视频时长比例
useGrpcConfigBooleantrue是否使用gRPC配置

gRPC配置

gRPC配置用于从外部gRPC服务获取配置,支持动态更新:

{ "grpc": { "enabled": true, "host": "localhost", "port": 50056, "interval": 60000 } }

gRPC配置选项说明

配置项类型默认值说明
enabledBooleantrue是否启用gRPC客户端
hostString"localhost"gRPC服务器地址
portNumber50056gRPC服务器端口
intervalNumber60000配置同步间隔(毫秒)

gRPC配置更新时会触发以下事件:

  • grpc-config-updated: 当配置更新时触发,包含 unitIdvideoPath

快速开始

智能任务队列(推荐使用)

import { TaskQueue } from './src/index.js'; // 创建任务队列实例 const queue = new TaskQueue({ maxConcurrency: 4, // 最大并发数 minConcurrency: 1, // 最小并发数 maxBandwidth: 10 * 1024 * 1024, // 最大总带宽10MB/s monitorInterval: 1000, // 监控间隔1秒 // 并发调整相关配置 enableDynamicConcurrency: true, // 启用动态并发调节 concurrencyAdjustmentInterval: 2000, // 并发调节间隔(毫秒) concurrencyAdjustmentStep: 2, // 并发数调节步长 lowLoadThreshold: 20, // 低负载阈值(%),低于此值时增加并发数 highLoadThreshold: 90, // 高负载阈值(%),高于此值时减少并发数 maxSystemLoad: 90, // 系统最大负载阈值,超过此值拒绝新任务 loadRecoveryThreshold: 70, // 负载恢复阈值,低于此值恢复接受任务 statusPrintInterval: 10000 // 状态打印间隔(毫秒) }); // 启动队列 queue.start(); // 定义自定义回调函数 const callbacks = { 'download-speed': (data) => { console.log(`下载速度: ${(data.bytesPerSecond / 1024 / 1024).toFixed(2)} MB/s, 任务ID: ${data.taskId}`); }, 'progress': (data) => { console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒, 任务ID: ${data.taskId}`); }, 'end': (data) => { console.log(`任务完成, 总时长: ${data.totalTime.toFixed(2)}秒, 总字节: ${(data.totalBytes / 1024 / 1024).toFixed(2)} MB, 任务ID: ${data.taskId}`); }, 'completed': (data) => { console.log(`任务${data.taskId}完成, 结果: ${JSON.stringify(data.result)}`); }, 'failed': (data) => { console.error(`任务${data.taskId}失败, 错误: ${data.error.message}`); } }; // 提交HTTP任务 const result1 = await queue.submitTask('http', { httpUrl: 'https://vjs.zencdn.net/v/oceans.mp4', duration: 20, outputPath: 'output/http_test_1.mp4' }, callbacks); // 提交WebSocket任务 const result2 = await queue.submitTask('ws', { wsUrl: 'ws://example.com/stream.flv', duration: 20, outputPath: 'output/ws_test_1.mp4' }, callbacks); // 提交视频去水印任务 const result3 = await queue.submitTask('delog', { inputPath: 'input/video.mp4', delogAreas: [ { x: 100, y: 100, width: 200, height: 100 }, { x: 500, y: 200, width: 150, height: 150 } ], outputPath: 'output/delog_test_1.mp4', outputResolution: '1280:720' }, callbacks); // 获取队列状态 const queueStatus = queue.getQueueStatus(); console.log('队列状态:', queueStatus); // 停止队列 queue.stop();

HTTP/HTTPS视频流处理(直接使用)

import { HTTPStreamProcessor } from './src/index.js'; const processor = new HTTPStreamProcessor(); // 添加事件监听 processor.on('download-speed', (data) => { console.log(`下载速度: ${(data.bytesPerSecond / 1024 / 1024).toFixed(2)} MB/s`); }); processor.on('progress', (data) => { console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒`); }); // 处理HTTP视频流 await processor.processStream( 'http://example.com/stream.flv', // HTTP/HTTPS视频地址 60, // 输出视频时长(秒) 'output.mp4', // 输出文件路径 'flv' // 输入格式(可选,ffmpeg会自动检测) );

WebSocket FLV流处理(直接使用)

import { WSFLVProcessor } from './src/index.js'; const processor = new WSFLVProcessor(); // 添加事件监听 processor.on('download-speed', (data) => { console.log(`下载速度: ${(data.bytesPerSecond / 1024 / 1024).toFixed(2)} MB/s`); }); processor.on('progress', (data) => { console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒`); }); // 处理WebSocket FLV流 await processor.processFLVStream( 'ws://example.com/stream.flv', // WebSocket FLV地址 60, // 输出视频时长(秒) 'output.mp4' // 输出文件路径 );

视频去水印处理(直接使用)

import { DelogProcessor } from './src/index.js'; const processor = new DelogProcessor(); // 添加事件监听 processor.on('progress', (data) => { console.log(`处理进度: ${data.timemark}, 已用时: ${data.elapsedTime.toFixed(2)}秒`); }); processor.on('start', (data) => { console.log(`开始处理视频: ${data.inputPath}`); console.log(`去水印区域: ${JSON.stringify(data.delogAreas)}`); console.log(`输出分辨率: ${data.outputResolution}`); }); processor.on('end', (data) => { console.log(`处理完成: ${data.outputPath}`); console.log(`总时长: ${data.totalTime.toFixed(2)}秒`); }); // 处理视频去水印 await processor.processFile( 'input/video.mp4', // 输入视频文件路径 [ { x: 100, y: 100, width: 200, height: 100 }, // 去水印区域1 { x: 500, y: 200, width: 150, height: 150 } // 去水印区域2 ], 'output/delogged_video.mp4', // 输出视频路径 '1280:720' // 输出分辨率(可选,默认1280:720) );

常驻守护进程(从外部获取任务)

# 启动常驻守护进程 npm start # 或使用daemon命令 npm run daemon

HTTP API任务提交

curl -X POST -H "Content-Type: application/json" -d '{ "taskType": "http", "params": { "httpUrl": "https://vjs.zencdn.net/v/oceans.mp4", "duration": 20, "outputPath": "output/api_test.mp4" }, "callbacks": { "completed": "function(data) { console.log(\"Task completed: \" + data.taskId); }" } }' http://localhost:3000/api/tasks

IAVS平台任务获取

配置IAVS任务获取中间件

config/app.json 中添加或修改以下配置:

{ "fetchers": { "iavs": { "enabled": true, "configPath": "config/iavs/config.js", "maxRetryCount": 3 } } }

IAVS配置文件位置

IAVS配置文件默认位于 config/iavs/config.js,包含平台登录信息和摄像头配置。详细配置选项请参考 配置管理 章节。

IAVS配置文件格式

{ "name": "downloader1", "delogCpuPercent": 20, "threads": 2, "startDate": "2025-10-01", "downloadMode": "websocket", // http or websocket "iavs": { "url": "http://60.188.49.211:8082", "username": "admin", "password": "admin" }, "cameraGroup": [ { "name": "group1", "storagePath": "/path/to/storage", "location": "own", "cameras": [ { "name": "camera1", "avObjName": "av/camera1/1@192.168.1.1:8080", "delogAreas": [{"left": 100, "top": 100, "width": 200, "height": 50}] } ] } ] }

IAVS任务获取中间件的delog任务队列机制

IAVS任务获取中间件现在支持delog任务队列机制,具体实现如下:

  1. delog任务队列:当下载任务完成时,delog任务会被添加到一个先进先出的队列中,而不是直接提交。

  2. 优先调度delog任务:在调度摄像头任务之前,会优先处理delog任务队列中的任务,确保delog任务能够及时处理。

  3. 任务提交状态跟踪:只有当delog任务提交成功时,才会从队列中移除,否则会留在队列中等待下次调度。

  4. 系统过载处理:当系统过载时,会停止调度新的任务,包括delog任务,直到系统负载恢复。

  5. 错误处理和重试:当任务提交失败时,会记录错误信息,并在系统负载恢复后重新尝试提交。

这种机制确保了delog任务能够按照正确的顺序处理,并且在系统负载过高时不会导致系统崩溃。

下载重试机制

IAVS任务获取中间件现在支持智能下载重试机制,确保视频时长准确,具体实现如下:

  1. 视频长度验证:下载完成后,系统会使用FFmpeg检查视频的实际时长,并与请求的时长进行比较。

  2. 任务重试队列:如果视频时长不足(低于要求的90%),任务会被添加到重试队列中,等待重新下载。

  3. 可配置的重试次数:通过 maxRetryCount 配置项可以设置最大重试次数,默认值为3次。

  4. 失败处理:当达到最大重试次数后,系统会生成一个 .failed 文件,包含失败原因、下载URL和视频长度信息。

  5. 系统负载感知:任务重试会考虑系统负载情况,当系统过载时会暂停重试,直到负载恢复。

配置示例

config/app.json 中添加或修改以下配置:

{ "fetchers": { "iavs": { "enabled": true, "configPath": "config/iavs/config.js", "maxRetryCount": 3 } } }

失败文件格式

当任务失败时,系统会生成一个与下载文件同名但扩展名为 .failed 的文件,包含以下信息:

{ "reason": "Failed after 3 attempts: Video duration insufficient (50s < 60s)", "downloadUrl": "ws://example.com/stream.flv", "requestedDuration": 60, "actualDuration": 50 }

这种机制确保了系统能够自动处理视频长度不足的情况,提高了下载任务的成功率和可靠性。

VideoClip平台任务获取

VideoClip任务获取中间件从 video-auto-clip-system API 获取下载和转码任务。

配置VideoClip任务获取中间件

config/app.json 中添加或修改以下配置:

{ "fetchers": { "video-clip": { "enabled": true, "configPath": "config/video-clip/config.json", "useGrpcConfig": true } }, "grpc": { "enabled": true, "host": "localhost", "port": 50056, "interval": 60000 } }

VideoClip配置文件位置

VideoClip配置文件默认位于 config/video-clip/config.json,包含API配置和任务获取配置。详细配置选项请参考 配置管理 章节。

VideoClip配置文件格式

{ "unitCode": "", "api": { "baseUrl": "http://172.16.3.70:8088", "timeout": 10000, "downloadTask": "/videocollect/api/v1/task/applyDownloadTask", "downloadReceipt": "/videocollect/api/v1/task/receiptDownloadTask", "transcodeTask": "/videocollect/api/v1/task/applyTransferTask", "transcodeReceipt": "/videocollect/api/v1/task/receiptTransferTask" }, "downloadMode": "ws", "locationOrder": ["own", "dev", "server"], "fetch": { "interval": 5000, "retryDelay": 3000, "maxRetries": 3 }, "progress": { "reportInterval": 5000, "initialDelay": 5000 }, "persistence": { "enabled": true, "filePath": "data/video-clip-pending-tasks.json" }, "retry": { "maxRetries": 3, "minDurationRatio": 0.5 }, "useGrpcConfig": true }

VideoClip任务获取中间件功能

  1. 任务获取:从VideoClip API获取下载和转码任务

  2. 下载任务处理

    • 支持CAMERA类型和HTTP类型的下载任务
    • 按locationOrder优先级组装和检测URL可用性
    • 支持WebSocket和HTTP两种下载模式
  3. 转码任务处理

    • 解析logoPosition,支持多种格式
    • 自动转换坐标格式
  4. 持久化机制

    • 任务状态自动保存到磁盘
    • 重启后自动恢复待处理任务
    • 支持任务重试计数持久化
  5. 重试机制

    • 下载失败自动重试
    • 转码失败自动重试
    • 可配置最大重试次数
    • 支持时长不足时重试
  6. gRPC配置集成

    • 支持从gRPC服务动态获取unitCode和videoPath
    • 配置更新自动生效
    • 支持独立配置文件+gRPC动态配置的混合模式

API文档

TaskQueue类(智能任务队列)

构造函数

new TaskQueue(options = {})

参数

  • options:配置选项
    • maxConcurrency:最大并发数,null表示根据CPU核心数自动计算,默认null
    • minConcurrency:最小并发数,默认1
    • maxBandwidth:最大总带宽(字节/秒),null表示无限制,默认null
    • maxCPUUsage:最大CPU使用率(%),默认80
    • maxMemoryUsage:最大内存使用率(%),默认80
    • monitorInterval:监控间隔(毫秒),默认1000
    • enableDynamicConcurrency:是否启用动态并发调节,默认true
    • concurrencyAdjustmentInterval:并发调节间隔(毫秒),默认2000
    • concurrencyAdjustmentStep:并发数调节步长,默认2
    • lowLoadThreshold:低负载阈值(%),低于此值时增加并发数,默认20
    • highLoadThreshold:高负载阈值(%),高于此值时减少并发数,默认90
    • maxSystemLoad:系统最大负载阈值,超过此值拒绝新任务,默认90
    • loadRecoveryThreshold:负载恢复阈值,低于此值恢复接受任务,默认70
    • statusPrintInterval:状态打印间隔,默认10000

start方法

start()

功能:启动任务队列

stop方法

stop(cancelRunningTasks = false)

功能:停止任务队列

参数

  • cancelRunningTasks:是否取消正在执行的任务,默认false

submitTask方法

async submitTask(taskType, params, callbacks = {})

功能:提交任务到队列

参数

  • taskType:任务类型,'http'、'ws'或'delog'
  • params:任务参数
    • 对于'http'类型:{ httpUrl, duration, outputPath, inputFormat }
    • 对于'ws'类型:{ wsUrl, duration, outputPath }
    • 对于'delog'类型:{ inputPath, delogAreas, outputPath, outputResolution }
  • callbacks:自定义回调函数,键为事件类型,值为回调函数

返回值

  • Promise,成功时返回任务执行结果

getQueueStatus方法

getQueueStatus()

功能:获取当前队列状态

返回值

  • 队列状态对象,包含运行中的任务信息和系统状态

getStats方法

getStats()

功能:获取队列统计信息

返回值

  • 队列统计信息对象

HTTPStreamProcessor类

构造函数

new HTTPStreamProcessor(options = {})

参数

  • options:配置选项
    • chunkSize:数据块大小,默认65536(64KB)
    • timeout:超时时间,默认30000(30秒)

processStream方法

async processStream(httpUrl, duration, outputPath, inputFormat = null, taskId = null)

参数

  • httpUrl:HTTP/HTTPS视频地址
  • duration:输出视频时长(秒)
  • outputPath:输出文件路径
  • inputFormat:输入格式(可选,ffmpeg会自动检测)
  • taskId:任务ID(可选)

返回值

  • Promise,成功时返回包含以下字段的对象:
    • outputPath:输出文件路径
    • totalTime:总处理时间(秒)
    • totalBytes:总下载字节数
    • taskId:任务ID

WSFLVProcessor类

构造函数

new WSFLVProcessor(options = {})

参数

  • options:配置选项
    • chunkSize:数据块大小,默认65536(64KB)
    • timeout:超时时间,默认30000(30秒)
    • reconnectAttempts:重连次数,默认0

processFLVStream方法

async processFLVStream(wsUrl, duration, outputPath, taskId = null)

参数

  • wsUrl:WebSocket FLV地址
  • duration:输出视频时长(秒)
  • outputPath:输出文件路径
  • taskId:任务ID(可选)

返回值

  • Promise,成功时返回包含以下字段的对象:
    • outputPath:输出文件路径
    • totalTime:总处理时间(秒)
    • totalBytes:总下载字节数
    • taskId:任务ID
    • requestedDuration:请求的视频时长(秒)
    • actualDuration:实际的视频时长(秒)

下载速度计算实现

WSFLVProcessor使用滑动窗口方法计算实时下载速度,具体实现如下:

  1. 滑动窗口数据存储:使用downloadData数组存储最近1秒内的下载数据,每个元素包含时间戳和字节数。

  2. 数据清理:每次接收到数据时,移除超过1秒的数据,确保窗口大小保持在1秒内。

  3. 速度计算:计算窗口内所有数据的总字节数,作为过去1秒的下载速度。

  4. 事件发射:每1秒发射一次download-speed事件,包含实时下载速度和其他相关信息。

// 存储最近1秒内的下载数据 const downloadData = []; ws.on('message', (data) => { const now = Date.now(); const byteLength = data.byteLength || data.length; // 更新统计信息 totalBytes += byteLength; lastByteTime = now; // 添加当前时间和字节数到下载数据数组 downloadData.push({ time: now, bytes: byteLength }); // 移除超过1秒的数据 const oneSecondAgo = now - 1000; while (downloadData.length > 0 && downloadData[0].time < oneSecondAgo) { downloadData.shift(); } // 计算过去1秒内的总字节数 const bytesInLastSecond = downloadData.reduce((sum, item) => sum + item.bytes, 0); // 计算实时下载速度(每秒字节数) const downloadSpeed = bytesInLastSecond; // 限制download-speed事件发射频率 if (now - lastSpeedEventTime >= SPEED_EVENT_INTERVAL) { // 触发下载速度事件 this.emit('download-speed', { bytesPerSecond: downloadSpeed, bytesReceived: totalBytes, elapsedTime: elapsedSeconds, currentTime: now, taskId }); lastSpeedEventTime = now; } });

这种实现方式确保了下载速度的实时性,避免了使用平均速度导致的延迟和不准确问题。

DelogProcessor类

构造函数

new DelogProcessor(options = {})

参数

  • options:配置选项
    • chunkSize:数据块大小,默认65536(64KB)
    • timeout:超时时间,默认30000(30秒)

processFile方法

async processFile(inputPath, delogAreas, outputPath, outputResolution = '1280:720', taskId = null)

参数

  • inputPath:输入视频文件路径
  • delogAreas:去水印区域数组,每个区域包含 { left, top, width, height }
  • outputPath:输出视频文件路径
  • outputResolution:输出视频分辨率,格式为 width:height,默认 '1280:720'
  • taskId:任务ID(可选)

返回值

  • Promise,成功时返回包含以下字段的对象:
    • outputPath:输出文件路径
    • totalTime:总处理时间(秒)
    • totalBytes:总处理字节数
    • taskId:任务ID
    • duration:视频总时长(秒)

Task类

构造函数

new Task(taskType, params, callbacks = {})

参数

  • taskType:任务类型,'http'、'ws'或'delog'
  • params:任务参数
  • callbacks:自定义回调函数

execute方法

execute()

功能:执行任务

返回值

  • Promise,成功时返回任务执行结果

cancel方法

cancel()

功能:取消任务

getStatus方法

getStatus()

功能:获取任务状态

返回值

  • 任务状态对象

getStats方法

getStats()

功能:获取任务统计信息

返回值

  • 任务统计信息对象

SystemMonitor类

构造函数

new SystemMonitor(options = {})

参数

  • options:配置选项
    • monitorInterval:监控间隔(毫秒),默认1000

startMonitoring方法

startMonitoring()

功能:开始系统监控

stopMonitoring方法

stopMonitoring()

功能:停止系统监控

getCPUUsage方法

getCPUUsage()

功能:获取CPU使用率

返回值

  • CPU使用率(%)

getMemoryUsage方法

getMemoryUsage()

功能:获取内存使用率

返回值

  • 内存使用率(%)

getSystemLoad方法

getSystemLoad()

功能:获取系统负载

返回值

  • 系统负载(%)

getSystemStats方法

getSystemStats()

功能:获取系统统计信息

返回值

  • 系统统计信息对象

BandwidthController类

构造函数

new BandwidthController(options = {})

参数

  • options:配置选项
    • maxBandwidth:最大总带宽(字节/秒),null表示无限制,默认null
    • monitorInterval:监控间隔(毫秒),默认1000

startMonitoring方法

startMonitoring()

功能:开始带宽监控

stopMonitoring方法

stopMonitoring()

功能:停止带宽监控

canAddTask方法

canAddTask()

功能:检查是否可以添加新任务

返回值

  • boolean,true表示可以添加新任务,false表示不行

updateTaskBandwidth方法

updateTaskBandwidth(taskId, bytesReceived, interval)

功能:更新任务带宽使用情况

参数

  • taskId:任务ID
  • bytesReceived:接收的字节数
  • interval:时间间隔(毫秒)

releaseBandwidth方法

releaseBandwidth(taskId)

功能:释放任务带宽

参数

  • taskId:任务ID

getCurrentBandwidthStats方法

getCurrentBandwidthStats()

功能:获取当前带宽统计信息

返回值

  • 带宽统计信息对象

事件说明

TaskQueue事件

事件名触发时机事件数据
started队列启动时-
stopped队列停止时-
task-submitted任务提交时{ taskId, taskType, status }
task-status-changed任务状态变化时任务状态对象,包含id、taskType、status等详细信息
task-completed任务完成时{ taskId, taskType, result, duration }
task-failed任务失败时{ taskId, taskType, error }
task-cancelled任务取消时{ taskId, taskType }
task-bandwidth-updated任务带宽更新时{ taskId, bytesPerSecond, bytesReceived, maxBandwidth }
task-progress任务进度更新时{ taskId, timemark, elapsedTime, duration }
system-load-changed系统负载变化时{ load, isOverloaded, maxLoad, runningTasks, maxConcurrency, recoveryThreshold }
bandwidth-changed带宽变化时带宽统计数据,包含currentBandwidth、maxBandwidth、avgBandwidth等
queue-status-changed队列状态变化时完整的队列状态对象,包含运行中的任务详情
concurrency-adjusted并发数调整时{ oldConcurrency, newConcurrency, cpuUsage, memoryUsage, systemLoad }
- oldConcurrency:调整前的并发数
- newConcurrency:调整后的并发数
- cpuUsage:调整时的CPU使用率(%)
- memoryUsage:调整时的内存使用率(%)
- systemLoad:调整时的系统负载(%)
system-overloaded系统过载时{ currentLoad, maxLoad, recoveryThreshold, runningTasks, maxConcurrency }
system-load-recovered系统负载恢复时{ currentLoad, maxLoad, recoveryThreshold, runningTasks, maxConcurrency }

HTTPStreamProcessor事件

事件名触发时机事件数据
start处理开始时{ httpUrl, duration, outputPath, inputFormat, startTime, taskId }
download-speed每500毫秒{ bytesPerSecond, bytesReceived, elapsedTime, currentTime, taskId }
progressFFmpeg进度更新时{ timemark, currentTime, elapsedTime, duration, taskId }
ffmpeg-startFFmpeg开始处理时{ commandLine, taskId }
end处理完成时{ outputPath, totalTime, totalBytes, taskId }
error发生错误时Error对象,包含taskId

WSFLVProcessor事件

事件名触发时机事件数据
start处理开始时{ wsUrl, duration, outputPath, startTime, taskId }
connect连接到WebSocket服务器时{ wsUrl, taskId }
download-speed每1秒{ bytesPerSecond, bytesReceived, elapsedTime, currentTime, taskId }
progressFFmpeg进度更新时{ timemark, currentTime, elapsedTime, duration, taskId }
ffmpeg-startFFmpeg开始处理时{ commandLine, taskId }
end处理完成时{ outputPath, totalTime, totalBytes, taskId }
error发生错误时Error对象,包含taskId
disconnect与WebSocket服务器断开连接时{ code, reason, taskId }
reconnect-attempt尝试重新连接时{ attempt, maxAttempts, taskId }

DelogProcessor事件

事件名触发时机事件数据
start处理开始时{ inputPath, delogAreas, outputPath, outputResolution, startTime, taskId }
progressFFmpeg进度更新时{ timemark, currentTime, elapsedTime, duration, taskId }
ffmpeg-startFFmpeg开始处理时{ commandLine, taskId }
end处理完成时{ outputPath, totalTime, totalBytes, taskId }
error发生错误时Error对象,包含taskId

Task事件

事件名触发时机事件数据
status-changed任务状态变化时任务状态对象,包含id、taskType、status、createdAt、startedAt、completedAt、duration、bytesReceived、currentBandwidth、maxBandwidth、error
completed任务完成时任务执行结果对象,包含outputPath、totalTime、totalBytes、taskId
failed任务失败时Error对象,包含错误信息
cancelled任务取消时-
bandwidth-updated任务带宽更新时{ taskId, bytesPerSecond, bytesReceived, maxBandwidth }
progress任务进度更新时{ taskId, timemark, elapsedTime, duration }
started任务开始时{ taskId, startTime }
download-speed下载速度更新时转发处理器的download-speed事件,包含taskId
ffmpeg-startFFmpeg开始处理时转发处理器的ffmpeg-start事件,包含taskId
start处理器开始处理时转发处理器的start事件,包含taskId
end处理器处理完成时转发处理器的end事件,包含taskId
connectWebSocket连接成功时转发WSFLVProcessor的connect事件,包含taskId
disconnectWebSocket断开连接时转发WSFLVProcessor的disconnect事件,包含taskId
reconnect-attemptWebSocket尝试重连时转发WSFLVProcessor的reconnect-attempt事件,包含taskId
error处理器发生错误时转发处理器的error事件,包含taskId

SystemMonitor事件

事件名触发时机事件数据
system-load系统负载变化时系统负载数据

BandwidthController事件

事件名触发时机事件数据
bandwidth-changed带宽变化时带宽统计数据

测试

运行HTTP流处理器测试

node test/http-stream.test.js

运行WebSocket FLV流处理器测试

node test/ws-flv.test.js

运行智能任务队列测试

node test/queue.test.js

运行视频去水印处理器测试

node test/delog.test.js

运行混合任务类型测试

node test/mixed-tasks.test.js

运行IAVS任务获取中间件测试

node test/iavs-task-fetcher.test.js

运行gRPC配置测试

node test/grpc-config.test.js

运行负载测试

node test/load-test.js

事件驱动架构

事件流图

以下是系统的事件驱动架构图,展示了组件间的事件传递关系:

事件流说明

  1. 应用启动流程

    • App启动TaskFetcherManager和TaskQueue
    • TaskFetcherManager加载并启动中间件
    • 中间件从外部获取任务并提交给TaskFetcherManager
    • TaskFetcherManager将任务提交给TaskQueue
  2. 任务处理流程

    • TaskQueue接收任务并直接执行(如果系统资源允许)
    • Task创建相应的处理器实例(HTTP/WS/Delog)
    • 处理器执行具体的视频处理任务
    • 处理器向Task发送各种事件(下载速度、进度、状态变化等)
    • Task将事件转发给TaskQueue
    • TaskQueue将事件转发给App
  3. 系统监控流程

    • SystemMonitor监控系统CPU、内存和负载
    • BandwidthController监控带宽使用情况
    • 当系统负载超过阈值时,TaskQueue发出系统过载事件
    • App将过载事件转发给TaskFetcherManager
    • TaskFetcherManager暂停获取新任务
    • 当系统负载恢复时,TaskQueue发出系统恢复事件
    • App将恢复事件转发给TaskFetcherManager
    • TaskFetcherManager恢复获取新任务
  4. 事件类型

    • 应用事件:start, stop, task-submitted, task-completed, task-failed
    • 任务队列事件:started, stopped, task-submitted, task-completed, task-failed, system-overloaded, system-load-recovered, bandwidth-changed
    • 任务事件:status-changed, completed, failed, cancelled, bandwidth-updated, progress, download-speed
    • 处理器事件:start, end, download-speed, progress, error, connect, disconnect, reconnect-attempt
    • 监控事件:system-load, bandwidth-changed
  5. 事件传递机制

    • 系统使用Node.js的EventEmitter实现事件驱动架构
    • 事件从底层组件向上层组件传递
    • 每层组件可以监听和处理感兴趣的事件
    • 事件可以被转发或转换为更高级别的事件
    • 所有事件都包含taskId,便于追踪任务状态

项目结构

├── src/ # 核心源代码目录 │ ├── index.js # 项目入口文件 │ ├── app/ # 应用主目录 │ │ ├── app.js # 应用主类 │ │ ├── config.js # 配置管理 │ │ ├── task-fetcher.js # 任务获取中间件基类 │ │ └── task-fetcher-manager.js # 任务获取管理器 │ ├── grpc/ # gRPC客户端目录 │ │ ├── client.js # gRPC客户端 │ │ └── config_pb.mjs # gRPC配置协议缓冲区 │ ├── processors/ # 处理器目录 │ │ ├── http-stream-processor.js # HTTP/HTTPS视频流处理器 │ │ ├── ws-flv-processor.js # WebSocket FLV流处理器 │ │ └── delog-processor.js # 视频去水印处理器 │ ├── middleware/ # 任务获取中间件目录 │ │ ├── http-task-fetcher.js # HTTP API任务获取中间件 │ │ ├── dummy-task-fetcher.js # 示例任务获取中间件 │ │ ├── iavs-task-fetcher.js # IAVS平台任务获取中间件 │ │ └── video-clip-task-fetcher.js # VideoClip平台任务获取中间件 │ ├── utils/ # 工具目录 │ │ ├── logger.js # 日志管理模块 │ │ ├── task.js # 任务类 │ │ ├── task-queue.js # 智能任务队列 │ │ ├── system-monitor.js # 系统监控模块 │ │ └── bandwidth-controller.js # 带宽控制器 │ ├── daemon.js # 常驻进程入口 ├── config/ # 配置目录 │ ├── app.json # 应用主配置文件 │ ├── app.example.json # 配置文件示例 │ ├── iavs/ # IAVS平台配置目录 │ │ └── config.js # IAVS平台配置文件 │ └── video-clip/ # VideoClip平台配置目录 │ └── config.json # VideoClip平台配置文件 ├── examples/ # 示例代码目录 │ └── custom-callbacks.js # 自定义回调示例 ├── test/ # 测试文件目录 │ ├── http-stream.test.js # HTTP流处理器测试 │ ├── ws-flv.test.js # WebSocket FLV流处理器测试 │ ├── delog.test.js # 视频去水印处理器测试 │ ├── queue.test.js # 智能任务队列测试 │ ├── mixed-tasks.test.js # 混合任务类型测试 │ ├── iavs-task-fetcher.test.js # IAVS任务获取中间件测试 │ ├── grpc-config.test.js # gRPC配置测试 │ └── load-test.js # 负载测试 ├── logs/ # 日志文件目录 ├── output/ # 输出文件目录 ├── package.json # 项目配置文件 └── README.md # 项目说明文档

依赖

  • ws:WebSocket客户端库
  • fluent-ffmpeg:FFmpeg处理库
  • winston:日志记录库
  • systeminformation:系统信息获取库
  • axios:HTTP客户端库,用于IAVS API调用
  • digest-fetch:Digest认证客户端,用于IAVS登录
  • json5:JSON5解析库,用于读取IAVS配置文件
  • uuid:UUID生成库,用于生成任务ID

技术实现细节

循环依赖问题解决

在项目开发过程中,我们遇到了logger.jsconfig.js之间的循环依赖问题:

问题描述

  • logger.js需要从config.js导入日志级别配置
  • config.js需要从logger.js导入createLogger函数来记录日志
  • 这导致了ReferenceError: Cannot access 'createLogger' before initialization错误

解决方案

  1. 延迟日志级别初始化

    • logger.js中使用默认日志级别('info'),避免从config.js导入
    • 添加loggers数组存储所有创建的日志记录器实例
    • 添加updateLogLevel函数,用于在应用启动后动态更新所有日志记录器的日志级别
  2. 在应用启动后更新日志级别

    • daemon.js中导入updateLogLevel函数
    • 在应用启动后,使用config.get('logger.level')获取配置的日志级别
    • 调用updateLogLevel函数更新所有日志记录器的日志级别

相关代码

// logger.js 中的 updateLogLevel 函数 const updateLogLevel = (level) => { loggers.forEach(logger => { logger.level = level; }); console.log(`Updated all loggers to level: ${level}`); }; // 导出 updateLogLevel 函数 export default createLogger; export { updateLogLevel }; // daemon.js 中的使用 import { updateLogLevel } from './utils/logger.js'; async function main() { // 启动应用 await app.start(); // 应用启动后更新日志级别 const logLevel = config.get('logger.level', 'info'); updateLogLevel(logLevel); logger.info(`Updated log level to: ${logLevel}`); }

这种解决方案避免了循环依赖问题,同时保持了从配置文件读取日志级别的灵活性。

并发数据自动调整机制

系统实现了智能的并发数据自动调整机制,根据系统负载动态调整任务并发度,具体实现如下:

  1. 核心配置参数

    • enableDynamicConcurrency:是否启用动态并发调节,默认true
    • concurrencyAdjustmentInterval:并发调节间隔(毫秒),默认2000
    • concurrencyAdjustmentStep:并发数调节步长,默认2
    • lowLoadThreshold:低负载阈值(%),低于此值时增加并发数,默认20
    • highLoadThreshold:高负载阈值(%),高于此值时减少并发数,默认90
    • minConcurrency:最小并发数,默认1
    • maxConcurrency:最大并发数,默认根据CPU核心数自动计算(CPU核心数 * 0.8)
  2. 实现逻辑

    • 系统定期(默认每2秒)检查当前系统负载
    • 当系统负载低于低负载阈值且当前并发数小于最大并发数时,增加并发数
    • 当系统负载高于高负载阈值且当前并发数大于最小并发数时,将并发数调整为最小并发数
    • 并发数调整后,立即尝试执行更多任务(如果队列中有等待的任务)
  3. 算法细节

    • 增加并发数时:newConcurrency = Math.min(currentMaxConcurrency + concurrencyAdjustmentStep, maxConcurrency)
    • 减少并发数时:newConcurrency = minConcurrency
    • 系统负载计算:综合考虑CPU使用率、内存使用率和系统整体负载
  4. 事件通知

    • 当并发数发生变化时,系统会发射concurrency-adjusted事件
    • 事件包含旧并发数、新并发数、CPU使用率、内存使用率和系统负载等信息
  5. 系统过载处理

    • 当系统负载超过最大系统负载阈值(默认90%)时,系统会拒绝新任务提交
    • 当系统负载降至恢复阈值(默认70%)以下时,系统会恢复接受新任务

这种实现方式确保了系统能够根据实际负载智能地调整并发度,在保证系统稳定性的同时,最大化利用系统资源。

任务类型分类与管理

系统将任务分为两种类型:

  1. IO密集型任务

    • HTTP/HTTPS视频流下载任务
    • WebSocket FLV流下载任务
    • 这些任务主要受网络带宽限制,CPU使用率较低
    • 使用固定的最大并发数限制
  2. CPU密集型任务

    • 视频去水印(delog)任务
    • 这些任务需要大量CPU资源进行视频处理
    • 使用动态调整的并发数限制

系统会分别管理这两种类型的任务队列,确保系统资源得到合理分配,避免某一种类型的任务占用过多系统资源。

许可证

MIT