当前位置: 首页 > news >正文

DolphinDB横截面引擎:实时统计分析

目录

    • 摘要
    • 一、横截面引擎概述
      • 1.1 什么是横截面引擎
      • 1.2 横截面引擎 vs 时间序列引擎
      • 1.3 适用场景
    • 二、创建横截面引擎
      • 2.1 基本语法
      • 2.2 创建简单引擎
      • 2.3 创建带分组的引擎
    • 三、触发模式
      • 3.1 触发模式类型
      • 3.2 每行触发
      • 3.3 定时触发
    • 四、聚合指标
      • 4.1 基本统计
      • 4.2 百分位统计
      • 4.3 Top-N计算
    • 五、实战案例
      • 5.1 设备实时排名
      • 5.2 实时异常检测
      • 5.3 生产效率监控
    • 六、引擎管理
      • 6.1 查看引擎状态
      • 6.2 删除引擎
      • 6.3 引擎监控
    • 七、性能优化
      • 7.1 分组数量优化
      • 7.2 触发模式选择
      • 7.3 内存管理
    • 八、总结
    • 参考资料

摘要

本文深入讲解DolphinDB横截面引擎。从引擎原理到创建配置,从实时统计到Top-N计算,从分组分析到性能优化,全面介绍横截面引擎的核心功能。通过丰富的代码示例,帮助读者掌握实时统计分析的核心技能。


一、横截面引擎概述

1.1 什么是横截面引擎

横截面引擎是对同一时刻所有分组的数据进行聚合计算:

横截面引擎

时刻T

设备1数据

设备2数据

设备3数据

横截面聚合

整体统计结果

1.2 横截面引擎 vs 时间序列引擎

特性时间序列引擎横截面引擎
聚合维度时间窗口内同一时刻所有分组
输出频率窗口结束每条数据触发
适用场景时间趋势实时排名、整体统计

1.3 适用场景

场景说明
实时排名设备实时排名
整体统计所有设备整体统计
Top-N计算实时Top-N
异常检测整体异常检测

二、创建横截面引擎

2.1 基本语法

//创建横截面引擎 agg=createCrossSectionalEngine("engine_name",//引擎名称 metrics,//聚合指标 outputTable,//输出表 timeColumn,//时间列[keyColumn],//分组列(可选)[triggeringPattern],//触发模式[triggeringInterval]//触发间隔)

2.2 创建简单引擎

//创建输入流表 share streamTable(1:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,`timestamp`avg_temp`max_temp`min_temp`device_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建横截面引擎 agg=createCrossSectionalEngine("cs_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count]>,output_table,`timestamp)//订阅流表 subscribeTable(,"input_stream","cs_agg",-1,agg,true)

2.3 创建带分组的引擎

//创建带分组的横截面引擎 agg=createCrossSectionalEngine("grouped_cs_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp]>,output_table,`timestamp,`device_id)//分组说明://-每个device_id维护最新值//-聚合计算所有device_id的最新值

三、触发模式

3.1 触发模式类型

模式说明
perRow每条数据触发
perBatch每批数据触发
interval定时触发

3.2 每行触发

//每行触发:每条数据都触发计算 agg=createCrossSectionalEngine("per_row_engine",<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,"perRow")//每行触发

3.3 定时触发

//定时触发:每隔N毫秒触发 agg=createCrossSectionalEngine("interval_engine",<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,"interval",1000)//1秒触发

四、聚合指标

4.1 基本统计

//基本统计指标 agg=createCrossSectionalEngine("basic_stats",<[avg(temperature)asavg_temp,sum(temperature)assum_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count,std(temperature)asstd_temp]>,output_table,`timestamp,`device_id)

4.2 百分位统计

//百分位统计 agg=createCrossSectionalEngine("percentile_stats",<[percentile(temperature,50)asmedian,percentile(temperature,95)asp95,percentile(temperature,99)asp99]>,output_table,`timestamp,`device_id)

4.3 Top-N计算

//Top-N计算:需要自定义函数//使用top函数获取Top-N//创建输出表 share table(1:0,`timestamp`top_devices`top_temps,[TIMESTAMP,STRING,STRING])astopn_table//创建引擎 agg=createCrossSectionalEngine("topn_engine",<[concat(string(device_id),",")astop_devices,concat(string(temperature),",")astop_temps]>,topn_table,`timestamp,`device_id)

五、实战案例

5.1 设备实时排名

//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature`humidity,[INT,TIMESTAMP,DOUBLE,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`timestamp`avg_temp`max_temp`min_temp`device_count`std_temp,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE])asstats_table//==========3.创建横截面引擎==========agg=createCrossSectionalEngine("ranking_engine",<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(*)asdevice_count,std(temperature)asstd_temp]>,stats_table,`timestamp,`device_id,"interval",5000)//5秒输出//==========4.订阅流表==========subscribeTable(,"sensor_stream","ranking_agg",-1,agg,true)//==========5.模拟数据==========defsimulateRanking(){for(iin1..100){sensor_stream.append!(table(1..100asdevice_id,take(now(),100)astimestamp,rand(20.0..30.0,100)astemperature,rand(40.0..60.0,100)ashumidity))sleep(1000)}}simulateRanking()//查看结果 select top20*fromstats_table

5.2 实时异常检测

//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`timestamp`avg_temp`std_temp`threshold`anomaly_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asanomaly_table//==========3.创建异常检测引擎==========agg=createCrossSectionalEngine("anomaly_engine",<[avg(temperature)asavg_temp,std(temperature)asstd_temp,avg(temperature)+3*std(temperature)asthreshold,sum(iif(temperature>avg(temperature)+3*std(temperature),1,0))asanomaly_count]>,anomaly_table,`timestamp,`device_id,"interval",10000)//10秒检测//==========4.订阅流表==========subscribeTable(,"sensor_stream","anomaly_agg",-1,agg,true)

5.3 生产效率监控

//==========1.创建流表==========share streamTable(100000:0,`production_line`timestamp`output`efficiency,[SYMBOL,TIMESTAMP,DOUBLE,DOUBLE])asproduction_stream//==========2.创建输出表==========share table(1:0,`timestamp`total_output`avg_efficiency`max_efficiency`line_count,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asproduction_stats//==========3.创建监控引擎==========agg=createCrossSectionalEngine("production_engine",<[sum(output)astotal_output,avg(efficiency)asavg_efficiency,max(efficiency)asmax_efficiency,count(*)asline_count]>,production_stats,`timestamp,`production_line,"interval",60000)//每分钟统计//==========4.订阅流表==========subscribeTable(,"production_stream","production_agg",-1,agg,true)

六、引擎管理

6.1 查看引擎状态

//查看所有引擎状态 getStreamEngineStat()//查看特定引擎 getStreamEngineStat("cs_engine")

6.2 删除引擎

//删除引擎 dropStreamEngine("cs_engine")

6.3 引擎监控

//引擎监控函数defmonitorCSEngine(){stat=getStreamEngineStat()for(rowinstat){if(row.type=="CrossSectionalAggregator"){print("横截面引擎: "+row.name)print(" 状态: "+row.status)print(" 分组数: "+string(row.numGroups))print(" 处理行数: "+string(row.processedRows))}}}monitorCSEngine()

七、性能优化

7.1 分组数量优化

//分组数量建议//单引擎分组数<100000//如果分组数过多://1.使用多个引擎//2.增加触发间隔

7.2 触发模式选择

场景推荐触发模式
实时监控perRow
周期统计interval
批量处理perBatch

7.3 内存管理

//横截面引擎内存使用//=分组数 × 每组数据大小//优化://1.减少分组数//2.使用过滤条件//3.定期清理不活跃分组

八、总结

本文详细介绍了DolphinDB横截面引擎:

  1. 引擎原理:同一时刻所有分组聚合
  2. 创建方法:简单引擎、分组引擎
  3. 触发模式:每行触发、定时触发
  4. 聚合指标:基本统计、百分位、Top-N
  5. 实战应用:实时排名、异常检测、效率监控
  6. 性能优化:分组数量、触发模式、内存管理

思考题

  1. 横截面引擎和时间序列引擎有什么区别?
  2. 如何选择合适的触发模式?
  3. 如何设计实时排名系统?

参考资料

  • DolphinDB横截面引擎
  • DolphinDB流计算
http://www.zskr.cn/news/1423132.html

相关文章:

  • 【万字文档+全套源码】 基于SpringBoot+Vue的智能化酒店管理系统-计算机专业项目设计分享
  • 2026东莞全屋翻新整装实力企业盘点:本土匠心品牌领跑行业 - 资讯纵览
  • 别再手动算矩阵了!用Python+Eigen库5分钟搞定激光雷达与车体坐标系的自动标定
  • AI与机器学习如何重塑房地产:从估值到客户匹配的技术实践
  • VMware 17安装CentOS避坑全记录:从镜像选择、磁盘分区到网络配置,新手必看
  • 构建之法阅读笔记09
  • ChatGPT内容完美导入Word:告别格式丢失的4大实用方案
  • 郑州买灯找谁?家装灯具优选|科伦蒂照明郑州旗舰店全新升级启幕 - 资讯纵览
  • SAP CO02工单组件批量维护实战:用ABAP BAPI实现增删改查的完整代码与避坑指南
  • QuPath完整指南:如何用开源软件实现病理图像的精准分析
  • 2026年北京美甲美睫品牌推荐榜,专业推荐前五名 - 资讯快报
  • 云计算15年:多类型项目风险与成本并存,借鉴经验才能蓬勃发展!
  • 2026东莞旧房翻新企业优选盘点:深耕本土品质 焕新人居环境 - 资讯纵览
  • 2026年主流AI漫剧工具多维排行与团队选型参考 - 资讯纵览
  • Windows 11开始菜单终极修复指南:三步恢复磁贴并自定义任务栏
  • FanControl技术深度解析:Windows平台高级风扇控制架构与实践
  • 雷电冲击,老师傅的放心选择
  • STM32串口发送中断实战:用TC标志位实现字符串发送的完整流程与注意事项
  • 2026年十大月子中心推荐:口碑与专业度排名解析 - 资讯快报
  • 从数据拟合到参数估计:一次搞懂正态/对数正态分布在数据分析中的实际应用(含MATLAB/ Python对比)
  • 郑州新郑市家电维修清洗|维小达 专业空调、冰箱、洗衣机、热水器、电视、油烟机、灶具、消毒柜、小家电维修清洗一站式服务 - 维小达科技
  • Intel Arc显卡在Linux下的AI性能实测:对比CPU/iGPU,MULTI插件协同推理效率提升多少?
  • 上海周末搬迁哪家搬场公司可以安排|3个核心选商标准+实操流程 - 知行集录
  • 从‘读心术’到决策树:用Pandas和NumPy复现ID3算法,实战筛选最佳特征
  • Kiro Agent Hooks:文件一保存,AI 自动帮你跑测试、补文档、查规范
  • 告别迷茫!CANoe 11.0保姆级界面导航:从打开官方例程到看懂每个功能区
  • 实验20 自动灭火场景实验
  • 量子计算在动态平均场理论中的创新应用
  • 2026 年 Q1 云厂商财报增速亮眼,“卖算力”难撑利润,谁能过渡到“卖不可替代性”?
  • 从手机屏幕到摄影打光:搞懂色温与显色性,让你的照片和视频告别‘阴间滤镜’