
PHP Data Pipelines and a Stream Processing Framework

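Data pipelines are a core pattern in data processing. PHP can implement a simple stream-processing framework that handles a data stream as it arrives. This post walks through the design of a data pipeline in PHP.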

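The core of a data pipeline is the processing node at each stage. Data enters at an input node, passes through several processing nodes, and finally leaves through an output node.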

```php
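<?php

/**
 * One stage of the pipeline.
 * Returning null from process() tells the pipeline to drop the item.
 */
interface PipelineNode
{
    public function process(mixed $data): mixed;
    public function setName(string $name): void;
    public function getName(): string;
}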

// Entry point: logs each incoming item and passes it through unchanged.
class InputNode implements PipelineNode
{
    private string $name = 'input';

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        echo "Input: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
        return $data;
    }
}

// Keeps an item only when the predicate returns a truthy value.
class FilterNode implements PipelineNode
{
    private string $name = 'filter';
    // `callable` is not allowed as a property type, so store a Closure instead.
    private \Closure $predicate;

    public function __construct(callable $predicate)
    {
        $this->predicate = \Closure::fromCallable($predicate);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        $result = ($this->predicate)($data);
        echo "Filter: " . ($result ? 'pass' : 'drop') . "\n";
        return $result ? $data : null;
    }
}

// Maps each item to a new value.
class TransformNode implements PipelineNode
{
    private string $name = 'transform';
    // `callable` is not allowed as a property type, so store a Closure instead.
    private \Closure $transformer;

    public function __construct(callable $transformer)
    {
        $this->transformer = \Closure::fromCallable($transformer);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        $result = ($this->transformer)($data);
        echo "Transform: " . json_encode($data, JSON_UNESCAPED_UNICODE)
            . " -> " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
        return $result;
    }
}

// Buffers items and emits one aggregated value per full batch.
class AggregateNode implements PipelineNode
{
    private string $name = 'aggregate';
    private array $buffer = [];
    private int $batchSize;
    // `callable` is not allowed as a property type, so store a Closure instead.
    private \Closure $aggregator;

    public function __construct(int $batchSize, callable $aggregator)
    {
        $this->batchSize = $batchSize;
        $this->aggregator = \Closure::fromCallable($aggregator);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        $this->buffer[] = $data;

        if (count($this->buffer) >= $this->batchSize) {
            $result = ($this->aggregator)($this->buffer);
            $this->buffer = [];
            echo "Aggregate: " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
            return $result;
        }

        // Batch not full yet: emit nothing downstream.
        return null;
    }

    // Aggregates whatever is left in the buffer (a partial batch), if anything.
    public function flush(): mixed
    {
        if (empty($this->buffer)) {
            return null;
        }
        $result = ($this->aggregator)($this->buffer);
        $this->buffer = [];
        return $result;
    }
}

// Exit point: logs each item that made it through the pipeline.
class OutputNode implements PipelineNode
{
    private string $name = 'output';

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        echo "Output: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
        return $data;
    }
}

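class DataPipeline
{
    /** @var PipelineNode[] */
    private array $nodes = [];

    public function addNode(PipelineNode $node): void
    {
        $this->nodes[] = $node;
    }

    // Accepts any iterable (array or generator), not an arbitrary value.
    public function process(iterable $data): array
    {
        $results = [];

        foreach ($data as $item) {
            $current = $this->runFrom(0, $item);
            if ($current !== null) {
                $results[] = $current;
            }
        }

        // Flush partial batches left in aggregate nodes, then send each flushed
        // value through the nodes that come after that aggregate node.
        foreach ($this->nodes as $index => $node) {
            if ($node instanceof AggregateNode) {
                $remaining = $node->flush();
                if ($remaining !== null) {
                    $remaining = $this->runFrom($index + 1, $remaining);
                }
                if ($remaining !== null) {
                    $results[] = $remaining;
                }
            }
        }

        return $results;
    }

    // Runs a value through the nodes starting at $start; stops once a node drops it.
    private function runFrom(int $start, mixed $current): mixed
    {
        for ($i = $start, $n = count($this->nodes); $i < $n; $i++) {
            $current = $this->nodes[$i]->process($current);
            if ($current === null) {
                break;
            }
        }

        return $current;
    }
}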

// Build the pipeline: keep paid orders, then apply a 5% discount.
$pipeline = new DataPipeline();
$pipeline->addNode(new InputNode());
$pipeline->addNode(new FilterNode(fn($item) => $item['status'] === 'paid'));
$pipeline->addNode(new TransformNode(fn($item) => [
    'order_id' => $item['order_id'],
    'amount' => $item['amount'],
    'user' => $item['user'],
    'final_amount' => $item['amount'] * 0.95,
]));
$pipeline->addNode(new OutputNode());

$orders = [
    ['order_id' => 1, 'amount' => 100, 'user' => '张三', 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'user' => '李四', 'status' => 'unpaid'],
    ['order_id' => 3, 'amount' => 300, 'user' => '王五', 'status' => 'paid'],
];

echo "Processing order data...\n";
$results = $pipeline->process($orders);
echo "\nDone, " . count($results) . " result(s) in total\n";
```
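The pipeline above works item by item but still collects everything into an array before returning. For inputs that should not be held in memory at once, PHP generators offer a lazy alternative. The following is only a minimal sketch of that idea, not part of the framework above: the function names `filterStream`, `mapStream`, and `batchStream` are hypothetical helpers introduced here for illustration, and the order data mirrors the earlier example.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: lazily yields only the items accepted by $predicate.
function filterStream(iterable $items, callable $predicate): Generator
{
    foreach ($items as $item) {
        if ($predicate($item)) {
            yield $item;
        }
    }
}

// Hypothetical helper: lazily applies $transform to every item.
function mapStream(iterable $items, callable $transform): Generator
{
    foreach ($items as $item) {
        yield $transform($item);
    }
}

// Hypothetical helper: groups items into arrays of at most $size elements.
function batchStream(iterable $items, int $size): Generator
{
    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $size) {
            yield $batch;
            $batch = [];
        }
    }
    if ($batch !== []) {
        yield $batch; // emit the final partial batch
    }
}

$orders = [
    ['order_id' => 1, 'amount' => 100, 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'status' => 'unpaid'],
    ['order_id' => 3, 'amount' => 300, 'status' => 'paid'],
];

// Nothing runs until the final generator is consumed.
$paid = filterStream($orders, fn(array $o): bool => $o['status'] === 'paid');
$discounted = mapStream($paid, fn(array $o): array => $o + ['final_amount' => $o['amount'] * 0.95]);
$batches = batchStream($discounted, 2);

// Pass false so keys from different generators cannot overwrite each other.
$result = iterator_to_array($batches, false);
echo "Batches produced: " . count($result) . "\n";
```

Because each stage pulls from the previous one on demand, only the current item (plus one batch buffer) is alive at any time. The trade-off is that a generator can be consumed only once, so a stage's output cannot be iterated again without rebuilding the chain.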

Window computation is a common pattern in stream processing:

```php
<?php

// Count-based sliding window: keeps only the most recent $windowSize values.
class WindowCalculator
{
    private array $window = [];
    private int $windowSize;

    public function __construct(int $windowSize = 5)
    {
        $this->windowSize = $windowSize;
    }

    public function add(int|float $value): void
    {
        $this->window[] = $value;
        if (count($this->window) > $this->windowSize) {
            // Evict the oldest value once the window is full.
            array_shift($this->window);
        }
    }

    public function sum(): float
    {
        return array_sum($this->window);
    }

    public function average(): float
    {
        if (empty($this->window)) {
            return 0.0;
        }
        return array_sum($this->window) / count($this->window);
    }

    // max() and min() throw on an empty array in PHP 8, so return null instead.
    public function max(): ?float
    {
        return empty($this->window) ? null : max($this->window);
    }

    public function min(): ?float
    {
        return empty($this->window) ? null : min($this->window);
    }

    public function count(): int
    {
        return count($this->window);
    }

    public function reset(): void
    {
        $this->window = [];
    }
}

$calculator = new WindowCalculator(3);

// Simulated streaming data
$data = [10, 20, 30, 40, 50, 60, 70, 80];

echo "Sliding window calculation:\n";
foreach ($data as $value) {
    $calculator->add($value);
    echo "  add {$value}: average={$calculator->average()}, sum={$calculator->sum()}\n";
}
```
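`WindowCalculator` recomputes the sum on every call, and `array_shift` re-indexes the array each time a value is evicted. That is fine for small windows. For larger ones, a possible variation keeps a running sum and uses `SplQueue` for constant-time eviction. The class below is a sketch under those assumptions; `RunningWindow` is a hypothetical name, not part of the code above.

```php
<?php
declare(strict_types=1);

// Hypothetical variant: sliding window with a running sum and O(1) eviction.
final class RunningWindow
{
    private SplQueue $values;
    private float $sum = 0.0;

    public function __construct(private int $size)
    {
        if ($size < 1) {
            throw new InvalidArgumentException('Window size must be at least 1');
        }
        $this->values = new SplQueue();
    }

    public function add(int|float $value): void
    {
        $this->values->enqueue($value);
        $this->sum += $value;

        if (count($this->values) > $this->size) {
            // Subtract the evicted value instead of re-summing the window.
            $this->sum -= $this->values->dequeue();
        }
    }

    public function sum(): float
    {
        return $this->sum;
    }

    public function average(): float
    {
        $n = count($this->values);
        return $n === 0 ? 0.0 : $this->sum / $n;
    }

    public function count(): int
    {
        return count($this->values);
    }
}

$window = new RunningWindow(3);
foreach ([10, 20, 30, 40] as $value) {
    $window->add($value);
}
echo "sum={$window->sum()}, average={$window->average()}\n";
```

One caveat of the running-sum approach: with non-integer inputs, repeated additions and subtractions can accumulate floating-point rounding error over a long stream, so recomputing the sum periodically may be worthwhile.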

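A PHP data pipeline is not as specialized as Kafka Streams or Flink, but it is entirely adequate when the data volume is modest. The pipeline pattern keeps the processing logic modular: each node has a single responsibility, which makes the code easier to test and maintain. It suits scenarios such as log processing, data cleansing, and real-time computation.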
