代码编织梦想

Kafka 是一款具备高吞吐量、高可靠性和高可扩展性的分布式消息队列,而 GreptimeDB 是专门用于存储时间序列数据的开源时序数据库。两者在各自的领域都表现出色,但如何高效地连接它们以实现数据的无缝传输和处理?

Vector 作为一个高速且可扩展的数据管道工具发挥了作用。它能够从多个来源(如应用日志、系统指标)收集、转换并传输数据,并将这些数据发送到不同的目标(如数据库、监控系统)。

而随着 GreptimeDB 现已全面支持日志数据的存储与分析,日志接收功能 greptime log sink 已被集成到 Vector 中,使用户可以通过 greptime_logs sink 将来自 Vector 的各种数据源轻松写入 GreptimeDB。详情和示例代码可参考此文:《Vector 增加 GreptimeDB 日志写入支持,连接数十种数据源》。

接下来,本文将详细介绍如何使用 Vector 从 Kafka 读取日志数据并将其写入 GreptimeDB,包括具体的实现步骤与示例代码。

准备工作

假设我们已经有一个 Kafka 集群,其中有一个名为 test_topic 的 topic,里面存储了日志数据。Kafka 中的示例数据内容如下:

127.0.0.1 - - [04/Sep/2024:15:46:13 -0700] "GET / HTTP/1.1" 200 615 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0"

接下来,我们需要安装 Vector 和 GreptimeDB。

安装 & 配置 Vector

Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。我们可以使用 Vector 从 Kafka 读取数据,并将数据写入 GreptimeDB。

安装 Vector 非常简单,可通过二进制容器等进行安装,具体安装步骤请参考 Vector 官方文档

安装完成后,我们需要配置 Vector,使其能够从 Kafka 读取数据并写入 GreptimeDB。下面是一个简单的 Vector 配置文件:

[sources.mq]
type = "kafka"
group_id = "vector0"
topics = ["test_topic"]
bootstrap_servers = "kafka:9092"

[sinks.console]
type = "console"
inputs = [ "mq" ]
encoding.codec = "text"

[sinks.sink_greptime_logs]
type = "greptimedb_logs"
table = "demo_logs"
pipeline_name = "demo_pipeline"
compression = "gzip"
inputs = [ "mq" ]
endpoint = "http://greptimedb:4000"

上面的配置文件中,我们定义了一个名为 mq 的 source,用于从 Kafka 读取数据。我们还定义了一个名为 sink_greptime_logs 的 sink,用于将数据写入 GreptimeDB。

安装 & 配置 GreptimeDB

GreptimeDB 是一个开源的时序数据库,专门用于存储时间序列数据。我们可以使用 GreptimeDB 存储从 Kafka 读取的日志数据。

安装 GreptimeDB 同样非常简单,可通过二进制、容器等进行安装。具体安装步骤请参考 GreptimeDB 官方文档

安装完成后,我们使用默认配置即可。因为日志数据多种多样,我们提供了 Pipeline 功能来处理和过滤日志数据,只保留日志中我们关心的数据,我们将在后续技术博客中分享 Pipeline 引擎的实现原理和方案步骤,敬请期待。

如下例子对我们提供的 nginx 日志格式进行了解析,我们使用如下所示的 Pipeline 配置文件。

processors:
  - dissect:
      fields:
        - message
      patterns:
        - '%{ip} - - [%{datetime}] "%{method} %{path} %{protocol}" %{status} %{size} "-" "%{user_agent}"'
  - date:
      fields:
        - datetime
      formats:
        - "%d/%b/%Y:%H:%M:%S %z"
  - date:
      fields:
        - timestamp
      formats:
        - "%Y-%m-%dT%H:%M:%S%.3fZ"

transform:
  - fields:
      - ip
      - path
    type: string
  - fields:
      - method
      - protocol
    type: string
    index: tag
  - fields:
      - user_agent
    type: string
    index: fulltext
  - fields:
      - status
    type: uint32
    index: tag
  - fields:
      - size
    type: uint32
  - fields:
      - datetime
    type: timestamp
    index: timestamp
  - fields:
      - timestamp
    type: timestamp

在上面的 Pipeline 配置文件中,我们使用 dissect processor 对日志数据进行解析。本来非结构化的日志数据,被拆分并进行格式转化后,获得了一个结构化的数据,包含 ipdatatimemethodpathprotocolstatussizeuser_agent 。然后使用 date processor 对时间两个不同格式的时间字段进行解析。最后使用 transform 对字段进行转换,并设置 index。

关于 index,我们指定了 methodprotocolstatus 为 tag 字段,主要用于高效的查询,一些不确定值的数量的字段,或者值的数量特别多的,不建议设置为 tag,这会导致高基问题。所以 ipsize 均没有被设置为 tag 字段。

pathuser_agent 字段,我们增加了全文索引。以便可以使用模糊搜索来快速找到的关心的内容。详细的查询语法可参考此处

上述配置文件可通过 HTTP 接口上传到 GreptimeDB 中,以创建一个名为 demo_pipeline 的 Pipeline 用于日志的解析与修剪,然后存入 GreptimeDB 中。

curl -X 'POST' 'http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v

运行 Vector & GreptimeDB

现在,我们已经准备好了 Vector 和 GreptimeDB,现在就可以运行它们了。成功后,Vector 将从 Kafka 读取数据,并将数据写入 GreptimeDB。

我们可以通过 MySQL 协议连接 GreptimeDB,查看数据。

mysql> show tables;
+-------------+
| Tables      |
+-------------+
| demo_logs   |
| numbers     |
+-------------+
3 rows in set (0.00 sec)

mysql> select * from demo_logs order by timestamp desc limit 10;
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| ip              | method | protocol | path     | user_agent                                                                                                                                                                                    | status | size | datetime            | timestamp                  |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| 37.254.223.207  | DELETE | HTTP/2.0 | /about   | Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)                                                                                                                                 |    201 |  495 | 2024-10-28 03:39:29 | 2024-10-28 03:39:29.982000 |
| 113.26.47.170   | PUT    | HTTP/2.0 | /contact | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) |    404 |  183 | 2024-10-28 03:39:26 | 2024-10-28 03:39:26.977000 |
| 33.80.49.13     | PUT    | HTTP/2.0 | /about   | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11                                                                        |    500 |  150 | 2024-10-28 03:39:23 | 2024-10-28 03:39:23.973000 |
| 240.14.156.37   | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13                                                                 |    200 |  155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 |
| 210.90.39.41    | POST   | HTTP/2.0 | /about   | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) |    201 |  188 | 2024-10-28 03:39:17 | 2024-10-28 03:39:17.964000 |
| 219.88.194.150  | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)                                                                                                                               |    404 |  704 | 2024-10-28 03:39:14 | 2024-10-28 03:39:14.963000 |
| 130.255.0.241   | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)                                                                                                                                            |    500 |  816 | 2024-10-28 03:39:11 | 2024-10-28 03:39:11.959000 |
| 168.144.155.215 | POST   | HTTP/1.1 | /        | Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5                                          |    500 |  511 | 2024-10-28 03:39:08 | 2024-10-28 03:39:08.954000 |
| 28.112.30.158   | GET    | HTTP/1.1 | /about   | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50                                                                                                                             |    200 |  842 | 2024-10-28 03:39:05 | 2024-10-28 03:39:05.950000 |
| 166.9.187.104   | GET    | HTTP/2.0 | /blog    | Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36                                                                                  |    201 |  970 | 2024-10-28 03:39:02 | 2024-10-28 03:39:02.946000 |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows in set (0.00 sec)

mysql> desc demo_logs;
+------------+---------------------+------+------+---------+---------------+
| Column     | Type                | Key  | Null | Default | Semantic Type |
+------------+---------------------+------+------+---------+---------------+
| ip         | String              |      | YES  |         | FIELD         |
| method     | String              | PRI  | YES  |         | TAG           |
| protocol   | String              | PRI  | YES  |         | TAG           |
| path       | String              |      | YES  |         | FIELD         |
| user_agent | String              |      | YES  |         | FIELD         |
| status     | UInt32              | PRI  | YES  |         | TAG           |
| size       | UInt32              |      | YES  |         | FIELD         |
| datetime   | TimestampNanosecond | PRI  | NO   |         | TIMESTAMP     |
| timestamp  | TimestampNanosecond |      | YES  |         | FIELD         |
+------------+---------------------+------+------+---------+---------------+
9 rows in set (0.00 sec)


mysql> show create table demo_logs;
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table     | Create Table                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| demo_logs | CREATE TABLE IF NOT EXISTS `demo_logs` (
  `ip` STRING NULL,
  `method` STRING NULL,
  `protocol` STRING NULL,
  `path` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'),
  `user_agent` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'),
  `status` INT UNSIGNED NULL,
  `size` INT UNSIGNED NULL,
  `datetime` TIMESTAMP(9) NOT NULL,
  `timestamp` TIMESTAMP(9) NULL,
  TIME INDEX (`datetime`),
  PRIMARY KEY (`method`, `protocol`, `status`)
)

ENGINE=mito
WITH(
  append_mode = 'true'
) |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

现在我们的数据已经入库,可以利用 GreptimeDB 提供的一些功能来快速过滤我们关心的数据,比如通过全文搜索我们可以对 UA 进行模糊匹配,快速找到 UA 包含 Android 的数据。

mysql> SELECT * FROM demo_logs WHERE MATCHES(user_agent, 'Android') limit 10;
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| ip              | method | protocol | path     | user_agent                                                                                                                                                         | status | size | datetime            | timestamp                  |
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| 240.14.156.37   | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13                                      |    200 |  155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 |
| 186.44.204.29   | DELETE | HTTP/1.1 | /        | Opera/9.80 (Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10                                                              |    201 |  343 | 2024-10-28 03:45:33 | 2024-10-28 03:45:33.459000 |
| 75.246.111.167  | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    404 |  869 | 2024-10-28 03:38:59 | 2024-10-28 03:38:59.942000 |
| 236.239.192.109 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.2.1; zh-cn; HTC_Wildfire_A3333 Build/FRG83D) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                |    500 |  892 | 2024-10-28 03:38:53 | 2024-10-28 03:38:53.934000 |
| 232.232.14.176  | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    500 |  644 | 2024-10-28 03:46:42 | 2024-10-28 03:46:42.550000 |
| 135.16.130.172  | DELETE | HTTP/2.0 | /        | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 |    404 |  177 | 2024-10-28 03:47:27 | 2024-10-28 03:47:27.613000 |
| 69.23.7.123     | GET    | HTTP/1.1 | /blog    | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    201 |  770 | 2024-10-28 03:45:09 | 2024-10-28 03:45:09.425000 |
| 37.61.6.211     | GET    | HTTP/1.1 | /blog    | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 |    404 |  298 | 2024-10-28 03:45:21 | 2024-10-28 03:45:21.442000 |
| 244.166.255.46  | GET    | HTTP/2.0 | /blog    | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    201 |  963 | 2024-10-28 03:45:48 | 2024-10-28 03:45:48.478000 |
| 35.169.107.238  | GET    | HTTP/2.0 | /blog    | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    404 |  249 | 2024-10-28 03:46:48 | 2024-10-28 03:46:48.558000 |
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows in set (0.01 sec)

在进行一些统计工作或者问题排查的时候,经常性的需要区分用户的渠道和 url。例如,我们可能需要筛选出 Android 渠道的用户,并查看访问 blog 页面时的 HTTP 状态码分布。通过以下 SQL 查询,可以快速获取所需结果,显著减少数据处理时间。

mysql> SELECT method,status,count(*) FROM demo_logs WHERE MATCHES(user_agent, 'Android') and MATCHES(path, 'blog') group by method,status;
+--------+--------+----------+
| method | status | COUNT(*) |
+--------+--------+----------+
| GET    |    404 |        2 |
| GET    |    201 |        3 |
| PUT    |    500 |        2 |
| POST   |    404 |        1 |
+--------+--------+----------+
4 rows in set (0.01 sec)

我们已经将此过程打包成一个 docker compose 文件,欢迎前往 GitHub demo-scene repo 获取相关源码和指南:

https://github.com/GreptimeTeam/demo-scene/tree/main/kafka-ingestion

总结

本文介绍了如何利用 Vector 从 Kafka 读取日志数据并写入 GreptimeDB。Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。目前已支持 GreptimeDB 的 sink,可以很方便的将原先系统中的监控数据导入 GreptimeDB 中。

本文介绍了如何利用 Vector 工具将 Kafka 中的日志数据无缝传输至 GreptimeDB 中,充分利用 GreptimeDB 在存储和分析时序数据上的优势,以及 Vector 的灵活性让数据处理更加高效。

GreptimeDB 强大的日志存储和查询功能为日志分析提供了可靠保障,无论是构建日志管理系统,还是进行实时监控与分析,Kafka + Vector + GreptimeDB 的组合能够帮助用户实现高效的数据流转与处理

未来我们将进一步介绍如何通过 GreptimeDB 的 Pipeline 引擎实现更加复杂的日志处理和数据过滤,敬请期待!

11 月 9 日我们将在深圳举办「云平台及 AI 时代下的可观测性技术演进」线下沙龙,欢迎报名: https://1965290734055.huodongxing.com/event/6777706662000

关于 Greptime

Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。

欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~

Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb

官网:https://greptime.cn/
文档:https://docs.greptime.cn/
Twitter: https://twitter.com/Greptime
Slack: https://greptime.com/slack
LinkedIn: https://www.linkedin.com/company/greptime/

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/beary_tang/article/details/143333754

springBoot 将日志通过logback写入kafka实战-爱代码爱编程

在web项目实战过程中,我们对日志的处理一般有多种方式,直接写到文件、或者直接写入ELK或者直接写入kafka,今天我们分享直接写入kafka的实战流程: 1、pom文件引入jar <dependency> <groupId>com.github.danielwegener</groupId> <art

kafka数据和日志目录迁移教程_kafka的日志文件在哪-爱代码爱编程

简介 Kafka在运行的过程中,存储在磁盘上的数据会逐渐扩大,甚至会撑爆系统盘,在线上环境我们通常会把kafka的数据存储目录和日志存储目录迁移到磁盘中,或者扩容kafka的存储磁盘。本文将一站式解决kafka的磁盘存储或

kafka系列之—向kafka 写入数据(四)_kafka批量数据写入-爱代码爱编程

一, 创建Kafka生产者 1.1 必选的三个属性 1.1.1 bootstrap.servers 指定broker的地址清单,不需要包含所有的broker地址,生产者会从给定的broker里找到其它broker的信息,建议最少提供两个broker的信息。 1.1.2 key.serializer broker希望接收到的消息的键和值都是字节数组

物联网海量数据下的时序数据库选型:influxdb、tdengine、mongodb与hbase对比与建议-爱代码爱编程

随着物联网(IoT)的普及,各行业纷纷部署大量传感器、设备生成的数据流,面对如此海量的时间序列数据,如何高效存储、查询和分析成为关键。为此,时序数据库(Time Series Database, TSDB)在IoT系统中得到