前情回顾
前文我们完成了kafka消费逻辑实现,并将消息放入elasticsearch,然后通过kibana可视化工具查看我们的日志。
本节目标
前文只是完成了kafka消息消费以及放入elastic,这次将项目完善,使其支持热更新,就是当config.yaml中监控的日志改变,或者etcd数据有改变时,动态启动协程监控新增日志,关闭取消的日志监控协程。
新增变量控制协程自启动
kafkaconsumer.go中新增如下代码
var topicMap map[string]map[int32]*TopicData
var topicSet map[string]bool
var etcd_topicSet map[string]bool
var etcd_topicMap map[string]map[int32]*TopicData
var topicChan chan *TopicPart
var etcd_topicChan chan *TopicPart
var consumer_list []sarama.Consumer
var etcdcli *clientv3.Client
func init() {
topicMap = make(map[string]map[int32]*TopicData)
etcd_topicMap = make(map[string]map[int32]*TopicData)
topicSet = make(map[string]bool)
etcd_topicSet = make(map[string]bool)
topicChan = make(chan *TopicPart, 20)
etcd_topicChan = make(chan *TopicPart, 20)
consumer_list = make([]sarama.Consumer, 0, 20)
}
topicMap用来存储config.yaml中直接记录的日志topic以及协程参数,etcd_topicMap用来记录etcd中记录的topic以及协程参数。 topicSet用来记录config.yaml中直接记录的日志topic, etcd_topicSet用来记录etcd中记录的topic。 topicChan当监控日志写入elastic的协程异常崩溃时,通过该chan返回topic信息,然后我们通过topicMap找到topic对应的协程重启。 etcd_topicChan协程根据etcd的保存的topic,监控etlastic处理,如果该协程崩溃,则通过etcd_topicMap中查找topic对应的协程重启。 consumer_list保存了kafka消费者列表。 etcdcli时etcd的客户端,用来处理etcd读写。
根据config中配置的日志topic生成set
func ConstructTopicSet() map[string]bool {
topicSetTmp := make(map[string]bool)
configtopics, _ := logconfig.ReadConfig(logconfig.InitVipper(), "collectlogs")
if configtopics == nil {
goto CONFTOPIC
}
for _, configtopic := range configtopics.([]interface{}) {
confmap := configtopic.(map[interface{}]interface{})
for key, val := range confmap {
if key.(string) == "logtopic" {
topicSetTmp[val.(string)] = true
}
}
}
CONFTOPIC:
return topicSetTmp
}
根据config中配置的etcd键值获取val,然后获取topic生成set
etcdconsumer.go中通过GetTopicSet从etcd中读取topic生生set
func GetTopicSet(cli *clientv3.Client) (interface{}, error) {
etcdKeys, etcdres := logconfig.ReadConfig(logconfig.InitVipper(), "etcdkeys")
if !etcdres {
fmt.Println("read config etcdkeys failed")
return nil, errors.New("read config etcdkeys failed")
}
fmt.Println(reflect.TypeOf(etcdKeys))
topicSet := make(map[string]bool)
for _, keyval := range etcdKeys.([]interface{}) {
ctxtime, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctxtime, keyval.(string))
cancel()
if err != nil {
fmt.Println("get failed, err:", err)
continue
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s ...\n", ev.Key, ev.Value)
etcdLogConf := make([]*etcdlogconf.EtcdLogConf, 0, 20)
unmarsherr := json.Unmarshal(ev.Value, &etcdLogConf)
if unmarsherr != nil {
fmt.Println("unmarshal error !, error is ", unmarsherr)
continue
}
for _, etcdval := range etcdLogConf {
topicSet[etcdval.Topic] = true
}
}
}
return topicSet, nil
}
将集合转化为map,并配置协程然后启动
func ConvertSet2Map(consumer sarama.Consumer, topicSet map[string]bool,
topicMaps map[string]map[int32]*TopicData, topic_chan chan *TopicPart) {
for key, _ := range topicSet {
partitionList, err := consumer.Partitions(key)
if err != nil {
fmt.Println("get consumer partitions failed")
fmt.Println("error is ", err.Error())
continue
}
for partition := range partitionList {
pc, err := consumer.ConsumePartition(key, int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Println("consume partition error is ", err.Error())
continue
}
// defer pc.AsyncClose()
topicData := new(TopicData)
topicData.Ctx, topicData.Cancel = context.WithCancel(context.Background())
topicData.KafConsumer = pc
topicData.TPartition = new(TopicPart)
topicData.TPartition.Partition = int32(partition)
topicData.TPartition.Topic = key
_, okm := topicMaps[key]
if !okm {
topicMaps[key] = make(map[int32]*TopicData)
}
topicMaps[key][int32(partition)] = topicData
go PutIntoES(topicData, topic_chan)
}
}
}
从kafka中读取消息,并调用上面的函数启动协程监控es
从kafka中读取信息,然后根据配置生成set和map,启动协程监控es
func ConsumeTopic(consumer sarama.Consumer) {
ConvertSet2Map(consumer, topicSet, topicMap, topicChan)
ConvertSet2Map(consumer, etcd_topicSet, etcd_topicMap, etcd_topicChan)
//监听配置文件
ctx, cancel := context.WithCancel(context.Background())
pathChan := make(chan interface{})
etcdChan := make(chan interface{})
go logconfig.WatchConfig(ctx, logconfig.InitVipper(), pathChan, etcdChan)
defer func(cancel context.CancelFunc) {
consumer_once.Do(func() {
if err := recover(); err != nil {
fmt.Println("consumer main goroutine panic, ", err)
}
cancel()
})
}(cancel)
for {
select {
//检测监控路径的协程崩溃,重启
case topicpart := <-topicChan:
fmt.Printf("receive goroutine exited, topic is %s, partition is %d\n",
topicpart.Topic, topicpart.Partition)
//重启消费者读取数据的协程
val, ok := topicMap[topicpart.Topic]
if !ok {
continue
}
tp, ok := val[topicpart.Partition]
if !ok {
continue
}
tp.Ctx, tp.Cancel = context.WithCancel(context.Background())
go PutIntoES(tp, topicChan)
//检测etcd配置解析后,监控路径的协程崩溃,重启
case topicpart := <-etcd_topicChan:
fmt.Printf("receive goroutine exited, topic is %s, partition is %d\n",
topicpart.Topic, topicpart.Partition)
//重启消费者读取数据的协程
val, ok := etcd_topicMap[topicpart.Topic]
if !ok {
continue
}
tp, ok := val[topicpart.Partition]
if !ok {
continue
}
tp.Ctx, tp.Cancel = context.WithCancel(context.Background())
go PutIntoES(tp, etcd_topicChan)
//检测vipper监控返回配置的更新
case pathchange, ok := <-pathChan:
if !ok {
fmt.Println("vipper watch goroutine exited")
goto LOOPEND
}
//fmt.Println(pathchange)
topicSetTemp := make(map[string]bool)
for _, chval := range pathchange.([]interface{}) {
for logkey, logval := range chval.(map[interface{}]interface{}) {
if logkey.(string) == "logtopic" {
topicSetTemp[logval.(string)] = true
}
}
}
UpdateTopicLogRoutine(topicSetTemp)
//fmt.Println(topicSetTemp)
case etcdchange, ok := <-etcdChan:
if !ok {
fmt.Println("vipper watch goroutine extied")
goto LOOPEND
}
fmt.Println(etcdchange)
topicsetTemp, err := etcdconsumer.GetTopicSet(etcdcli)
if err != nil {
continue
}
UpdateEtcdTopicLogRoutine(topicsetTemp.(map[string]bool))
}
}
LOOPEND:
fmt.Printf("for exited ")
}
go logconfig.WatchConfig 启动协程,调用vipper监控配置,当配置有更新时消费者协程处理更新的topic。 同时支持子协程异常崩溃时,消费者协程重启该协程。
通过kibana看到的日志信息
kibana中ManageMent管理,然后新增index,elastic的index我们设置的是topic,所以我们新建几个index, etcd_log, golang_log, logdir2。 然后kibana中可以看到这几个index的日志信息了
源码下载
https://github.com/secondtonone1/golang-/tree/master/logcatchsys 感谢关注我的公众号