Skip to content

Senders

Sun Jianbo edited this page Oct 25, 2017 · 12 revisions

Senders的主要作用是将Parser后的数据发送至Sender支持的各类服务,目前支持发送到的服务包括: Pandora、ElasticSearch、File、InfluxDB、MongoDB五种服务。

  1. Pandora Sender: 发送到Pandora(七牛大数据处理平台)。
  2. ElasticSearch Sender: 发送到ElasticSearch。
  3. File Sender: 发送到本地文件。
  4. InfluxDB Sender: 发送到InfluxDB。
  5. MongoDB Accumulate Sender: 聚合后发送到MongoDB。

除了上述五种发送到服务之外,logkit还支持一种发送到本地磁盘进行数据发送预处理的模式(fault_tolerant设置为true,ft_strategy设置为always_save),进行了fault_tolerant模式设置后,数据的reader/parser就和发送异步,数据可以持续读取并解析进入磁盘队列,sender则可以多线程发送,可以有效提升发送效率,并发发送数据。

典型配置如下

"senders":[{
        "name":"pandora_sender",
        "sender_type":"pandora",
        "pandora_ak":"",
        "pandora_sk":"",
        "pandora_host":"https://pipeline.qiniu.com",
        "pandora_repo_name":"yourRepoName",
        "pandora_region":"nb",
        "pandora_schema":"field1 pandora_field1,field2,field3 pandora_field3",
        "fault_tolerant":"true",
        "ft_sync_every":"5",
        "ft_save_log_path":"./ft_log",
        "ft_write_limit":"1",
        "ft_strategy":"always_save",
        "ft_procs":"2"
}]
  1. name: 是sender的标识
  2. sender_type: sender类型,支持file, mongodb_acc, pandora, influxdb
  3. fault_tolerant: 是否用异步容错方式进行发送,默认为false。
  4. ft_save_log_path: 当fault_tolerant为true时候必填。该路径必须为文件夹,该文件夹会作为本地磁盘队列,存放数据,进行异步容错发送。
  5. ft_sync_every:当fault_tolerant为true时候必填。多少次发送数据会记录一次本地磁盘队列的offset。
  6. ft_write_limit:选填,为了避免速率太快导致磁盘压力加大,可以根据系统情况自行限定写入本地磁盘的速率,单位MB/s。默认10MB/s
  7. ft_strategy: 选填,该选项设置为backup_only的时候,数据不经过本地队列直接发送到下游,设为always_save时则所有数据会先发送到本地队列,选concurrent的时候会直接并发发送,不经过队列。无论该选项设置什么,失败的数据都会加入到重试队列中异步循环重试。默认选项为always_save
  8. ft_procs :该选项表示从本地队列获取数据点并向下游发送的并发数,如果ft_strategy设置为backup_only,则本项设置无效,只有本地队列有数据时,该项配置才有效,默认并发数为1.
  9. ft_memory_channel: 选填,默认为"false",不启用。开启该选项会使用memory channel作为 fault_tolerant 中disk queue的替代,相当于把fault_tolerant 作为一个队列使用,使得发送和数据读取解析变为异步,加速整个发送的过程。但是使用 memory channel 数据不落磁盘,会有数据丢失的风险。该功能适合与 ft_procs 连用,利用内存队列,异步后,在发送端多并发加速。
  10. ft_memory_channel_size: 选填,默认为"100",单位为batch,也就是100代表100个待发送的批次,当ft_memory_channel启用时有效,设置memory channel的大小。 注意:该选项设置的大小表达的是队列中可存储的元素个数,并不是占用的内存大小

补充说明

  • 设置fault_tolerant为"true"时,会维持一个本地队列缓存起需要发送的数据。当数据发送失败的时候会在本地队列进行重试,此时如果发送错误,不会影响logkit继续收集日志。
  • 设置fault_tolerant为"true"时,可以保证每次服务重启的时候都从上次的发送offset继续进行发送。在parse过程中产生中间结果,需要高容错性发送的时候最适合采用此种模式。
  • 设置fault_tolerant为"true"时,一般希望日志收集程序对机器性能影响较小的时候,建议首先考虑将ft_strategy设置为backup_only,配置这个选项会首先尝试发送数据,发送失败的数据才放到备份队列等待下次重试。如果还希望更小的性能影响,并且数据敏感性不高,也可以不使用fault_tolerant模式。
  • 当日志发送的速度已经赶不上日志生产速度时,设置fault_tolerant为"true",且ft_strategy设置为concurrent,通过设置ft_procs加大并发,ft_procs设置越大并发度越高,发送越快,对机器性能带来的影响也越大。
  • 如果ft_procs增加已经不能再加大发送日志速度,那么就需要 加大ft_write_limit限制,为logkit 的队列提升磁盘的读写速度。
  • senders支持多个sender配置,但是我们不推荐在senders中加入多个sender,因为一旦某个sender发送缓慢,就会导致其他sender等待这个sender发送成功后再发。简单来说,配置多个sender会互相影响。

如何添加更多Sender(自定义Sender)?

Sender的主要作用是将队列中的数据发送至Sender支持的各类服务,实现logkit的sender仅需实现如下接口:

type Sender interface {
	Name() string
	Send([]Data) error
	Close() error
}

其中包括三个函数,Name()标识Sender名称,Send()发送数据,Close()关闭一些服务连接等常规操作。

实现一个Sender的注意事项

  1. 多线程发送:多线程发送可以充分利用CPU多核的能力,提升发送效率,这一点我们已经设计了ft sender作为框架解决了该问题。
  2. 错误处理与等待:服务端偶尔出现一些异常是很正常的事情,此时就要做好不同错误情况的处理,不会因为某个错误而导致程序出错,另外一方面,一旦发现出错应该让sender等待一定时间再发送,设定一个对后端友好的变长错误等待机制也非常重要。一般情况下,可以采用随着连续错误出现递增等待时间的方法,直到一个最顶峰(如10s),就不再增加,当服务端恢复后再取消等待。
  3. 数据压缩发送:带宽是非常珍贵的资源,通常服务端都会提供gzip压缩的数据接收接口,而sender利用这些接口,将数据压缩后发送,能节省大量带宽成本。
  4. 带宽限流:通常情况下数据收集工具只是机器上的一个附属程序,主要资源如带宽还是要预留给主服务,所以限制sender的带宽用量也是非常重要的功能,限流的方法就可以采用前面Channel一节提到的令牌桶算法。
  5. 字段填充(UUID/timestamp):通常情况下收集的数据信息可能不是完备的,需要填充一些信息进去,如全局唯一的UUID、代表收集时间的timestamp等字段,提供这些字段自动填充的功能,有利于用户对其数据做唯一性、时效性等方面的判断。
  6. 字段别名:解析后的字段名称中经常会出现一些特殊字符,如"$","@"等符号,如果发送的服务端不支持这些特殊字符,就需要提供一些重命名的功能,将这些字段映射到一个别的名称。
  7. 字段筛选:未必解析后的所有字段数据都需要发送,这时候提供一个字段筛选的功能,可以方便用户选择去掉一些无用的字段,也可以节省传输的成本。也可以在Transformer中也提供类似discard transformer的功能,将某个字段去掉。
  8. 类型转换:类型转换是一个说来简单但是做起来非常繁琐的事情,不只是纯粹的整型转换成浮点型,或者字符串转成整型这么简单,还涉及到你发送到的服务端支持的一些特殊类型,如date时间类型等,更多的类型转换实际上相当于最佳实践,能够做好这些类型转换,就会让用户体验得到极大提升。
  9. 简单、简单、简单:除了上述这些,剩下的就是尽可能的让用户使用简单。比如假设我们要写一个 mysql sender,mysql的数据库和表如果不存在,可能数据会发送失败,那就可以考虑提前创建;又比如数据如果有更新了,那么就需要将针对这些更新的字段去更新服务的Schema等等。

注册Sender

与Parser类似,实现完毕的Sender注意要注册进SenderRegistry中,如下所示:

func NewSenderRegistry() *SenderRegistry {
	ret := &SenderRegistry{
		senderTypeMap: map[string]func(conf.MapConf) (Sender, error){},
	}
	ret.RegisterSender(TypeFile, NewFileSender)
	ret.RegisterSender(TypePandora, NewPandoraSender)
	ret.RegisterSender(TypeMongodbAccumulate, NewMongodbAccSender)
	ret.RegisterSender(TypeInfluxdb, NewInfluxdbSender)
	ret.RegisterSender(TypeElastic, NewElasticSender)
	ret.RegisterSender(TypeDiscard, NewDiscardSender)

       // ret.RegisterSender(TypeMyNewSender, NewMyNewSender)

	return ret
}
Clone this wiki locally