摘要:采用实现的插件前言目前社区日志采集和处理的组件不少,之前方案中的,社区中的,方案中的以及大数据用到比较多的。适合采用的方案,实现日志中心化收集的方案。主要负责采集,负责处理和传送。
采用golang实现Fluent Bit的output插件 前言
目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大数据用到比较多的flume。而Fluent Bit是一款用c语言编写的高性能的日志收集组件,整个架构源于fluentd。官方比较数据如下:
Fluentd | Fluent Bit | |
---|---|---|
Scope | Containers / Servers | Containers / Servers |
Language | C & Ruby | C |
Memory | ~40MB | ~450KB |
Performance | High Performance | High Performance |
Dependencies | Built as a Ruby Gem, it requires a certain number of gems. | Zero dependencies, unless some special plugin requires them. |
Plugins | More than 650 plugins available | Around 35 plugins available |
License | Apache License v2.0 | Apache License v2.0 |
通过数据可以看出,fluent bit 占用资源更少。适合采用fluent bit + fluentd 的方案,实现日志中心化收集的方案。fluent bit主要负责采集,fluentd负责处理和传送。
扩展output插件fluent bit 本身是C语言编写,扩展插件有一定的难度。可能官方考虑到这一点,实现了fluent-bit-go,可以实现采用go语言来编写插件,目前只支持output的编写。
fluent-bit-go其实就是利用cgo,封装了c接口。代码比较简单,主要分析其中一个关键文件
</>复制代码
package output
/*
#include
#include "flb_plugin.h"
#include "flb_output.h"
*/
import "C"
import "fmt"
import "unsafe"
// Define constants matching Fluent Bit core
const FLB_ERROR = C.FLB_ERROR
const FLB_OK = C.FLB_OK
const FLB_RETRY = C.FLB_RETRY
const FLB_PROXY_OUTPUT_PLUGIN = C.FLB_PROXY_OUTPUT_PLUGIN
const FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG
// Local type to define a plugin definition
type FLBPlugin C.struct_flb_plugin_proxy
type FLBOutPlugin C.struct_flbgo_output_plugin
// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
p := (*FLBPlugin) (unsafe.Pointer(ctx))
p._type = FLB_PROXY_OUTPUT_PLUGIN
p.proxy = FLB_PROXY_GOLANG
p.flags = 0
p.name = C.CString(name)
p.description = C.CString(desc)
return 0
}
// Release resources allocated by the plugin initialization
func FLBPluginUnregister(ctx unsafe.Pointer) {
p := (*FLBPlugin) (unsafe.Pointer(ctx))
fmt.Printf("[flbgo] unregistering %v
", p)
C.free(unsafe.Pointer(p.name))
C.free(unsafe.Pointer(p.description))
}
func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
_key := C.CString(key)
return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
}
主要是定义了一些编写插件需要用到的变量和方法,例如FLBPluginRegister注册组件,FLBPluginConfigKey获取配置文件设定参数等。
PS
实际上用golang调用fluent-bit-go,再加一些实际的业务逻辑实现,最终编译成一个c-share的.so动态链接库。
实际上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是实际项目中,并不满足我们的需求,必须定制化。
当然接下来的代码主要是作为一个demo,讲清楚如何编写一个output插件。
先上代码:
</>复制代码
package main
import (
"C"
"fmt"
"io"
"log"
"reflect"
"strconv"
"strings"
"time"
"unsafe"
"github.com/Shopify/sarama"
"github.com/fluent/fluent-bit-go/output"
"github.com/ugorji/go/codec"
)
var (
brokers []string
producer sarama.SyncProducer
timeout = 0 * time.Minute
topic string
module string
messageKey string
)
//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}
//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {
if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
brokers = strings.Split(bs, ",")
} else {
log.Printf("you must set brokers")
return output.FLB_ERROR
}
if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
topic = tp
} else {
log.Printf("you must set topics")
return output.FLB_ERROR
}
if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
module = mo
} else {
log.Printf("you must set module")
return output.FLB_ERROR
}
if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
messageKey = key
} else {
log.Printf("you must set message_key")
return output.FLB_ERROR
}
config := sarama.NewConfig()
config.Producer.Return.Successes = true
if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
if acks, err := strconv.Atoi(required_acks); err == nil {
config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
}
}
if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
if codec, err := strconv.Atoi(compression_codec); err == nil {
config.Producer.Compression = sarama.CompressionCodec(codec)
}
}
if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
if max_retry, err := strconv.Atoi(max_retry); err == nil {
config.Producer.Retry.Max = max_retry
}
}
if timeout == 0 {
timeout = 5 * time.Minute
}
// If Kafka is not running on init, wait to connect
deadline := time.Now().Add(timeout)
for tries := 0; time.Now().Before(deadline); tries++ {
var err error
if producer == nil {
producer, err = sarama.NewSyncProducer(brokers, config)
}
if err == nil {
return output.FLB_OK
}
log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
time.Sleep(time.Second * 30)
}
log.Printf("Kafka failed to respond after %s", timeout)
return output.FLB_ERROR
}
//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
var h codec.MsgpackHandle
var b []byte
var m interface{}
var err error
b = C.GoBytes(data, length)
dec := codec.NewDecoderBytes(b, &h)
// Iterate the original MessagePack array
var msgs []*sarama.ProducerMessage
for {
// decode the msgpack data
err = dec.Decode(&m)
if err != nil {
if err == io.EOF {
break
}
log.Printf("Failed to decode msgpack data: %v
", err)
return output.FLB_ERROR
}
// Get a slice and their two entries: timestamp and map
slice := reflect.ValueOf(m)
data := slice.Index(1)
// Convert slice data to a real map and iterate
mapData := data.Interface().(map[interface{}]interface{})
flattenData, err := Flatten(mapData, "", UnderscoreStyle)
if err != nil {
break
}
message := ""
host := ""
for k, v := range flattenData {
value := ""
switch t := v.(type) {
case string:
value = t
case []byte:
value = string(t)
default:
value = fmt.Sprintf("%v", v)
}
if k == "pod_name" {
host = value
}
if k == messageKey {
message = value
}
}
if message == "" || host == "" {
break
}
m := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
Value: sarama.ByteEncoder(message),
}
msgs = append(msgs, m)
}
err = producer.SendMessages(msgs)
if err != nil {
log.Printf("FAILED to send kafka message: %s
", err)
return output.FLB_ERROR
}
return output.FLB_OK
}
//export FLBPluginExit
func FLBPluginExit() int {
producer.Close()
return output.FLB_OK
}
func main() {
}
FLBPluginExit 插件退出的时候需要执行的一些方法,比如关闭连接。
FLBPluginRegister 注册插件
FLBPluginInit 插件初始化
FLBPluginFlush flush到数据到output
FLBPluginConfigKey 获取配置文件中参数
PS
当然除了FLBPluginConfigKey之外,也可以通过获取环境变量来获得设置参数。
ctx相当于一个上下文,负责之间的数据的传递。
编译的时候
</>复制代码
go build -buildmode=c-shared -o out_kafka.so .
生成out_kafka.so
执行的时候
总结</>复制代码
/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so
采用类似的编写结构,就可以定制化自己的输出插件了。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/32693.html
摘要:采用实现的插件前言目前社区日志采集和处理的组件不少,之前方案中的,社区中的,方案中的以及大数据用到比较多的。适合采用的方案,实现日志中心化收集的方案。主要负责采集,负责处理和传送。 采用golang实现Fluent Bit的output插件 前言 目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大...
摘要:是一个开源和多平台的,它允许您从不同的来源收集数据日志,统一并将它们发送到多个目的地。例如日志收集日志分析主要讲部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。业务落盘的日志。部署方案采取部署。 前言 收集日志的组件多不胜数,有ELK久负盛名组合中的logstash, 也有EFK组合中的filebeat,更有cncf新贵fluentd,另外还有大数据领域...
摘要:是一个开源和多平台的,它允许您从不同的来源收集数据日志,统一并将它们发送到多个目的地。例如日志收集日志分析主要讲部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。业务落盘的日志。部署方案采取部署。 前言 收集日志的组件多不胜数,有ELK久负盛名组合中的logstash, 也有EFK组合中的filebeat,更有cncf新贵fluentd,另外还有大数据领域...
摘要:我推荐你使用进行日志收集,将作为的出口。集群目前暂时没有提供日志查看机制。以如下的形式启动容器,容器日志将发往配置的。 【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日志处理中心。 容器日志存在形式 目前容器日志有两种输出形式: ...
摘要:我推荐你使用进行日志收集,将作为的出口。集群目前暂时没有提供日志查看机制。以如下的形式启动容器,容器日志将发往配置的。 【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日志处理中心。 容器日志存在形式 目前容器日志有两种输出形式: ...
阅读 2417·2023-04-25 23:15
阅读 2098·2021-11-22 09:34
阅读 1656·2021-11-15 11:39
阅读 1040·2021-11-15 11:37
阅读 2329·2021-10-14 09:43
阅读 3585·2021-09-27 13:59
阅读 1587·2019-08-30 15:43
阅读 3595·2019-08-30 15:43