资讯专栏INFORMATION COLUMN

k8s与日志--采用golang实现Fluent Bit的output插件

binta / 1553人阅读

摘要:采用实现的插件前言目前社区日志采集和处理的组件不少,之前方案中的,社区中的,方案中的以及大数据用到比较多的。适合采用的方案,实现日志中心化收集的方案。主要负责采集,负责处理和传送。

采用golang实现Fluent Bit的output插件 前言

目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大数据用到比较多的flume。而Fluent Bit是一款用c语言编写的高性能的日志收集组件,整个架构源于fluentd。官方比较数据如下:

Fluentd Fluent Bit
Scope Containers / Servers Containers / Servers
Language C & Ruby C
Memory ~40MB ~450KB
Performance High Performance High Performance
Dependencies Built as a Ruby Gem, it requires a certain number of gems. Zero dependencies, unless some special plugin requires them.
Plugins More than 650 plugins available Around 35 plugins available
License Apache License v2.0 Apache License v2.0

通过数据可以看出,fluent bit 占用资源更少。适合采用fluent bit + fluentd 的方案,实现日志中心化收集的方案。fluent bit主要负责采集,fluentd负责处理和传送。

扩展output插件

fluent bit 本身是C语言编写,扩展插件有一定的难度。可能官方考虑到这一点,实现了fluent-bit-go,可以实现采用go语言来编写插件,目前只支持output的编写。
fluent-bit-go其实就是利用cgo,封装了c接口。代码比较简单,主要分析其中一个关键文件

</>复制代码

  1. package output
  2. /*
  3. #include
  4. #include "flb_plugin.h"
  5. #include "flb_output.h"
  6. */
  7. import "C"
  8. import "fmt"
  9. import "unsafe"
  10. // Define constants matching Fluent Bit core
  11. const FLB_ERROR = C.FLB_ERROR
  12. const FLB_OK = C.FLB_OK
  13. const FLB_RETRY = C.FLB_RETRY
  14. const FLB_PROXY_OUTPUT_PLUGIN = C.FLB_PROXY_OUTPUT_PLUGIN
  15. const FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG
  16. // Local type to define a plugin definition
  17. type FLBPlugin C.struct_flb_plugin_proxy
  18. type FLBOutPlugin C.struct_flbgo_output_plugin
  19. // When the FLBPluginInit is triggered by Fluent Bit, a plugin context
  20. // is passed and the next step is to invoke this FLBPluginRegister() function
  21. // to fill the required information: type, proxy type, flags name and
  22. // description.
  23. func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
  24. p := (*FLBPlugin) (unsafe.Pointer(ctx))
  25. p._type = FLB_PROXY_OUTPUT_PLUGIN
  26. p.proxy = FLB_PROXY_GOLANG
  27. p.flags = 0
  28. p.name = C.CString(name)
  29. p.description = C.CString(desc)
  30. return 0
  31. }
  32. // Release resources allocated by the plugin initialization
  33. func FLBPluginUnregister(ctx unsafe.Pointer) {
  34. p := (*FLBPlugin) (unsafe.Pointer(ctx))
  35. fmt.Printf("[flbgo] unregistering %v
  36. ", p)
  37. C.free(unsafe.Pointer(p.name))
  38. C.free(unsafe.Pointer(p.description))
  39. }
  40. func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
  41. _key := C.CString(key)
  42. return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
  43. }

主要是定义了一些编写插件需要用到的变量和方法,例如FLBPluginRegister注册组件,FLBPluginConfigKey获取配置文件设定参数等。
PS
实际上用golang调用fluent-bit-go,再加一些实际的业务逻辑实现,最终编译成一个c-share的.so动态链接库。

定制fluent-bit-kafka-ouput插件

实际上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是实际项目中,并不满足我们的需求,必须定制化。
当然接下来的代码主要是作为一个demo,讲清楚如何编写一个output插件。

代码编写和分析

先上代码:

</>复制代码

  1. package main
  2. import (
  3. "C"
  4. "fmt"
  5. "io"
  6. "log"
  7. "reflect"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "unsafe"
  12. "github.com/Shopify/sarama"
  13. "github.com/fluent/fluent-bit-go/output"
  14. "github.com/ugorji/go/codec"
  15. )
  16. var (
  17. brokers []string
  18. producer sarama.SyncProducer
  19. timeout = 0 * time.Minute
  20. topic string
  21. module string
  22. messageKey string
  23. )
  24. //export FLBPluginRegister
  25. func FLBPluginRegister(ctx unsafe.Pointer) int {
  26. return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
  27. }
  28. //export FLBPluginInit
  29. // ctx (context) pointer to fluentbit context (state/ c code)
  30. func FLBPluginInit(ctx unsafe.Pointer) int {
  31. if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
  32. brokers = strings.Split(bs, ",")
  33. } else {
  34. log.Printf("you must set brokers")
  35. return output.FLB_ERROR
  36. }
  37. if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
  38. topic = tp
  39. } else {
  40. log.Printf("you must set topics")
  41. return output.FLB_ERROR
  42. }
  43. if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
  44. module = mo
  45. } else {
  46. log.Printf("you must set module")
  47. return output.FLB_ERROR
  48. }
  49. if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
  50. messageKey = key
  51. } else {
  52. log.Printf("you must set message_key")
  53. return output.FLB_ERROR
  54. }
  55. config := sarama.NewConfig()
  56. config.Producer.Return.Successes = true
  57. if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
  58. if acks, err := strconv.Atoi(required_acks); err == nil {
  59. config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
  60. }
  61. }
  62. if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
  63. if codec, err := strconv.Atoi(compression_codec); err == nil {
  64. config.Producer.Compression = sarama.CompressionCodec(codec)
  65. }
  66. }
  67. if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
  68. if max_retry, err := strconv.Atoi(max_retry); err == nil {
  69. config.Producer.Retry.Max = max_retry
  70. }
  71. }
  72. if timeout == 0 {
  73. timeout = 5 * time.Minute
  74. }
  75. // If Kafka is not running on init, wait to connect
  76. deadline := time.Now().Add(timeout)
  77. for tries := 0; time.Now().Before(deadline); tries++ {
  78. var err error
  79. if producer == nil {
  80. producer, err = sarama.NewSyncProducer(brokers, config)
  81. }
  82. if err == nil {
  83. return output.FLB_OK
  84. }
  85. log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
  86. time.Sleep(time.Second * 30)
  87. }
  88. log.Printf("Kafka failed to respond after %s", timeout)
  89. return output.FLB_ERROR
  90. }
  91. //export FLBPluginFlush
  92. // FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
  93. func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
  94. var h codec.MsgpackHandle
  95. var b []byte
  96. var m interface{}
  97. var err error
  98. b = C.GoBytes(data, length)
  99. dec := codec.NewDecoderBytes(b, &h)
  100. // Iterate the original MessagePack array
  101. var msgs []*sarama.ProducerMessage
  102. for {
  103. // decode the msgpack data
  104. err = dec.Decode(&m)
  105. if err != nil {
  106. if err == io.EOF {
  107. break
  108. }
  109. log.Printf("Failed to decode msgpack data: %v
  110. ", err)
  111. return output.FLB_ERROR
  112. }
  113. // Get a slice and their two entries: timestamp and map
  114. slice := reflect.ValueOf(m)
  115. data := slice.Index(1)
  116. // Convert slice data to a real map and iterate
  117. mapData := data.Interface().(map[interface{}]interface{})
  118. flattenData, err := Flatten(mapData, "", UnderscoreStyle)
  119. if err != nil {
  120. break
  121. }
  122. message := ""
  123. host := ""
  124. for k, v := range flattenData {
  125. value := ""
  126. switch t := v.(type) {
  127. case string:
  128. value = t
  129. case []byte:
  130. value = string(t)
  131. default:
  132. value = fmt.Sprintf("%v", v)
  133. }
  134. if k == "pod_name" {
  135. host = value
  136. }
  137. if k == messageKey {
  138. message = value
  139. }
  140. }
  141. if message == "" || host == "" {
  142. break
  143. }
  144. m := &sarama.ProducerMessage{
  145. Topic: topic,
  146. Key: sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
  147. Value: sarama.ByteEncoder(message),
  148. }
  149. msgs = append(msgs, m)
  150. }
  151. err = producer.SendMessages(msgs)
  152. if err != nil {
  153. log.Printf("FAILED to send kafka message: %s
  154. ", err)
  155. return output.FLB_ERROR
  156. }
  157. return output.FLB_OK
  158. }
  159. //export FLBPluginExit
  160. func FLBPluginExit() int {
  161. producer.Close()
  162. return output.FLB_OK
  163. }
  164. func main() {
  165. }

FLBPluginExit 插件退出的时候需要执行的一些方法,比如关闭连接。

FLBPluginRegister 注册插件

FLBPluginInit 插件初始化

FLBPluginFlush flush到数据到output

FLBPluginConfigKey 获取配置文件中参数

PS
当然除了FLBPluginConfigKey之外,也可以通过获取环境变量来获得设置参数。
ctx相当于一个上下文,负责之间的数据的传递。

编译和执行

编译的时候

</>复制代码

  1. go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

执行的时候

</>复制代码

  1. /fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so
总结

采用类似的编写结构,就可以定制化自己的输出插件了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/32693.html

相关文章

  • k8s日志--采用golang实现Fluent Bitoutput插件

    摘要:采用实现的插件前言目前社区日志采集和处理的组件不少,之前方案中的,社区中的,方案中的以及大数据用到比较多的。适合采用的方案,实现日志中心化收集的方案。主要负责采集,负责处理和传送。 采用golang实现Fluent Bit的output插件 前言 目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大...

    岳光 评论0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一个开源和多平台的,它允许您从不同的来源收集数据日志,统一并将它们发送到多个目的地。例如日志收集日志分析主要讲部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。业务落盘的日志。部署方案采取部署。 前言 收集日志的组件多不胜数,有ELK久负盛名组合中的logstash, 也有EFK组合中的filebeat,更有cncf新贵fluentd,另外还有大数据领域...

    betacat 评论0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一个开源和多平台的,它允许您从不同的来源收集数据日志,统一并将它们发送到多个目的地。例如日志收集日志分析主要讲部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。业务落盘的日志。部署方案采取部署。 前言 收集日志的组件多不胜数,有ELK久负盛名组合中的logstash, 也有EFK组合中的filebeat,更有cncf新贵fluentd,另外还有大数据领域...

    CoffeX 评论0 收藏0
  • 关于k8s集群容器日志收集总结

    摘要:我推荐你使用进行日志收集,将作为的出口。集群目前暂时没有提供日志查看机制。以如下的形式启动容器,容器日志将发往配置的。 【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日志处理中心。 容器日志存在形式 目前容器日志有两种输出形式: ...

    jeffrey_up 评论0 收藏0
  • 关于k8s集群容器日志收集总结

    摘要:我推荐你使用进行日志收集,将作为的出口。集群目前暂时没有提供日志查看机制。以如下的形式启动容器,容器日志将发往配置的。 【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日志处理中心。 容器日志存在形式 目前容器日志有两种输出形式: ...

    or0fun 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<