[Vector by DataDog] Use Vector to parse and convert logs to anything.

Posted on December 14, 2023 (updated September 3, 2025) by nim

Contents

  • 1) Overview of the situation.
  • 1) Opentelemetry sends the logs to Vector.
  • 2) How to configure Vector
  • 2.1) Install vector on Kubernetes
  • 2.2) Vector receives and processes logs from the otel-collector
  • 2.1.1) Source:
  • 2) Transforms:
    • 2.1) parse_key_value function in Vector
    • 2.2) change key
    • 2.3) Convert log to metrics by Vector
    • 2.4) (Optional)parse syslog by Vector
    • 2.5) (Optional)parse grok by Vector (same as logstash)
    • 2.6) Parse JSON from log
  • 2.7) Convert a value in the log message.
  • 2.8) Create an array of metrics from the log
  • 3) Sinks
    • 3.1) output result to console.
    • 3.2) Publish metrics via prometheus client
  • Real Example value for vector

1) Overview of the situation.

The situation is as follows:
You have a log line like this in your system:

time="2023-12-12T17:32:44Z" level=info msg="getRepoObjs stats" application=argocd/longhorn build_options_ms=0 helm_ms=14 plugins_ms=0 repo_ms=13 time_ms=126 unmarshal_ms=97 version_ms=0

The manager says:

Use your DevOps skills to draw me a chart of time_ms for an application; you can get the data from the log above.

After searching around, I found two strong candidates: Logstash and Vector.

Logstash belongs to the ELK family and is a very well-known tool for handling logs.
It is fairly fast and has been around for a long time, so its documentation is thorough.
The one thing I don't like about it is that it uses a lot of RAM.

Turning to Vector:

A promising candidate, written in Rust.
Maybe it will be light and fast.

OK, the architecture is as follows: the otel-collector exports logs to Vector, Vector parses them and converts them to metrics, and Prometheus scrapes those metrics.

1) Opentelemetry sends the logs to Vector.

To send logs from the otel-collector to Vector, I referred to two articles:
https://www.techetio.com/2023/04/29/sending-opentelemetry-logs-to-vector-using-python/
https://intelops.ai/blog/connecting-the-opentelemetry-collector-to-vecor/

I export the logs to Vector through otlphttp.

The OTLP/HTTP exporter in the OpenTelemetry Collector is designed to send metrics, traces, and logs through HTTP using the OTLP format. This exporter supports traces, metrics, and logs pipeline types. To include it in your configuration, you need to specify certain settings:
https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter

....
    exporters:
      otlphttp: 
        endpoint: "http://vector-headless.default:4318"
....
    service:
      pipelines:
        logs:
          exporters:
          - coralogix
          - otlphttp #look at
          processors:
          - k8sattributes
          - attributes/insert
          - filter/regexp_resource
          - batch
          receivers:
          - otlp
          - filelog

2) How to configure Vector

2.1) Install vector on Kubernetes

REPO URL: https://helm.vector.dev
CHART: vector:0.29.0

You can change the version if you like.

2.2) Vector receives and processes logs from the otel-collector

2.1.1) Source:

Here we receive logs from OpenTelemetry:
https://vector.dev/docs/reference/configuration/sources/opentelemetry/

customConfig:
  api:
    enabled: true
    address: 127.0.0.1:8686
    playground: true
  sources:
    otel_collector:
      type: opentelemetry
      grpc:
        address: '0.0.0.0:4317'
      http:
        address: '0.0.0.0:4318'
    # Add your log source here if needed

  sinks:
    console_logs:
      type: console
      encoding:
        codec: text
      inputs:
        - "otel_collector.logs"

service:
  ports:
    - name: otel-collector-http
      port: 4318
      protocol: TCP
    - name: metrics
      port: 9598
      protocol: TCP

Two points to note (as of Dec 13th, 2023):
The opentelemetry source only supports log events at this time.
Received log events will go to this output stream. Use <component_id>.logs as an input to downstream transforms and sinks.

This means that when you declare the inputs of another component, such as a sink or a transform, you reference this source as <component_id>.logs.

If the config is correct, the Vector logs will show it writing a lot of log lines to stdout.

2) Transforms:

2.1) parse_key_value function in Vector

For the transform we use the remap type.

The “remap” transform type in Vector is primarily used for parsing, shaping, and transforming observability data, such as logs and metrics, within your data processing topology. This transform utilizes the Vector Remap Language (VRL), a language designed for the safe and efficient processing of observability data. VRL is expression-oriented and aligns closely with the data models used in Vector.

You can see that the content of message is in key-value form.

Inside remap we use the parse_key_value function.

Parses the value in key-value format. Also known as logfmt.

  • Keys and values can be wrapped with ".
  • " characters can be escaped using \.
customConfig:
  api:
    enabled: true
    address: 127.0.0.1:8686
    playground: true
  sources:
    otel_collector:
      type: opentelemetry
      grpc:
        address: '0.0.0.0:4317'
      http:
        address: '0.0.0.0:4318'
    # Add your log source here if needed

  transforms:
    parse_logs:
      type: remap
      inputs:
        - "otel_collector.logs"
      source: |
          . = parse_key_value!(string!(.message))
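
To get a feel for what the key-value parsing step produces, here is a minimal Python sketch that splits the sample log line from the overview into fields. It is not Vector's implementation: it leans on Python's shlex module to handle the quoted values, and the function name parse_key_value is only borrowed for readability.

```python
import shlex


def parse_key_value(line):
    """Split a logfmt-style line into a dict of string keys and string values."""
    fields = {}
    # shlex.split honours double quotes and backslash escapes,
    # so msg="getRepoObjs stats" stays a single token.
    for token in shlex.split(line):
        key, sep, value = token.partition("=")
        if sep:  # ignore tokens that do not contain '='
            fields[key] = value
    return fields


line = (
    'time="2023-12-12T17:32:44Z" level=info msg="getRepoObjs stats" '
    "application=argocd/longhorn build_options_ms=0 helm_ms=14 plugins_ms=0 "
    "repo_ms=13 time_ms=126 unmarshal_ms=97 version_ms=0"
)
event = parse_key_value(line)
print(event["application"], event["time_ms"])
```

Every value comes back as a string here, which is why a later step coerces time_ms to a number before using it.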

2.2) change key

Let me explain this part a bit more.
Not every key has a friendly format like time-ms or time_ms;
some look like this: grpc.time_ms

To copy the value of grpc.time_ms into a new key named grpc_time_ms, read the field, coerce it with to_float, and assign the result to the new key. Because the key itself contains a dot, it is quoted in the VRL path (."grpc.time_ms"); an unquoted .grpc.time_ms would look for a time_ms field nested under grpc.
to_float: Coerces the value into a float.

  transforms:
    parse_logs:
      type: remap
      inputs:
        - "otel_collector.logs"
      source: |
          . = parse_key_value!(string!(.message))
    extract_metrics:
      type: remap
      inputs:
        - parse_logs
      source: |
          # The key contains a dot, so the path segment is quoted
          grpc_time_ms, err_convert = to_float(."grpc.time_ms")
          if err_convert != null {
            log("Unable to convert To Float: " + err_convert, level: "error")
          } else {
            .grpc_time_ms = grpc_time_ms
            log(".grpc.time_ms value: " + to_string(grpc_time_ms), level: "info")
          }
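
The "value, error" pattern used in that transform can be sketched in Python as follows. This is an illustration only: the to_float helper here is a hypothetical stand-in that returns a pair instead of raising, and the event dict is made up for the example.

```python
def to_float(value):
    """Return (number, None) on success or (None, error message) on failure."""
    try:
        return float(value), None
    except (TypeError, ValueError) as exc:
        return None, str(exc)


# Hypothetical parsed event whose key contains a dot.
event = {"grpc.time_ms": "126"}

number, err = to_float(event.get("grpc.time_ms"))
if err is not None:
    print("Unable to convert to float: " + err)
else:
    # Store the number under a key without the dot.
    event["grpc_time_ms"] = number
```

Handling the error explicitly keeps a malformed value from stopping the whole transform; the event simply goes on without the new key.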

2.3) Convert log to metrics by Vector

This is the part I wanted most:

  transforms:
    parse_logs:
      type: remap
      inputs:
        - "otel_collector.logs"
      source: |
          . = parse_key_value!(string!(.message))
    extract_metrics:
      type: remap
      inputs:
        - parse_logs
      source: |
          grpc_time_ms, err_convert = to_float(."grpc.time_ms")
          if err_convert != null {
            log("Unable to convert To Float: " + err_convert, level: "error")
          } else {
            .grpc_time_ms = grpc_time_ms
            log(".grpc.time_ms value: " + to_string(grpc_time_ms), level: "info")
          }

    convert_to_metrics:
      type: log_to_metric
      inputs:
        - extract_metrics
      metrics:
        - type: gauge
          field: time_ms
          name: response_time_ms
          namespace: argocd
          tags:
            application: '{{ printf "{{ application }}" }}'

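log_to_metric: Convert log events to metric events

In Vector's own examples the tag value is written as a plain template, "{{ application }}".

But because this config is passed through Helm values, where Helm would interpret the braces itself, it has to be written as: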

tags:
  application: '{{ printf "{{ application }}" }}'
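
As a rough mental model of what the gauge conversion does with one parsed event, here is a small Python sketch. The helper names log_to_gauge and render are hypothetical, and the output format only approximates a Prometheus exposition line; the namespace and name are joined with an underscore, as seen in the exporter output later in this post.

```python
def log_to_gauge(event, field, name, namespace, tag_fields):
    """Build a gauge sample from one parsed log event."""
    return {
        "name": f"{namespace}_{name}",
        "value": float(event[field]),
        "tags": {tag: event[tag] for tag in tag_fields},
    }


def render(sample):
    """Format the sample roughly like a Prometheus exposition line."""
    labels = ",".join(f'{k}="{v}"' for k, v in sample["tags"].items())
    return f'{sample["name"]}{{{labels}}} {sample["value"]:g}'


event = {"application": "argocd/longhorn", "time_ms": "126"}
sample = log_to_gauge(event, "time_ms", "response_time_ms", "argocd", ["application"])
print(render(sample))
# prints: argocd_response_time_ms{application="argocd/longhorn"} 126
```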

Below are some code snippets I collected.

2.4) (Optional)parse syslog by Vector

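The configuration below uses the remap transform to parse .message with parse_syslog. If parsing fails it logs an error and leaves the event as it is; otherwise it replaces the event with the parsed fields. The same remap pattern applies to JSON logs, where the steps would be:

  1. Parse the JSON Log: Use the remap transform to parse the JSON log.
  2. Extract the time_ms Value: Extract the time_ms value from the nested message field.
  3. Transform to Metric: Convert the extracted time_ms value into a metric.

Here is the YAML configuration for the syslog case: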

customConfig:
  api:
    enabled: true
    address: 127.0.0.1:8686
    playground: true
  sources:
    otel_collector:
      type: opentelemetry
      grpc:
        address: '0.0.0.0:4317'
      http:
        address: '0.0.0.0:4318'
    # Add your log source here if needed

  transforms:
    remap_argocd:
      type: remap
      inputs:
        - "otel_collector.logs"
      source: |
        parsed, err = parse_syslog(.message)
        if err != null {
          log_msg, err = if .message != null && is_string(.message) {
            "Unable to parse SysLog: " + .message
          } else {
            "Unable to parse SysLog: message field is missing or not a string"
          }
          if err != null {
            log("Error constructing log message: " + err, level: "error")
          } else {
            log(log_msg, level: "error")
          }
        } else {
          . = parsed
        }

  sinks:
    console_sink:
      type: console
      encoding:
        codec: text
      inputs:
        - remap_argocd

service:
  ports:
    - name: otel-collector-http
      port: 4318
      protocol: TCP

The parse_and_extract_time_ms transform below is a Vector transform written in the remap language. It parses a JSON log message and then extracts a specific value (time_ms) from it. Here's a breakdown of each part of the transform:

# Parsing JSON and extracting time_ms
parse_and_extract_time_ms:
  type: remap
  inputs: ["my_source_id"]
  source: |
    . = parse_json!(string!(.message))
    .time_ms = to_float!(.log_message.time_ms)

  • Type:
    • type: remap specifies that this transform uses Vector's remap language, a powerful tool for processing log and metric data.
  • Inputs:
    • inputs: ["my_source_id"] specifies the input to this transform, which in this case is my_source_id. This should be the ID of a source or another transform that precedes this one in your Vector configuration.
  • Source:
    • The source field contains the actual remap script:
      • . = parse_json!(string!(.message)):
        • string!(.message): this asserts that the message field of the log is a string and returns it. The "!" marks the call as an assertion: if message is not a string, the program stops with an error, which Vector logs. The event is dropped only if drop_on_error is enabled on the transform; by default the original event is forwarded unchanged.
        • parse_json!(...): This part attempts to parse that string as JSON. Again, the ! asserts that this must succeed; on failure the program stops with an error in the same way.
        • . = ...: This sets the root object (.) in the remap context to the result of the parse_json! function. This means the entire log event is now replaced with the parsed JSON object.
      • .time_ms = to_float!(.log_message.time_ms):
        • This line extracts the time_ms value from the parsed JSON object. It assumes that after parsing the JSON, there is a field log_message which contains time_ms.
        • to_float!(...): This converts the time_ms value to a floating-point number. The ! asserts that this conversion must succeed.
        • .time_ms = ...: This sets a new field time_ms at the root of the event with the converted floating-point number.
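
The breakdown above can be mirrored step by step in Python. This is a sketch under assumptions: the event payload is hypothetical (a JSON string with time_ms nested under log_message, as the transform expects), and Python exceptions stand in for the "!" assertions.

```python
import json

# Hypothetical incoming event: message holds a JSON string.
event = {"message": '{"log_message": {"time_ms": "126", "level": "info"}}'}


def parse_and_extract_time_ms(event):
    message = event["message"]
    if not isinstance(message, str):  # string!(.message)
        raise TypeError("message is not a string")
    parsed = json.loads(message)  # parse_json!(...), raises on invalid JSON
    # to_float!(.log_message.time_ms), stored at the root of the event
    parsed["time_ms"] = float(parsed["log_message"]["time_ms"])
    return parsed  # . = ... replaces the whole event


result = parse_and_extract_time_ms(event)
print(result["time_ms"])
```

Note that the returned object no longer has a message key: assigning to the root replaces the event rather than merging into it.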

2.5) (Optional)parse grok by Vector (same as logstash)

customConfig:
  api:
    enabled: true
    address: 127.0.0.1:8686
    playground: true
  sources:
    otel_collector:
      type: opentelemetry
      grpc:
        address: '0.0.0.0:4317'
      http:
        address: '0.0.0.0:4318'
    # Add your log source here if needed

  transforms:
    remap_argocd:
      type: remap
      inputs:
        - "otel_collector.logs"
      source: |
        parsed, err_parsed = parse_grok(
          .message, 
          "time=\"%{TIMESTAMP_ISO8601:timestamp}\" level=%{LOGLEVEL:level} msg=\"%{GREEDYDATA:msg}\" application=%{GREEDYDATA:application} build_options_ms=%{NUMBER:build_options_ms:int} helm_ms=%{NUMBER:helm_ms:int} plugins_ms=%{NUMBER:plugins_ms:int} repo_ms=%{NUMBER:repo_ms:int} time_ms=%{NUMBER:time_ms:int} unmarshal_ms=%{NUMBER:unmarshal_ms:int} version_ms=%{NUMBER:version_ms:int}"
        )
        if err_parsed != null {
          log_msg, err_log_msg = "Unable to parse Grok: " + .message
          if err_log_msg != null {
            log(err_log_msg, level: "error")
          }
          log(log_msg, level: "error")
        } else {
          . = parsed
        }
    log2metric_argocd:
      type: log_to_metric
      inputs:
        - remap_argocd
      metrics:
        - type: gauge
          field: time_ms
          name: response_time_ms
          namespace: service
          tags:
            application: '{{ printf "{{ application }}" }}'

  sinks:
    console_sink:
      type: console
      encoding:
        codec: text
      inputs:
        - log2metric_argocd
    prometheus_exporter:
      type: prometheus_exporter
      inputs:
        - log2metric_argocd
      address: "0.0.0.0:1994"

service:
  ports:
    - name: otel-collector-http
      port: 4318
      protocol: TCP
    - name: metrics
      port: 1994
      protocol: TCP

2.6) Parse JSON from log

Suppose a log line has a text prefix followed by a JSON object:

2025-07-29 04:17:47 [INFO   ] { "scanResults": { "scan_details": { "rocketcyber": { "eng_id": "rocketcyber", "scan_result_i": 23, "scan_time": 2, "threat_found": "", "db_package_version": "1753747200", "def_time": "2025-07-29T03:45:35.631Z", "eng_package_version": "09_07_2024-363", "inqueue_time": 26 }, "avira": { "eng_id": "avira", "scan_result_i": 0, "scan_time": 10, "threat_found": "", "db_package_version": "1753746925", "def_time": "2025-07-29T03:55:37.097Z", "eng_package_version": "4.15.26-2281", "inqueue_time": 20 } } }, "stat": { "request_inqueue_time": 87796, "result_inqueue_time": 1371, "total_inqueue_time": 89167, "total_engine_scan_time": 87731, "aggregated_time": 1632, "total_scan_time": 1724 }, "priority": "ARCHIVE", "current_level": 15, "current_number": 9269, "esIndexName": "results" }

transforms:
  parse_json_from_log:
    type: remap
    inputs:
    - otel_collector.logs
    source: |
      # Find the start of the JSON object in the log message
      json_start_index = find!(.message, "{")

      # Proceed only if a '{' character is found (find returns -1 when there is no match)
      if json_start_index != -1 {
        # Slice the message from the start of the JSON to the end
        json_string = slice!(.message, json_start_index)
        # Parse the extracted JSON string and replace the event with it.
        # The '!' makes the program stop with an error if parsing fails.
        . = parse_json!(json_string)
      } else {
        # If no JSON is found, drop the event to prevent errors downstream
        abort
      }

First it finds the character that starts the JSON string:

# Find the start of the JSON object in the log message
json_start_index = find!(.message, "{")

If "{" is not found in the message, find returns -1.
If "{" is found, it returns the position as a number such as 1, 5 or 7.

The if block that follows runs only when json_start_index is not -1.

# Slice the message from the start of the JSON to the end
json_string = slice!(.message, json_start_index)

This cuts the original log message, keeping everything from the position of the { character to the end of the string.

# Parse the extracted JSON string and replace the event with it.
# The '!' makes the program stop with an error if parsing fails.
. = parse_json!(json_string)

It takes json_string and parses it into a structured object.

After this step the event looks like this:

{
  "scanResults": {
    "scan_details": {
      "rocketcyber": {
        "eng_id": "rocketcyber",
        "scan_result_i": 23,
        "scan_time": 2,
        "threat_found": "",
        "db_package_version": "1753747200",
        "def_time": "2025-07-29T03:45:35.631Z",
        "eng_package_version": "09_07_2024-363",
        "inqueue_time": 26
      },
      "avira": {
        "eng_id": "avira",
        "scan_result_i": 0,
        "scan_time": 10,
        "threat_found": "",
        "db_package_version": "1753746925",
        "def_time": "2025-07-29T03:55:37.097Z",
        "eng_package_version": "4.15.26-2281",
        "inqueue_time": 20
      }
    }
  },
  "stat": {
    "request_inqueue_time": 87796,
    "result_inqueue_time": 1371,
    "total_inqueue_time": 89167,
    "total_engine_scan_time": 87731,
    "aggregated_time": 1632,
    "total_scan_time": 1724
  },
  "priority": "ARCHIVE",
  "current_level": 15,
  "current_number": 9269,
  "esIndexName": "results"
}
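
The same "find the first brace, slice, parse" logic is easy to try outside Vector. The Python sketch below is only an illustration; the shortened log line and the function name are made up for the example.

```python
import json


def parse_json_from_log(message):
    """Return the JSON object embedded in a log line, or None if there is none."""
    start = message.find("{")  # str.find returns -1 when "{" is absent
    if start == -1:
        return None  # corresponds to dropping the event
    return json.loads(message[start:])


line = (
    '2025-07-29 04:17:47 [INFO   ] '
    '{"stat": {"total_scan_time": 1724}, "priority": "ARCHIVE"}'
)
event = parse_json_from_log(line)
print(event["stat"]["total_scan_time"], event["priority"])
```

The timestamp and level prefix are discarded; if they are needed downstream they have to be captured before the event is replaced.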

2.7) Convert a value in the log message.

{
  "scanResults": {
    "scan_details": {
      "rocketcyber": {
        "eng_id": "rocketcyber",
        "scan_result_i": 23,
        "scan_time": 2,
        "threat_found": "",
        "db_package_version": "1753747200",
        "def_time": "2025-07-29T03:45:35.631Z",
        "eng_package_version": "09_07_2024-363",
        "inqueue_time": 26
      },
      "avira": {
        "eng_id": "avira",
        "scan_result_i": 0,
        "scan_time": 10,
        "threat_found": "",
        "db_package_version": "1753746925",
        "def_time": "2025-07-29T03:55:37.097Z",
        "eng_package_version": "4.15.26-2281",
        "inqueue_time": 20
      },
      }
    }
  "stat": {
    "request_inqueue_time": 87796,
    "result_inqueue_time": 1371,
    "total_inqueue_time": 89167,
    "total_engine_scan_time": 87731,
    "aggregated_time": 1632,
    "total_scan_time": 1724
  },
  "priority": "ARCHIVE",
  "current_level": 15,
  "current_number": 9269,
  "esIndexName": "results"
}

Above you can see that total_scan_time is in milliseconds.
Now I need to convert it to seconds without affecting the rest of the JSON object.

convert_ms_to_s:
  type: remap
  inputs:
    - parse_json_from_log
  source: |
    # Create a new field in seconds
    .stat.total_scan_time_seconds = to_float!(.stat.total_scan_time) / 1000.0

I divide total_scan_time by 1000 to get seconds.
We then add a new field, total_scan_time_seconds, under stat with this line: .stat.total_scan_time_seconds = to_float!(.stat.total_scan_time) / 1000.0
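
A quick Python sketch of the same arithmetic, using a trimmed copy of the event, shows that the original millisecond field is left in place and only a new key is added. The function name is hypothetical.

```python
def add_seconds_field(event):
    """Add stat.total_scan_time_seconds next to the millisecond value."""
    stat = event["stat"]
    stat["total_scan_time_seconds"] = float(stat["total_scan_time"]) / 1000.0
    return event


event = {"stat": {"total_scan_time": 1724}, "priority": "ARCHIVE"}
add_seconds_field(event)
print(event["stat"])
```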

2.8) Create an array of metrics from the log

{
  "scanResults": {
    "scan_details": {
      "rocketcyber": {
        "eng_id": "rocketcyber",
        "scan_result_i": 23,
        "scan_time": 2,
        "threat_found": "",
        "db_package_version": "1753747200",
        "def_time": "2025-07-29T03:45:35.631Z",
        "eng_package_version": "09_07_2024-363",
        "inqueue_time": 26
      },
      "avira": {
        "eng_id": "avira",
        "scan_result_i": 0,
        "scan_time": 10,
        "threat_found": "",
        "db_package_version": "1753746925",
        "def_time": "2025-07-29T03:55:37.097Z",
        "eng_package_version": "4.15.26-2281",
        "inqueue_time": 20
      },
      }
    }
  "stat": {
    "request_inqueue_time": 87796,
    "result_inqueue_time": 1371,
    "total_inqueue_time": 89167,
    "total_engine_scan_time": 87731,
    "aggregated_time": 1632,
    "total_scan_time": 1724
  },
  "priority": "ARCHIVE",
  "current_level": 15,
  "current_number": 9269,
  "esIndexName": "results"
}

We have this path in the log: scanResults -> scan_details
Take each engine's values and turn them into metrics.

I want it to go into each engine and create metrics like these:

nim_engine_inqueue_time{engine_name="aegislab",pool="ARCHIVE"} 33 1753763291241
nim_engine_inqueue_time{engine_name="ahnlab",pool="ARCHIVE"} 20 1753763291241
nim_engine_inqueue_time{engine_name="avira",pool="ARCHIVE"} 53 1753763291241
# Creates an array of objects, with each object containing an engine's name and scan time.
create_engine_metrics_array:
  type: remap
  inputs:
    - parse_json_from_log
  source: |
    .engine_scans = []
    if exists(.scanResults.scan_details) {
      for_each(object!(.scanResults.scan_details)) -> |key, value| {
        .engine_scans = push(.engine_scans, {
          "engine_name": key,
          "scan_time": value.scan_time,
          "inqueue_time": value.inqueue_time
        })
      }
    }
    # If the array is empty, drop the event so `explode` doesn't generate empty logs.
    if length(.engine_scans) == 0 {
        abort
    }

First it creates a new, empty array field on the event: .engine_scans = []

if exists(.scanResults.scan_details) { : the body of the if runs only when .scanResults.scan_details exists.

for_each(object!(.scanResults.scan_details)) -> |key, value| {

This is a loop.
object!: This function ensures that .scanResults.scan_details is a structured object.
for_each(...): This iterates over each key-value pair inside that object.
-> |key, value|: In each iteration, key holds the engine's name (e.g., "tachyon", "k7"), and value holds the corresponding object with that engine's details.

.engine_scans = push(.engine_scans, {
  "engine_name": key,
  "scan_time": value.scan_time,
  "inqueue_time": value.inqueue_time
})
  • push(...): This function appends an element to an array.
  • The new object: a structured object containing three fields:
  1. engine_name: The name of the engine (from the key).
  2. scan_time: The scan time for that engine (from value.scan_time).
  3. inqueue_time: The in-queue time for that engine (from value.inqueue_time).
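
The loop described above can be sketched in Python like this. It is an illustration of the logic, not of VRL itself; the function name and the trimmed event are made up for the example.

```python
def build_engine_scans(event):
    """Collect one small record per engine found under scanResults.scan_details."""
    engine_scans = []
    details = event.get("scanResults", {}).get("scan_details")
    if isinstance(details, dict):
        for engine_name, value in details.items():
            engine_scans.append(
                {
                    "engine_name": engine_name,
                    "scan_time": value.get("scan_time"),
                    "inqueue_time": value.get("inqueue_time"),
                }
            )
    return engine_scans


event = {
    "scanResults": {
        "scan_details": {
            "rocketcyber": {"scan_time": 2, "inqueue_time": 26},
            "avira": {"scan_time": 10, "inqueue_time": 20},
        }
    },
    "priority": "ARCHIVE",
}
event["engine_scans"] = build_engine_scans(event)
print(event["engine_scans"])
```

The result is still a single event carrying an array; splitting it into one event per engine is a separate step.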
# Takes the .engine_scans array and creates a separate log event for each element.
explode_engine_scans:
  type: lua
  inputs:
    - create_engine_metrics_array
  version: "2"
  hooks:
    process: |
      function (event, emit)
        local engine_scans = event.log.engine_scans
        if engine_scans and type(engine_scans) == "table" then
          for i, engine_data in ipairs(engine_scans) do
            local new_event = {
              log = {
                engine_name = engine_data.engine_name,
                scan_time = engine_data.scan_time,
                inqueue_time = engine_data.inqueue_time,
                priority = event.log.priority
              }
            }
            emit(new_event)
          end
        end
      end
type: lua
version: "2"
  • type: lua: Specifies that this transform executes a Lua script.
  • version: "2": Selects the modern, recommended version of Vector's Lua transform API.
hooks:
  process: |
  • hooks: This is where you define the Lua code to be executed.
  • process: |: the process hook runs its script for every single event that passes through this transform.
function (event, emit)
  • event: This is the incoming log event object from the previous transform (create_engine_metrics_array). It contains the engine_scans array we created.
  • emit: This is a special function provided by Vector that you call to send a new event downstream to the next transform in the pipeline.
local engine_scans = event.log.engine_scans

It takes the engine_scans array from the event and stores it in a local Lua variable named engine_scans for easy access.

if engine_scans and type(engine_scans) == "table" then

This is a safety check. It makes sure that engine_scans actually exists (engine_scans) and that it is a table (which is Lua's data type for both arrays and objects).

for i, engine_data in ipairs(engine_scans) do
  • This is the core loop that iterates over the engine_scans array.
  • ipairs: This is a standard Lua function for iterating over an array.
  • engine_data: In each iteration, engine_data is one of the objects from the array, for example: { "engine_name": "tachyon", "scan_time": 13, "inqueue_time": 1 }.
local new_event = {
  log = {
    engine_name = engine_data.engine_name,
    scan_time = engine_data.scan_time,
    inqueue_time = engine_data.inqueue_time,
    priority = event.log.priority
  }
}
  • Inside the loop, this creates a completely new, clean event.
  • It constructs a log object containing only the specific data for the current engine in the loop, plus the priority from the original event.
emit(new_event)

This is the most important step. It takes the new_event we just created and sends it out of the transform. If the original event has two engines, emit is called twice, producing two separate output events.
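
To see the fan-out behaviour in isolation, here is a Python sketch in which a generator plays the role of emit. It only mirrors the logic of the Lua hook; the function name and sample event are made up for the example.

```python
def explode_engine_scans(event):
    """Yield one new event per entry in engine_scans, carrying priority along."""
    engine_scans = event.get("engine_scans")
    if isinstance(engine_scans, list):  # safety check, like the Lua type test
        for engine_data in engine_scans:
            yield {
                "engine_name": engine_data["engine_name"],
                "scan_time": engine_data["scan_time"],
                "inqueue_time": engine_data["inqueue_time"],
                "priority": event.get("priority"),
            }


event = {
    "engine_scans": [
        {"engine_name": "rocketcyber", "scan_time": 2, "inqueue_time": 26},
        {"engine_name": "avira", "scan_time": 10, "inqueue_time": 20},
    ],
    "priority": "ARCHIVE",
}
exploded = list(explode_engine_scans(event))
print(len(exploded), exploded[1]["engine_name"])
```

Each output event is flat, so a later metric conversion can read scan_time and inqueue_time directly and use engine_name and priority as tags.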

3) Sinks

Sinks in Vector are where you publish the results of transforms or sources, for example to Prometheus or to stdout (console).

3.1) output result to console.

We will often output to the console (stdout) to see the results.

  sinks:
    console_metrics:
      type: console
      encoding:
        codec: text
      inputs:
        - convert_to_metrics

3.2) Publish metrics via prometheus client

Now publish the metrics on a metrics page (http://xxx.xxx:metrics) so that the Prometheus server can scrape them.

    convert_to_metrics:
      type: log_to_metric
      inputs:
        - extract_metrics
      metrics:
        - type: gauge
          field: time_ms
          name: response_time_ms
          namespace: argocd
          tags:
            application: '{{ printf "{{ application }}" }}'

  sinks:
    prometheus_exporter:
      type: prometheus_exporter
      inputs:
        - convert_to_metrics
      address: "0.0.0.0:9598"

Now curl localhost:9598/metrics (here through the vector-headless service):

openvscode-server@openvscode-server-65d78d546b-n2tv2:~$ curl http://vector-headless.default:9598/metrics
# HELP argocd_response_time_ms response_time_ms
# TYPE argocd_response_time_ms gauge
argocd_response_time_ms{application="argocd/kafka-strimzi"} 1367 1702489126145
argocd_response_time_ms{application="argocd/opentelemetry-collector"} 81 1702489125144
argocd_response_time_ms{application="argocd/argocd-image-updater"} 271 1702489125343
argocd_response_time_ms{application="argocd/ingress-nginx"} 277 1702489125343
argocd_response_time_ms{application="argocd/cilium"} 473 1702489125546
argocd_response_time_ms{application="argocd/harbor"} 463 1702489125546
argocd_response_time_ms{application="argocd/vector"} 188 1702489125144
argocd_response_time_ms{application="argocd/backstage"} 226 1702489125145
argocd_response_time_ms{application="argocd/rancher"} 190 1702489125144
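
If you want to sanity-check this output in a script, a small parser is enough. The Python sketch below handles only the simple "name{labels} value timestamp" lines shown above; it is not a full parser for the Prometheus text format, and the regular expression is an assumption that fits this output.

```python
import re

SAMPLE_LINE = re.compile(
    r"^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)"
    r"\{(?P<labels>[^}]*)\}\s+(?P<value>\S+)(?:\s+(?P<timestamp>\d+))?$"
)


def parse_sample(line):
    """Return (metric name, labels dict, value) or None for non-sample lines."""
    match = SAMPLE_LINE.match(line.strip())
    if match is None:
        return None  # e.g. '# HELP' and '# TYPE' comment lines
    labels = dict(re.findall(r'(\w+)="([^"]*)"', match.group("labels")))
    return match.group("name"), labels, float(match.group("value"))


text = """# HELP argocd_response_time_ms response_time_ms
# TYPE argocd_response_time_ms gauge
argocd_response_time_ms{application="argocd/vector"} 188 1702489125144
argocd_response_time_ms{application="argocd/harbor"} 463 1702489125546
"""
samples = [s for s in map(parse_sample, text.splitlines()) if s is not None]
for name, labels, value in samples:
    print(labels["application"], value)
```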

Real Example value for vector

role: Stateless-Aggregator
          customConfig:
            api:
              enabled: true
              address: 127.0.0.1:8686
              playground: true
            sources:
              otel_collector:
                type: opentelemetry
                grpc:
                  address: '0.0.0.0:4317'
                http:
                  address: '0.0.0.0:4318'
            
            transforms:
              parse_json_from_log:
                type: remap
                inputs:
                - otel_collector.logs
                source: |
                  # Find the start of the JSON object in the log message
                  json_start_index = find!(.message, "{")

                  # Proceed only if a '{' character is found (find returns -1 when there is no match)
                  if json_start_index != -1 {
                    # Slice the message from the start of the JSON to the end
                    json_string = slice!(.message, json_start_index)
                    # Parse the extracted JSON string and replace the event with it.
                    # The '!' makes the program stop with an error if parsing fails.
                    . = parse_json!(json_string)
                  } else {
                    # If no JSON is found, drop the event to prevent errors downstream
                    abort
                  }
              convert_ms_to_s:
                type: remap
                inputs:
                  - parse_json_from_log
                source: |
                  # Create a new field in seconds
                  .stat.total_scan_time_seconds = to_float!(.stat.total_scan_time) / 1000.0
              scan_time_metrics:
                type: log_to_metric
                inputs:
                  - convert_ms_to_s
                metrics:
                - field: stat.total_scan_time
                  name: nim_total_scan_time
                  type: gauge
                  tags:
                    pool: '{{ printf "{{ priority }}" }}'
                - field: stat.download_time
                  name: nim_download_time
                  type: gauge
                  tags:
                    pool: '{{ printf "{{ priority }}" }}'
                - field: stat.total_scan_time_seconds
                  name: nim_total_scan_time_seconds
                  type: histogram
                  tags:
                    pool: '{{ printf "{{ priority }}" }}'
                - field: stat.request_inqueue_time
                  name: nim_request_inqueue_time
                  type: gauge
                  tags:
                    pool: '{{ printf "{{ priority }}" }}'
                - field: stat.result_inqueue_time
                  name: nim_result_inqueue_time
                  type: gauge
                  tags:
                    pool: '{{ printf "{{ priority }}" }}'
              # Creates an array of objects, with each object containing an engine's name and scan time.
              create_engine_metrics_array:
                type: remap
                inputs:
                  - parse_json_from_log
                source: |
                  .engine_scans = []
                  if exists(.scanResults.scan_details) {
                    for_each(object!(.scanResults.scan_details)) -> |key, value| {
                      .engine_scans = push(.engine_scans, {
                        "engine_name": key,
                        "scan_time": value.scan_time,
                        "inqueue_time": value.inqueue_time
                      })
                    }
                  }
                  # If the array is empty, drop the event so `explode` doesn't generate empty logs.
                  if length(.engine_scans) == 0 {
                      abort
                  }
              # Takes the .engine_scans array and creates a separate log event for each element.
              explode_engine_scans:
                type: lua
                inputs:
                  - create_engine_metrics_array
                version: "2"
                hooks:
                  process: |
                    function (event, emit)
                      local engine_scans = event.log.engine_scans
                      if engine_scans and type(engine_scans) == "table" then
                        for i, engine_data in ipairs(engine_scans) do
                          local new_event = {
                            log = {
                              engine_name = engine_data.engine_name,
                              scan_time = engine_data.scan_time,
                              inqueue_time = engine_data.inqueue_time,
                              priority = event.log.priority
                            }
                          }
                          emit(new_event)
                        end
                      end
                    end
              # Converts each exploded log event into a gauge metric.
              engine_scan_time_to_metric:
                type: log_to_metric
                inputs:
                  - explode_engine_scans
                metrics:
                  - type: gauge
                    field: "scan_time" # The value for the gauge
                    name: "nim_engine_scan_time"
                    tags:
                      engine_name: '{{ printf "{{ engine_name }}" }}' # Dynamic tag from the event
                      pool: '{{ printf "{{ priority }}" }}' # Pool tag, taken from the priority field copied over by the Lua hook
                  - type: gauge
                    field: "inqueue_time"
                    name: "nim_engine_inqueue_time"
                    tags:
                      engine_name: '{{ printf "{{ engine_name }}" }}'
                      pool: '{{ printf "{{ priority }}" }}'
            sinks:
              console_logs:
                type: console
                encoding:
                  codec: text
                inputs:
                  - "otel_collector.logs"
              prometheus_exporter:
                type: prometheus_exporter
                inputs:
                  - scan_time_metrics
                  - engine_scan_time_to_metric
                address: 0.0.0.0:9598
                buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5, 9, 9.5, 10]
          service:
            ports:
              - name: otel-collector-http
                port: 4318
                protocol: TCP
              - name: otel-collector-grpc
                port: 4317
                protocol: TCP
              - name: metrics
                port: 9598
                protocol: TCP
          podAnnotations:
            prometheus.io/scrape: 'true'
            prometheus.io/path: '/metrics'
            prometheus.io/port: '9598'
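
To make the last three transforms easier to follow, here is a small standalone Python sketch that imitates what they do to one event: build the `engine_scans` array, explode it into one event per engine, and turn each exploded event into two gauges. It is only a simulation for reasoning about the data flow, not Vector code. The sample event, the engine names `EngineA`/`EngineB`, and the helper function names are made up for illustration, and skipping a metric when its field is missing is an assumption about how `log_to_metric` behaves.

```python
from typing import Any


def build_engine_scans(scan_details: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
    """Imitate the VRL for_each loop: one array entry per engine in scan_details."""
    return [
        {
            "engine_name": engine,
            "scan_time": details.get("scan_time"),
            "inqueue_time": details.get("inqueue_time"),
        }
        for engine, details in scan_details.items()
    ]


def explode(event: dict[str, Any]) -> list[dict[str, Any]]:
    """Imitate the Lua hook: one new event per array element, copying `priority`."""
    scans = event.get("engine_scans")
    if not isinstance(scans, list):
        return []
    return [{**scan, "priority": event.get("priority")} for scan in scans]


def to_gauges(event: dict[str, Any]) -> list[tuple[str, dict[str, str], float]]:
    """Imitate log_to_metric: up to two gauges per event, tagged by engine and pool."""
    tags = {"engine_name": str(event["engine_name"]), "pool": str(event["priority"])}
    gauges = []
    for metric_name, field in (
        ("nim_engine_scan_time", "scan_time"),
        ("nim_engine_inqueue_time", "inqueue_time"),
    ):
        value = event.get(field)
        if value is not None:  # assumption: a missing field produces no metric
            gauges.append((metric_name, tags, float(value)))
    return gauges


# Hypothetical sample event; engine names and timings are invented for this sketch.
sample = {
    "priority": "high",
    "scanResults": {
        "scan_details": {
            "EngineA": {"scan_time": 12, "inqueue_time": 3},
            "EngineB": {"scan_time": 40, "inqueue_time": 7},
        }
    },
}
sample["engine_scans"] = build_engine_scans(sample["scanResults"]["scan_details"])

gauges = [g for ev in explode(sample) for g in to_gauges(ev)]
for name, tags, value in gauges:
    print(name, tags, value)
```

One input log with two engines ends up as four gauge samples, two per engine. That is why the explode step is needed: `log_to_metric` reads one value per field from each event, so an array inside a single event cannot be tagged per engine.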