3个信号、2个环境变量、0个采集器:使用 Python 和 Elastic 的托管 OTLP 端点实现 OpenTelemetry

3个信号、2个环境变量、0个采集器:使用 Python 和 Elastic 的托管 OTLP 端点实现 OpenTelemetry

作者:来自 Elastic Jeffrey Rengifo

使用 OpenTelemetry 为 Flask API 添加监测,并仅通过 2 个环境变量将追踪、指标和日志发送到 Elastic Cloud,无需采集器。

Elastic 的 托管 OTLP 端点 允许你的 Python 应用仅使用标准的 OpenTelemetry SDK 和 2 个环境变量,直接将追踪、指标和日志发送到 Elastic Cloud。无需部署、配置或维护 Collector。

本文将通过一个完整添加监测的 Flask API 示例,演示从创建第一个 span 到在 Kibana 服务地图中查看服务的全过程,并实现日志与追踪关联。

前提条件

  • 一个 Elastic Cloud 账户(Serverless 或 Cloud Hosted v9.0+)

  • Python 3.9+

  • Kibana 内置的示例 eCommerce orders 数据集(Add data > Sample data > Sample eCommerce orders

安装所需的包:

pip install flask elasticsearch python-dotenv \ opentelemetry-api opentelemetry-sdk \ opentelemetry-exporter-otlp

什么是 Elastic 托管 OTLP 端点?

Elastic 托管 OTLP 端点是一个托管的摄取层,支持应用直接通过 HTTP 或 gRPC 接收标准 OTLP 数据,无需使用采集器。

OpenTelemetry 使用 OTLP(OpenTelemetry 协议) 来传输遥测数据。

在传统架构中,应用会将数据发送到 OpenTelemetry Collector,再由 Collector 转发到 Elasticsearch 等后端系统。

Collector 负责批处理、重试和路由,但也意味着你需要额外部署、配置并维护一个组件。

Elastic 的 托管 OTLP 端点(近期正式发布)移除了这一步骤。

你的应用可以直接将 OTLP 数据发送到 Elastic 托管的端点。

该端点支持标准 OTLP over HTTP 或 gRPC,并由托管摄取层提供支撑,负责扩展、缓冲和可靠性保障。

你使用的仍然是同一套 opentelemetry-exporter-otlp 包,这些包与任何后端兼容。

使用 OpenTelemetry 构建 Python Flask API

我们将用 Python 和 Flask 构建一个小型 REST API,用于列出和查询存储在 Elasticsearch 中的 eCommerce 订单。

作为数据源,我们将使用 Kibana 内置的示例数据集。

你可以在 配套代码仓库 中找到完整的应用代码。

下面的教程会逐步构建代码,但如果你愿意,也可以直接克隆仓库并跟着一起操作。

在添加遥测之前,我们先看基础应用:一个用于查询 Kibana 示例 eCommerce 索引的 Flask API。

它包含两个端点:列出最近订单,以及通过 ID 查询单个订单。

创建一个名为 app.py 的文件:

import os from dotenv import load_dotenv from elasticsearch import Elasticsearch from flask import Flask, jsonify load_dotenv() # Elasticsearch client es = Elasticsearch( hosts=[os.environ["ES_URL"]], api_key=os.environ["ES_API_KEY"], ) INDEX = "kibana_sample_data_ecommerce" app = Flask(__name__) @app.route("/orders") def list_orders(): response = es.search( index=INDEX, size=10, sort=[{"order_date": "desc"}], aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}}, ) hits = response["hits"]["hits"] total_revenue = response["aggregations"]["total_revenue"]["value"] orders = [ { "order_id": h["_source"]["order_id"], "customer": h["_source"]["customer_full_name"], "total": h["_source"]["taxful_total_price"], "date": h["_source"]["order_date"], } for h in hits ] return jsonify({"orders": orders, "total_revenue": total_revenue}) @app.route("/orders/<order_id>") def get_order(order_id): response = es.search( index=INDEX, size=1, query={"term": {"order_id": order_id}}, ) hits = response["hits"]["hits"] if not hits: return jsonify({"error": "Order not found"}), 404 return jsonify(hits[0]["_source"]) if __name__ == "__main__": app.run(host="0.0.0.0", port=5001)

创建一个 .env 文件,填写你的 Elasticsearch 凭证。

要获取 OTLP 端点和 API Key,请参考 快速开始:将 OTLP 数据发送到 Elastic 指南。

OTEL_EXPORTER_OTLP_ENDPOINT= OTEL_EXPORTER_OTLP_HEADERS=Authorization=ApiKey <api_key> ES_URL= ES_API_KEY=

这是一个未进行任何监测(instrumentation)的可运行 API。接下来的部分将逐步为其添加追踪、指标和日志,每次只引入一种信号。

从 Python 发送 traces

我们先从 traces 开始,使用 Python OpenTelemetry SDK。

我们会用 spans 包裹 Elasticsearch 调用,从而观察每个查询的耗时。

设置 tracer

在 app.py 的顶部添加以下内容:

from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource, SERVICE_NAME # Create a resource identifying your service resource = Resource.create({ SERVICE_NAME: "my-python-app" }) # Set up the tracer provider with OTLP export tracer_provider = TracerProvider(resource=resource) tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) trace.set_tracer_provider(tracer_provider) tracer = trace.get_tracer(__name__)


为你的 endpoints 添加 spans

现在将每个 route handler 用一个 parent span 包裹,并在 Elasticsearch 调用外再嵌套一个 child span。这样你就得到一个两层 trace:一个 span 表示 HTTP 请求,另一个 span 表示内部数据库查询。

@app.route("/orders") def list_orders(): with tracer.start_as_current_span("list-orders") as span: with tracer.start_as_current_span("es.search") as es_span: es_span.set_attribute("db.system", "elasticsearch") es_span.set_attribute("db.elasticsearch.index", INDEX) response = es.search( index=INDEX, size=10, sort=[{"order_date": "desc"}], aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}}, ) hits = response["hits"]["hits"] total_revenue = response["aggregations"]["total_revenue"]["value"] orders = [ { "order_id": h["_source"]["order_id"], "customer": h["_source"]["customer_full_name"], "total": h["_source"]["taxful_total_price"], "date": h["_source"]["order_date"], } for h in hits ] span.set_attribute("orders.returned", len(orders)) return jsonify({"orders": orders, "total_revenue": total_revenue})

每个请求都会创建一个 parent span(list-orders),并包含一个 child span(es.search)用于包裹 Elasticsearch 调用。

db.systemdb.elasticsearch.index 属性是 Kibana 用来在 trace waterfall 中识别该调用为 Elasticsearch 查询的关键字段。

从 Python 发送 metrics

metrics 用于揭示跨所有请求的模式:有多少调用命中了每个 endpoint、响应时间如何随时间变化。我们现在添加一个 counter 和 histogram 来捕获这些信息。

设置 meter

在 app.py 中添加以下内容:

from opentelemetry import metrics from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter # 设置 metric reader 和 provider metric_reader = PeriodicExportingMetricReader( OTLPMetricExporter(), export_interval_millis=10000 # 每 10 秒导出一次 ) meter_provider = MeterProvider( resource=resource, metric_readers=[metric_reader] ) metrics.set_meter_provider(meter_provider) meter = metrics.get_meter(__name__)

创建 instruments

# 请求总数 counter request_counter = meter.create_counter( name="app.requests.total", description="Total number of requests", unit="1" ) # 请求耗时 histogram request_duration = meter.create_histogram( name="app.request.duration", description="Request duration in seconds", unit="s" )

在 endpoints 中记录 metrics

更新 route handler 来记录 metrics:

import time @app.route("/orders") def list_orders(): start_time = time.time() with tracer.start_as_current_span("list-orders") as span: with tracer.start_as_current_span("es.search") as es_span: es_span.set_attribute("db.system", "elasticsearch") es_span.set_attribute("db.elasticsearch.index", INDEX) response = es.search( index=INDEX, size=10, sort=[{"order_date": "desc"}], aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}}, ) hits = response["hits"]["hits"] total_revenue = response["aggregations"]["total_revenue"]["value"] orders = [ { "order_id": h["_source"]["order_id"], "customer": h["_source"]["customer_full_name"], "total": h["_source"]["taxful_total_price"], "date": h["_source"]["order_date"], } for h in hits ] duration = time.time() - start_time span.set_attribute("orders.returned", len(orders)) request_counter.add(1, {"endpoint": "/orders", "status": "200"}) request_duration.record(duration, {"endpoint": "/orders"}) return jsonify({"orders": orders, "total_revenue": total_revenue})

从 Python 发送 logs

traces 和 metrics 告诉你发生了什么以及发生频率。logs 捕获细节:错误信息、参数值和调试上下文。OpenTelemetry 通过桥接 Python 的标准 `` 模块,让你现有的日志语句与 traces 和 metrics 一起通过 OTLP 导出。

当你在 span 上下文中输出 log 时,SDK 会自动附加 trace ID 和 span ID,因此你可以从一条 log 直接跳转到生成它的 trace。

设置 log provider

import logging from opentelemetry._logs import set_logger_provider from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter # 设置 logger provider logger_provider = LoggerProvider(resource=resource) logger_provider.add_log_record_processor( BatchLogRecordProcessor(OTLPLogExporter()) ) set_logger_provider(logger_provider) # 将 Python logging 接入 OpenTelemetry handler = LoggingHandler( level=logging.INFO, logger_provider=logger_provider ) logging.getLogger().addHandler(handler) logging.getLogger().addHandler(logging.StreamHandler()) # 同时输出到终端 logging.getLogger().setLevel(logging.INFO) logger = logging.getLogger(__name__)

LoggingHandler()确保 logs 在开发时也会显示在你的终端中。

如果没有它,logs 只会发送到 Elastic,而终端不会输出内容,这在测试时可能会让人困惑。

从 endpoints 发送 logs

@app.route("/orders") def list_orders(): start_time = time.time() with tracer.start_as_current_span("list-orders") as span: logger.info("列出最近订单") with tracer.start_as_current_span("es.search") as es_span: es_span.set_attribute("db.system", "elasticsearch") es_span.set_attribute("db.elasticsearch.index", INDEX) response = es.search( index=INDEX, size=10, sort=[{"order_date": "desc"}], aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}}, ) hits = response["hits"]["hits"] total_revenue = response["aggregations"]["total_revenue"]["value"] orders = [ { "order_id": h["_source"]["order_id"], "customer": h["_source"]["customer_full_name"], "total": h["_source"]["taxful_total_price"], "date": h["_source"]["order_date"], } for h in hits ] duration = time.time() - start_time span.set_attribute("orders.returned", len(orders)) logger.info( "订单已列出", extra={"orders.returned": len(orders), "duration_s": round(duration, 4)} ) request_counter.add(1, {"endpoint": "/orders", "status": "200"}) request_duration.record(duration, {"endpoint": "/orders"}) return jsonify({"orders": orders, "total_revenue": total_revenue})

因为这些 logger.info() 调用发生在 span 上下文中,OpenTelemetry 会自动为每条日志记录附加 trace ID 和 span ID。

在 Kibana 中,这意味着你可以从一条 log 直接跳转到对应的 trace。

运行已添加监测的 Flask 应用并生成流量

Traces、metrics 和 logs 已经全部接入后,应用就完成了完整的可观测性配置。现在我们运行它、发送请求,并验证数据是否进入 Kibana。

python app.py

注意:Flask 自带的开发服务器适用于本教程。在生产环境中,应使用类似 gunicorn (花生壳) 的 WSGI 服务器。

生成一些流量:

curl http://localhost:5001/orders curl http://localhost:5001/orders/584677 curl http://localhost:5001/orders/does-not-exist

几秒后,打开 Kibana,进入Observability > APM > Services

你应该能看到 my-python-app 被列出。

在 Kibana 中查看 traces、metrics 和 logs

现在遥测数据已经开始流动,我们来看看每种信号在 Kibana 中的落点,以及它们是如何相互关联的。

Traces

进入APM > Services > my-python-app > Transactions。点击一个 list-orders transaction 来打开 trace waterfall。

你应该会看到 parent span list-orders,以及一个 child spanes.search。

我们之前设置的 db.systemdb.elasticsearch.index 属性会显示在 span 详情中,同时 Kibana 会将这个 child 识别为一个 Elasticsearch 查询。

失败的 transactions

为了查看错误可见性,打开一个针对/orders/does-not-exist请求的get-order transaction。

transaction 详情中会显示 event.outcome: failure,以及我们附加的order.id: does-not-exist 属性。

这表明错误状态和自定义属性可以正确地通过托管端点进行传播。

关联日志(Correlated logs)

进入Discover,按service.name: my-python-app. 进行过滤。

展开一条日志文档,可以看到 body.text,trace_id和 span_id 字段。

由于我们是在 span 上下文中发送 logs,每条日志记录都会携带 trace ID。

你可以复制 trace_id 的值,然后进入APM,直接跳转到生成该日志的 trace。

Metrics

Discover中切换到 metrics-* data view。

你应该会看到 app.requests.total​​​​​​​counter 和app.request.duration histogram 以固定间隔持续到达。

你也可以在 Lens 中创建一个可视化:app.requests.total 在 Y 轴使用@timestamp 在X 轴使用,来观察请求量随时间的变化。

服务地图

Kibana 会基于我们之前设置的 span 属性自动构建 服务地图。

因为我们在 Elasticsearch spans db.system: elasticsear 中标记了,Kibana 会绘制my-python-app和 elasticsearch 之间的依赖关系,从而为你提供服务及其后端的可视化概览。

总结:在 Python 中无需 Collector 即可实现完整 OpenTelemetry 可观测性

在本教程中,我们使用标准 OpenTelemetry SDK 和两个环境变量为 Flask API 添加了 traces、metrics 和 logs,然后在 Kibana 中验证了所有三种信号,并实现了内置的 log-to-trace 关联。

托管 OTLP 端点负责处理扩展、缓冲和持久化,因此你可以专注于应用本身,而无需运维摄取基础设施。

接下来,你可以考虑使用 EDOT Python 进行自动埋点,以移除手动 spans,或者添加更贴合业务领域的自定义 metrics。

下一步

  • 查看 Elastic 托管 OTLP 端点文档 了解高级配置。

  • 尝试 `` 或 EDOT Python,在无需手动 spans 的情况下自动为 Flask 和 Elasticsearch 客户端进行埋点。

  • 查阅 OpenTelemetry Python SDK 文档 获取更多埋点选项。

  • 如需对 histogram 的时序特性进行细粒度控制,请参考 托管 OTLP 端点文档。

原文:OpenTelemetry Python to Elastic in 2 environment variables — Elastic Observability Labs