Telegraf를 활용한 Prometheus exporter metric 수집 및 DB 적재
filestats exporter
(프로메테우스 익스포터)에서 수집할 수 있는 Data를 Prometheus 가 아닌 DB에 저장할 방법을 찾아본다.
0. Telegraf 소개
Telegraf는 데이터베이스, 시스템 및 IoT 센서에서 메트릭 및 이벤트를 수집하고 전송하기 위한 플러그인 중심 서버 에이전트입니다. Telegraf는 Go로 작성되었으며 외부 종속성 없이 단일 바이너리로 컴파일되며 최소한의 메모리 공간만 필요합니다. -와탭 telegraf 소개(https://docs.whatap.io/telegraf/introduction)
- 아키텍쳐
- Input(Source): 정보를 가져올 곳
- 주요 input plugin: http, webhook, CPU, Memory, MySQL, CloudWatch, InfluxDB, prometheus format....
- Output(Destination): 정보를 보낼 곳
- 주요 output plugin - http, exec, prometheus, postgreSQL, open TSDB, loki, remoteFile ....
** built-in plugin이 지원하지 않는 format의 경우, exec
나 execd
활용해 3rd party plugin 을 사용할 수 있음.
https://docs.influxdata.com/telegraf/v1/get-started/#configure-telegraf
1. 결론
1) Prometheus exporter metric 수집 - 가능
- 방법: 설정 파일 작성
# ex)
[[inputs.prometheus]]
urls = ["http://1.2.3.4:9100/metrics"]
2) DB 적재 - 가능
(1) RDB(PostgreSql) - 가능
- 방법: 설정 파일에 DB 접속 정보 작성
# ex)
[[outputs.postgresql]]
connection = "host=1.2.3.4 port=1111 user={username} password={password} dbname={dbname} sslmode=disable"
schema = "{schemaName}"
- 수집된 데이터의 형태
- 지정한
schema
내에metric name
별로table
이 생성됨. - column ->
time
,host
,url
,guage
- 데이터 형태 변경 가능 방법 확인 필요 ->
aggregator
등에서 수정 가능할 것으로 생각됨.
- 지정한
2) Arrow-flight 호환 자체 DB 적재 - 불가능
- 사유: telegraf가 arrow-flight plugin을 native하게 지원하지 않음.
(1) 시도해본 방법: pyton 혹은 Node 활용한 컨버터 서버 생성(http)
- 아키텍쳐
- exporters > Telegraf > Flask server(python) > 자체 DB
- 장점:
- 가장 단순하고 확실해보임.
- 적재 데이터 가공 수요가 있을 시 대체하기 편리하다 판단됨.
- 단점: 관리 포인트 추가됨.
(2) 대안들
a. Parquet
** 참고: parquet 란? https://butter-shower.tistory.com/245
b. remoteFile
- SFTP/Local 통해 file upload
https://github.com/influxdata/telegraf/blob/release-1.33/plugins/outputs/remotefile/README.md
3. 레퍼런스 페이지
- Telegraf Docs
https://docs.influxdata.com/telegraf/v1/ - Input plugin 목록
https://docs.influxdata.com/telegraf/v1/plugins/#input-plugins - Telegraf - Output plugin 목록
https://docs.influxdata.com/telegraf/v1/plugins/ - Telegraf - Output - 지원 dataformat 목록
https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md - Telegraf - 외부 plugin 사용법
https://github.com/influxdata/telegraf/blob/master/docs/EXTERNAL_PLUGINS.md
https://docs.influxdata.com/telegraf/v1/data_formats/output/
- 기타 (구버전용) - Arrow Flight 용 3rd party plugin
https://github.com/ModelarData/Telegraf-Output-Apache-Arrow-Flight
4. 테스트 내역
테스트 목차
- Docker 기반 telegraf 설치
- (input) 정상 작동 확인
- (input) node exporter input 정상 작동 확인
- (output) postgreSQL 정상 작동 확인
- (output )FlightSQL > XDB 연결
- (기타) process 실행 시 작동 여부 확인
1) Docker 기반 telegraf 설치
- 참고 문서: telegraf quickstart
https://github.com/influxdata/telegraf/blob/master/docs/QUICK_START.md
telegraf 이미지 pull
docker pull telegraf
config 파일 생성
$ cat config.toml
[[inputs.cpu]]
[[inputs.mem]]
[[outputs.file]]
3. docker container 구동
docker run --rm --volume $PWD/config.toml:/etc/telegraf/telegraf.conf telegraf
2~3) (input) node exporter input 작동 확인
- 정상 작동
# config.toml
[[inputs.prometheus]]
urls = ["http://10.253.10.152:9100/metrics"]
[[outputs.file]]
4) (output) postgreSQL 테스트
- 정상 작동
[[inputs.prometheus]]
urls = ["http://1.2.3.4:9100/metrics"]
[[outputs.postgresql]]
connection = "host=HOST port=PORT user=USER password=PASSWORD dbname=DBNAME sslmode=disable"
schema = "SCHEMA_NAME"
- 데이터 세부사항
- 지정한
schema
내에metric name
별로table
이 생성됨. - column ->
time
,host
,url
,guage
- 지정한
ex) test_node_exporter
schema 하위로 각 metric별 table 생성 및 적재
5) (output) ArrFlightSQL > XDB 연결
(1) (빅데이터팀 문의 필요) Telegraf(http) > Flask Server(gRPC) > XDB
- 요청 자체는 정상 발송되는 것으로 보이나, 에러 발생.
- XDB 팀 문의 필요
2025-02-21T07:25:31Z E! [agent] Error writing to outputs.http: when writing to [http://1.2.3.4:5000/telegraf] received status code: 500. body: Flight returned unimplemented error, with message: DoPut for cmd type '' is not yet implemented. gRPC client debug context: UNKNOWN:Error received from peer ipv4:2.3.4.5:50055 {grpc_message:"DoPut for cmd type \'\' is not yet implemented", grpc_status:12, created_time:"2025-02-21T07:25:18.3385313+00:00"}. Client context: OK
- config.toml
[[inputs.prometheus]]
urls = ["http://1.2.3.4:9100/metrics"]
[[outputs.http]]
url = "http://2.3.4.5:5000/telegraf"
data_format = "json"
timeout = "5s"
method = "POST"
[outputs.http.headers]
Content-Type = "application/json"
- Flask Server 코드
from flask import Flask, request
import pyarrow as pa
import pyarrow.flight as flight
import json
import logging
import re
# Initialize FlightSQLClient
JDBC_URL = "jdbc:arrow-flight-sql://1.2.3.4:5555/?useEncryption=0"
USERNAME = "USERNAME"
PASSWORD = "PASSWORD"
# Set up logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
app = Flask(__name__)
class FlightSQLClient:
def __init__(self, jdbc_url: str, username: str, password: str):
try:
# Parse JDBC URL to get host and port
match = re.match(r"jdbc:arrow-flight-sql://([^:]+):(\d+)/", jdbc_url)
if not match:
raise ValueError("Invalid JDBC URL format")
host, port = match.groups()
location = f"grpc://{host}:{port}"
logger.info(f"Connecting to Flight SQL server at {location}")
# Create client
self.client = flight.FlightClient(location)
# Authenticate
self.authenticate(username, password)
logger.info("Successfully connected to Flight SQL server")
except Exception as e:
logger.error(f"Failed to connect to Flight SQL server: {e}")
raise
def authenticate(self, username: str, password: str):
try:
# Create basic auth token
auth = flight.BasicAuth(username, password)
# Authenticate and store bearer token
self.bearer_token = self.client.authenticate_basic_token(username, password)
logger.info("Authentication successful")
except Exception as e:
logger.error(f"Authentication failed: {e}")
raise
def write_batch(self, metrics):
try:
logger.info(type(metrics))
names = []
timestamps = []
values = []
tags = []
flattened_data = []
for metric_entry in metrics:
for actual_metric in metric_entry["metrics"]:
metric_name = actual_metric["name"]
timestamp = actual_metric["timestamp"]
tags = actual_metric["tags"]
for field_name, value in actual_metric["fields"].items():
flattened_data.append({
"name": metric_name,
"timestamp": timestamp,
"field": field_name,
"value": float(value), # Ensure value is float
"tags": json.dumps(tags)
})
# Convert to PyArrow Table
# Debug: Check data types
for row in flattened_data:
print(f"Row: {row}, Types: {[type(x) for x in row]}")
# pa.
schema = pa.schema([
("name", pa.string()),
("timestamp", pa.int64()),
("field", pa.string()),
("value", pa.float64()),
("tags", pa.string())
])
logger.info(flattened_data)
table = pa.Table.from_pylist(flattened_data, schema=schema)
logger.info(f"Generated Arrow Table Schema:\n{table.schema}")
logger.info(f"Arrow Table First 5 Rows:\n{table.slice(0, 5)}") # Print a small preview of the table
descriptor = flight.FlightDescriptor.for_path("metrics")
# Ensure bearer_token is correctly formatted
if isinstance(self.bearer_token, tuple):
print(f"Unexpected tuple found in self.bearer_token: {self.bearer_token}")
self.bearer_token = self.bearer_token[1] # Extract the actual token value
options = flight.FlightCallOptions(headers=[(b"authorization",
self.bearer_token.encode() if isinstance(self.bearer_token,
str) else self.bearer_token)])
# Write to Flight SQL server
writer, _ = self.client.do_put(descriptor, table.schema, options)
writer.write_table(table)
writer.close()
logger.info(f"Successfully wrote {len(names)} metrics to Flight SQL server")
except Exception as e:
logger.exception("Error in write_batch")
raise
try:
flight_client = FlightSQLClient(JDBC_URL, USERNAME, PASSWORD)
except Exception as e:
logger.error(f"Failed to initialize Flight SQL client: {e}")
flight_client = None
@app.route("/telegraf", methods=["POST"])
def receive_telegraf():
try:
logger.info(f"Received request with Content-Type: {request.content_type}")
if not request.is_json:
return "Content-Type must be application/json", 415
metrics = request.get_json()
logger.info(f"Received JSON metrics: {json.dumps(metrics, indent=2)}") # Pretty-print JSON
if not isinstance(metrics, list):
metrics = [metrics]
if not metrics:
return "No metrics received", 400
if flight_client is None:
return "Flight SQL server connection not available", 503
# Write to Flight SQL server
flight_client.write_batch(metrics)
# flight_client.
return "Success", 200
except json.JSONDecodeError as e:
logger.exception("JSON decode error")
return f"Invalid JSON format: {str(e)}", 400
except Exception as e:
logger.exception("Error processing request")
return str(e), 500
if __name__ == "__main__":
logger.info("Starting Flask server...")
app.run(host="0.0.0.0", port=5000, debug=True)
(2) 실패한 방법
a. 다른 DB 플러그인을 활용해 호환 직접 연결
- mysql, postgresql 등.
b. 3rd Party Arrow flight SQL Pluigin
- 버전 호환성 이슈로 실패.
- 더 좋은 방법 없을 경우,
telegraf
버전 낮춘 후 재 시도.
- 더 좋은 방법 없을 경우,
- 레퍼런스: https://github.com/ModelarData/Telegraf-Output-Apache-Arrow-Flight
'TIL > Monitoring(k8s, grafana)' 카테고리의 다른 글
[RKE2] Grafana 의 ingress (0) | 2025.02.28 |
---|---|
Prometheus - exporter들의 정확성 테스트 (0) | 2025.02.21 |
[모니터링, k8s]RKE2 기반 k8s 모니터링 (0) | 2025.02.21 |
[Grafana]pod 생성 시 user provisioning (0) | 2025.02.06 |
[AI, Kubeflow] Kubeflow bootstrap 소개 (0) | 2025.01.24 |