TIL/Monitoring(k8s, grafana)

Telegraf 를 활용한 Prometheus exporter metric 수집 및 DB 적재

쓱쓱565 2025. 3. 4. 22:49

Telegraf를 활용한 Prometheus exporter metric 수집 및 DB 적재

filestats exporter(프로메테우스 익스포터)에서 수집할 수 있는 Data를 Prometheus 가 아닌 DB에 저장할 방법을 찾아본다.

0. Telegraf 소개

Telegraf는 데이터베이스, 시스템 및 IoT 센서에서 메트릭 및 이벤트를 수집하고 전송하기 위한 플러그인 중심 서버 에이전트입니다. Telegraf는 Go로 작성되었으며 외부 종속성 없이 단일 바이너리로 컴파일되며 최소한의 메모리 공간만 필요합니다. -와탭 telegraf 소개(https://docs.whatap.io/telegraf/introduction)

  • 아키텍쳐

  • Input(Source): 정보를 가져올 곳
    • 주요 input plugin: http, webhook, CPU, Memory, MySQL, CloudWatch, InfluxDB, prometheus format....
  • Output(Destination): 정보를 보낼 곳
    • 주요 output plugin - http, exec, prometheus, postgreSQL, open TSDB, loki, remoteFile ....

** built-in plugin이 지원하지 않는 format의 경우, execexecd 활용해 3rd party plugin 을 사용할 수 있음.
https://docs.influxdata.com/telegraf/v1/get-started/#configure-telegraf

1. 결론

1) Prometheus exporter metric 수집 - 가능

  • 방법: 설정 파일 작성
# ex)
[[inputs.prometheus]]
  urls = ["http://1.2.3.4:9100/metrics"]

2) DB 적재 - 가능

(1) RDB(PostgreSql) - 가능

  • 방법: 설정 파일에 DB 접속 정보 작성
# ex)
[[outputs.postgresql]]
  connection = "host=1.2.3.4 port=1111 user={username} password={password} dbname={dbname} sslmode=disable"
  schema = "{schemaName}"
  • 수집된 데이터의 형태
    • 지정한 schema 내에 metric name 별로 table 이 생성됨.
    • column -> time, host, url, guage
    • 데이터 형태 변경 가능 방법 확인 필요 -> aggregator 등에서 수정 가능할 것으로 생각됨.

2) Arrow-flight 호환 자체 DB 적재 - 불가능

  • 사유: telegraf가 arrow-flight plugin을 native하게 지원하지 않음.

(1) 시도해본 방법: pyton 혹은 Node 활용한 컨버터 서버 생성(http)

  • 아키텍쳐
    • exporters > Telegraf > Flask server(python) > 자체 DB
  • 장점:
    • 가장 단순하고 확실해보임.
    • 적재 데이터 가공 수요가 있을 시 대체하기 편리하다 판단됨.
  • 단점: 관리 포인트 추가됨.

(2) 대안들

a. Parquet

** 참고: parquet 란? https://butter-shower.tistory.com/245

b. remoteFile

3. 레퍼런스 페이지

https://docs.influxdata.com/telegraf/v1/data_formats/output/

4. 테스트 내역

테스트 목차

  1. Docker 기반 telegraf 설치
  2. (input) 정상 작동 확인
  3. (input) node exporter input 정상 작동 확인
  4. (output) postgreSQL 정상 작동 확인
  5. (output )FlightSQL > XDB 연결
  6. (기타) process 실행 시 작동 여부 확인

1) Docker 기반 telegraf 설치

  1. telegraf 이미지 pull
    docker pull telegraf

  2. config 파일 생성

$ cat config.toml
[[inputs.cpu]]
[[inputs.mem]]
[[outputs.file]]

3. docker container 구동

docker run --rm --volume $PWD/config.toml:/etc/telegraf/telegraf.conf telegraf

2~3) (input) node exporter input 작동 확인

  • 정상 작동
# config.toml
[[inputs.prometheus]]
  urls = ["http://10.253.10.152:9100/metrics"]
[[outputs.file]]

4) (output) postgreSQL 테스트

  • 정상 작동
[[inputs.prometheus]]
  urls = ["http://1.2.3.4:9100/metrics"]
[[outputs.postgresql]]
  connection = "host=HOST port=PORT user=USER password=PASSWORD dbname=DBNAME sslmode=disable"
  schema = "SCHEMA_NAME"
  • 데이터 세부사항
    • 지정한 schema 내에 metric name 별로 table 이 생성됨.
    • column -> time, host, url, guage

ex) test_node_exporter schema 하위로 각 metric별 table 생성 및 적재

5) (output) ArrFlightSQL > XDB 연결

(1) (빅데이터팀 문의 필요) Telegraf(http) > Flask Server(gRPC) > XDB

  • 요청 자체는 정상 발송되는 것으로 보이나, 에러 발생.
    • XDB 팀 문의 필요
2025-02-21T07:25:31Z E! [agent] Error writing to outputs.http: when writing to [http://1.2.3.4:5000/telegraf] received status code: 500. body: Flight returned unimplemented error, with message: DoPut for cmd type '' is not yet implemented. gRPC client debug context: UNKNOWN:Error received from peer ipv4:2.3.4.5:50055 {grpc_message:"DoPut for cmd type \'\' is not yet implemented", grpc_status:12, created_time:"2025-02-21T07:25:18.3385313+00:00"}. Client context: OK
  • config.toml
[[inputs.prometheus]]
  urls = ["http://1.2.3.4:9100/metrics"]

[[outputs.http]]
  url = "http://2.3.4.5:5000/telegraf"
  data_format = "json"
  timeout = "5s"
  method = "POST"
  [outputs.http.headers]
    Content-Type = "application/json"
  • Flask Server 코드
from flask import Flask, request
import pyarrow as pa
import pyarrow.flight as flight
import json
import logging
import re

# Initialize FlightSQLClient
JDBC_URL = "jdbc:arrow-flight-sql://1.2.3.4:5555/?useEncryption=0"
USERNAME = "USERNAME"
PASSWORD = "PASSWORD"

# Set up logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

app = Flask(__name__)


class FlightSQLClient:
    def __init__(self, jdbc_url: str, username: str, password: str):
        try:
            # Parse JDBC URL to get host and port
            match = re.match(r"jdbc:arrow-flight-sql://([^:]+):(\d+)/", jdbc_url)
            if not match:
                raise ValueError("Invalid JDBC URL format")

            host, port = match.groups()
            location = f"grpc://{host}:{port}"

            logger.info(f"Connecting to Flight SQL server at {location}")

            # Create client
            self.client = flight.FlightClient(location)

            # Authenticate
            self.authenticate(username, password)

            logger.info("Successfully connected to Flight SQL server")

        except Exception as e:
            logger.error(f"Failed to connect to Flight SQL server: {e}")
            raise

    def authenticate(self, username: str, password: str):
        try:
            # Create basic auth token
            auth = flight.BasicAuth(username, password)

            # Authenticate and store bearer token
            self.bearer_token = self.client.authenticate_basic_token(username, password)
            logger.info("Authentication successful")

        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            raise

    def write_batch(self, metrics):
        try:
            logger.info(type(metrics))
            names = []
            timestamps = []
            values = []
            tags = []

            flattened_data = []
            for metric_entry in metrics:
                for actual_metric in metric_entry["metrics"]:
                    metric_name = actual_metric["name"]
                    timestamp = actual_metric["timestamp"]
                    tags = actual_metric["tags"]

                    for field_name, value in actual_metric["fields"].items():
                        flattened_data.append({
                            "name": metric_name,
                            "timestamp": timestamp,
                            "field": field_name,
                            "value": float(value),  # Ensure value is float
                            "tags": json.dumps(tags)
                        })

            # Convert to PyArrow Table

            # Debug: Check data types
            for row in flattened_data:
                print(f"Row: {row}, Types: {[type(x) for x in row]}")
            # pa.
            schema = pa.schema([
                ("name", pa.string()),
                ("timestamp", pa.int64()),
                ("field", pa.string()),
                ("value", pa.float64()),
                ("tags", pa.string())
            ])
            logger.info(flattened_data)
            table = pa.Table.from_pylist(flattened_data, schema=schema)

            logger.info(f"Generated Arrow Table Schema:\n{table.schema}")
            logger.info(f"Arrow Table First 5 Rows:\n{table.slice(0, 5)}")  # Print a small preview of the table
            descriptor = flight.FlightDescriptor.for_path("metrics")

            # Ensure bearer_token is correctly formatted
            if isinstance(self.bearer_token, tuple):
                print(f"Unexpected tuple found in self.bearer_token: {self.bearer_token}")
                self.bearer_token = self.bearer_token[1]  # Extract the actual token value

            options = flight.FlightCallOptions(headers=[(b"authorization",
                                                         self.bearer_token.encode() if isinstance(self.bearer_token,
                                                                                                  str) else self.bearer_token)])

            # Write to Flight SQL server
            writer, _ = self.client.do_put(descriptor, table.schema, options)
            writer.write_table(table)
            writer.close()

            logger.info(f"Successfully wrote {len(names)} metrics to Flight SQL server")

        except Exception as e:
            logger.exception("Error in write_batch")
            raise

try:
    flight_client = FlightSQLClient(JDBC_URL, USERNAME, PASSWORD)
except Exception as e:
    logger.error(f"Failed to initialize Flight SQL client: {e}")
    flight_client = None


@app.route("/telegraf", methods=["POST"])
def receive_telegraf():
    try:
        logger.info(f"Received request with Content-Type: {request.content_type}")

        if not request.is_json:
            return "Content-Type must be application/json", 415

        metrics = request.get_json()
        logger.info(f"Received JSON metrics: {json.dumps(metrics, indent=2)}")  # Pretty-print JSON

        if not isinstance(metrics, list):
            metrics = [metrics]

        if not metrics:
            return "No metrics received", 400

        if flight_client is None:
            return "Flight SQL server connection not available", 503

        # Write to Flight SQL server
        flight_client.write_batch(metrics)

        # flight_client.
        return "Success", 200

    except json.JSONDecodeError as e:
        logger.exception("JSON decode error")
        return f"Invalid JSON format: {str(e)}", 400
    except Exception as e:
        logger.exception("Error processing request")
        return str(e), 500


if __name__ == "__main__":
    logger.info("Starting Flask server...")
    app.run(host="0.0.0.0", port=5000, debug=True)

(2) 실패한 방법

a. 다른 DB 플러그인을 활용해 호환 직접 연결
  • mysql, postgresql 등.
b. 3rd Party Arrow flight SQL Pluigin