婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁 > 知識庫 > golang連接kafka消費進ES操作

golang連接kafka消費進ES操作

熱門標簽:杭州人工智能電銷機器人費用 赤峰電銷 谷歌美發店地圖標注 利用地圖標注位置 400開頭電話怎樣申請 地圖區域圖標注后導出 江蘇呼叫中心外呼系統有效果嗎 貴州電話智能外呼系統 官渡電銷外呼管理系統怎么收費

1.首先初始化conf配置把kafka和ES的地址配置好還有一個日志方便查看

配置信息如下 用到的庫是

github.com/astaxie/beego/config
[logs]
log_level = debug
log_path = "./logs/log_transfer.log"
[kafka]
server_addr = 192.168.0.134:9092
topic = nginx_log
[ES]
addr = http://192.168.0.134:9200/

2.讀取conf配置存取進結構體

type LogConfig struct {
  kafkaAddr string
  ESAddr string
  LogPath string
  LogLevel string
  Topic string
}
var (
   logConfig *LogConfig
)

3.讀取conf配置代碼如下

func initConfig(conftype string,filename string)(err error) {
   conf, err := config. NewConfig(conftype,filename)
   if err != nil {
    fmt. Println( "new config faild,err:",err)
     return
  }
   logConfig = LogConfig{}
   logConfig.LogLevel = conf. String( "logs::log_level")
   if len(logConfig.LogLevel) == 0 {
     logConfig.LogLevel = "debug"
  }
   logConfig.LogPath = conf. String( "logs::log_path")
   if len(logConfig.LogPath) == 0 {
     logConfig.LogPath = "./logs"
  }
   logConfig.kafkaAddr = conf. String( "kafka::server_addr")
   if len(logConfig.kafkaAddr) == 0 {
     err = fmt. Errorf( "invalid kafka addr err")
     return
  }
   logConfig.ESAddr = conf. String( "ES::addr")
   if len(logConfig.ESAddr) == 0 {
     err = fmt. Errorf( "invalid ES addr err")
     return
  }
   logConfig.Topic = conf. String( "kafka::topic")
   if len(logConfig.Topic) == 0 {
     err = fmt. Errorf( "invalid topic addr err")
     return
  }
   return
}

4.完成了initConfig的初始化

5.初始化initLogger

func convertLogLevel(level string) int {
   switch(level) {
     case "debug":
       return logs.LevelDebug
     case "warn":
       return logs.LevelWarn
     case "info":
       return logs.LevelInfo
     case "trace":
       return logs.LevelTrace
  }
   return logs.LevelDebug
}
func initLogger(logpath string, logLevel string) (err error) {
   config := make( map[ string] interface{})
  config[ "filename"] = logpath
  config[ "level"] = convertLogLevel(logLevel)
   configStr, err := json. Marshal(config)
   if err!= nil {
    fmt. Println( "marshal failed,err:",err)
     return
  }
  logs. SetLogger(logs.AdapterFile, string(configStr))
   return
}

6.初始化kafka

type KafkaClient struct {
  client sarama.Consumer
  addr string
  topic string
  wg sync.WaitGroup
}
var (
   kafkaClient *KafkaClient
)
func initKafKa(addr string,topic string)(err error) {
   kafkaClient = KafkaClient{}
   consumer, err := sarama. NewConsumer(strings. Split(addr, ","), nil)
   if err != nil {
    logs. Error( "Failed to strat consumer :",err)
     return 
  }
   kafkaClient.client = consumer
   kafkaClient.addr = addr
   kafkaClient.topic = topic
   return
}

7.初始化ES

gopkg.in/olivere/elastic.v2 // 這個是操作ES的庫

type LogMessage struct {
  App string
  Topic string
  Message string
}
var (
   esClient *elastic.Client
)
func initES(addr string)(err error) {
  
   client, err := elastic. NewClient(elastic. SetSniff( false),elastic. SetURL(addr))
   if err != nil {
    fmt. Println( "connect es error",err)
     return
  }
   esClient = client
   return
}

8.干活把kafka的數據寫入ES

github.com/Shopify/sarama 這個是操作kafka的驅動庫

func run()(err error) {
  fmt. Println( "run")
   partitionList, err := kafkaClient.client. Partitions(kafkaClient.topic)
   if err != nil {
    logs. Error( "ini failed ,err:%v",err)
    fmt. Printf( "ini failed ,err:%v",err)
     return
  }
   for partition := range partitionList {
    fmt. Println( "for進入")
     pc, errRet := kafkaClient.client. ConsumePartition(kafkaClient.topic, int32(partition),sarama.OffsetNewest)
     if errRet != nil {
       err = errRet
      logs. Error( "Failed to start consumer for partition %d: %s \n ",partition,err)
      fmt. Printf( "Failed to start consumer for partition %d: %s \n ",partition,err)
       return
    }
     defer pc. AsyncClose()
    fmt. Println( "馬上進入協程")
    kafkaClient.wg. Add( 1)
     go func(pc sarama.PartitionConsumer){
      fmt. Println( "進來了")
      
       for msg := range pc. Messages() {
        fmt. Println( "func執行")
        logs. Debug( "Partition:%d,Offset:%d,key:%s,value:%s",msg.Partition,msg.Offset, string(msg.Key), string(msg.Value))
         //fmt.Println()
         err = sendToES(kafkaClient.topic,msg.Value)
         if err != nil {
          logs. Warn( "send to es failed,err:%v",err)
        }
      }
      kafkaClient.wg. Done()
    }(pc)    
  }
  kafkaClient.wg. Wait()
  fmt. Println( "協程執行完畢")
   return
}

上面代碼是讀kafka消費數據通過sendToES這個函數發送至ES里面

sendToES代碼如下

func sendToES(topic string,data [] byte) (err error) {
   msg := LogMessage{}
   msg.Topic = topic
   msg.Message = string(data)
   _, err = esClient. Index().
       Index(topic).
       Type(topic).
       BodyJson(msg).
       Do()
     if err != nil {
      
       return
    }
   return
}

Index就是索引名稱

index().type().bodyjson().do()這樣的寫法是鏈式執行操作

9.寫完了基本操作后 再寫一個模擬寫入數據進kafka的數據 代碼如下

func main() {
config := sarama. NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama. NewSyncProducer([] string{ "127.0.0.1:9092"}, config)
if err != nil {
fmt. Println( "producer close,err:", err)
return
}
defer client. Close()
var n int= 0
for {
n++
msg := sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama. StringEncoder( "this is a good test,hello maomaoChong!!," + strconv. Itoa(n))
pid, offset, err := client. SendMessage(msg)
if err != nil {
fmt. Println( "send message failed,", err)
return
}
fmt. Printf( "pid:%v offset:%v \n ", pid, offset)
time. Sleep(time.Second * 2)
}
}

這個就是生產者往kafka里面寫入數據進去消費

10.我們啟動我們的kafka 注意kafka依賴于zookeeper 先啟動ZK然后啟動kafka

我這里用的是zookeeper-3.4.12網上有下載

啟動ZK

ZK已經成功啟動

11.啟動kafka 我這里是kafka_2.11-1.1.0

.\bin\windows\kafka-server-start.bat .\config\server.properties

kafka已經跑起來了

12.把kafka消費測試端也啟動

.\bin\windows\kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2181

消費端啟動成功 一直等待數據進來消費

13.然后我們把ES 和Kib 都啟動了

這是我們的ES版本是5.5.1的 已經跑起來了 接著啟動我們的kib

kib里面有個配置config下面的叫kibana.yml里面設置好ES的地址和端口就處于監聽ES狀態

啟動kib有點慢 稍微等一下就好

此時啟動好了kib

14.測試kib是否啟動

默認地址是http://localhost:5601

進入成功 確定沒問題

15.編譯我們的代碼 寫數據進kafka

從上面看我們知道一個再寫 一個再消費

16.編譯運行我們把kafka寫入進ES里面的代碼

運行了 這里就把kafka消費的數據 寫入進ES里面

17.我們看一下數據是否有 進入kib

我們看到有數據了 已經成功了~

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。

您可能感興趣的文章:
  • Golang 實現Thrift客戶端連接池方式
  • golang 通過ssh代理連接mysql的操作
  • 淺談golang結構體偷懶初始化
  • golang實現各種情況的get請求操作
  • Golang 實現分片讀取http超大文件流和并發控制
  • 在Golang中使用http.FileServer返回靜態文件的操作
  • golang 生成定單號的操作

標簽:宜春 武漢 保定 河池 松原 黔西 泰安 鷹潭

巨人網絡通訊聲明:本文標題《golang連接kafka消費進ES操作》,本文關鍵詞  golang,連接,kafka,消費,進,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《golang連接kafka消費進ES操作》相關的同類信息!
  • 本頁收集關于golang連接kafka消費進ES操作的相關信息資訊供網民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    一区二区三区中文字幕精品精品| 久久精品国产精品青草| 无码av中文一区二区三区桃花岛| 激情六月婷婷久久| 蜜臀av一区二区在线观看| 2017欧美狠狠色| 一区二区三区在线免费播放| 国产电影精品久久禁18| 日韩三级中文字幕| 亚洲一区二区3| 91视频一区二区三区| 国产精品久久久久久久久搜平片| 美腿丝袜亚洲综合| 欧美一区二区久久久| 亚洲chinese男男1069| 在线观看一区二区视频| 亚洲欧美另类综合偷拍| 99久久99久久免费精品蜜臀| 欧美国产成人精品| 国产91精品入口| 中文字幕精品一区二区三区精品| 九九**精品视频免费播放| 欧美本精品男人aⅴ天堂| 蜜臀久久99精品久久久久久9| 欧美日韩一区二区在线观看| 亚洲小说欧美激情另类| 欧美日韩一区 二区 三区 久久精品| 国产精品国产精品国产专区不片 | 91免费版pro下载短视频| 亚洲国产精品av| 成人污污视频在线观看| 一区在线播放视频| 日韩欧美国产综合| 色婷婷av久久久久久久| 91成人免费在线| 国产亚洲va综合人人澡精品| 亚洲国产一区在线观看| 成人天堂资源www在线| 88在线观看91蜜桃国自产| 中文字幕av资源一区| 日本女优在线视频一区二区| 色婷婷综合久久久久中文 | 亚洲成人一区二区| 国产乱妇无码大片在线观看| 欧美日韩国产影片| 中文在线一区二区| 国产另类ts人妖一区二区| 6080午夜不卡| 亚洲成人777| 国产精品美日韩| 欧美中文字幕一二三区视频| 日本vs亚洲vs韩国一区三区 | 国产日产欧产精品推荐色| 成人av网在线| 午夜欧美在线一二页| 日本在线观看不卡视频| 色国产综合视频| 国产亚洲综合av| 一区二区三区四区精品在线视频| 国产精品国产成人国产三级| 国产成人免费9x9x人网站视频| 欧日韩精品视频| 国产精品免费久久| 亚洲高清视频在线| 91在线你懂得| 综合分类小说区另类春色亚洲小说欧美| 亚洲三级理论片| 偷拍一区二区三区| 日本中文字幕一区二区视频 | 天天操天天干天天综合网| 日韩精品一级中文字幕精品视频免费观看| 99视频精品在线| 久久精品国产澳门| 精品日韩在线一区| 韩国午夜理伦三级不卡影院| 欧美四级电影网| 亚洲视频一区在线| 国产成人在线网站| 不卡欧美aaaaa| 91热门视频在线观看| 日韩亚洲欧美一区| 亚洲日本免费电影| 国产精品成人免费| 99精品偷自拍| 亚洲国产欧美日韩另类综合| 欧美高清一级片在线| 蜜桃视频在线观看一区二区| 精品国产sm最大网站| 精品国内二区三区| 成人免费毛片app| 国产福利精品一区二区| 免费在线观看不卡| 天天亚洲美女在线视频| 亚洲国产综合人成综合网站| 17c精品麻豆一区二区免费| 久久久久久久久岛国免费| 欧美一区二区三区系列电影| 欧美性生活影院| 91国内精品野花午夜精品| av一区二区三区黑人| 国产成人av影院| 国产69精品久久久久777| 国产一区二区三区高清播放| 久久99精品久久久久婷婷| 麻豆精品精品国产自在97香蕉| 午夜电影一区二区| 欧美日韩三级一区二区| 亚洲地区一二三色| 久久久99免费| 欧美自拍丝袜亚洲| 国产.欧美.日韩| 日韩精品三区四区| 国产精品传媒入口麻豆| 欧美不卡一区二区| 欧美亚洲尤物久久| 成人黄色小视频| 极品美女销魂一区二区三区| 亚洲一级电影视频| 国产精品日日摸夜夜摸av| 欧美一激情一区二区三区| 色爱区综合激月婷婷| 成人午夜在线免费| 国产综合色精品一区二区三区| 亚洲国产欧美在线| 一区二区高清免费观看影视大全| 久久精品视频一区| 欧美不卡激情三级在线观看| 欧美人xxxx| 精品视频999| 欧美这里有精品| 91在线免费视频观看| 成人性生交大片免费看中文网站| 久久成人久久鬼色| 精品国产99国产精品| 欧美成人一区二区| 国产精品国产三级国产普通话蜜臀| 亚洲精品亚洲人成人网在线播放| 婷婷中文字幕一区三区| 国产高清在线精品| 色94色欧美sute亚洲13| 日韩精品一区二区在线观看| 国产精品久久久久久久久快鸭 | 成人午夜在线免费| 欧美日韩色综合| 久久久久久久av麻豆果冻| 国产盗摄一区二区| 色综合网站在线| 精品人在线二区三区| 亚洲日本成人在线观看| 日本三级亚洲精品| 91蜜桃免费观看视频| 精品国免费一区二区三区| 亚洲精品视频免费观看| 韩国女主播成人在线| 在线观看亚洲精品| 国产亚洲一区二区三区| 日欧美一区二区| 91在线观看污| 国产亚洲婷婷免费| 奇米在线7777在线精品| 一本大道av伊人久久综合| 精品日韩成人av| 午夜欧美视频在线观看| fc2成人免费人成在线观看播放 | 91九色最新地址| 国产亚洲精久久久久久| 麻豆久久久久久久| 欧美日韩国产精品成人| 亚洲美腿欧美偷拍| 成人h精品动漫一区二区三区| 日韩一卡二卡三卡四卡| 亚洲不卡一区二区三区| 91麻豆国产精品久久| 欧美国产精品专区| 国产精品一区二区在线播放| 日韩欧美123| 日韩高清在线电影| 欧美疯狂做受xxxx富婆| 亚洲国产成人精品视频| 欧美性欧美巨大黑白大战| 黑人精品欧美一区二区蜜桃| 不卡av在线免费观看| 欧美福利视频一区| 国产精品九色蝌蚪自拍| 美女高潮久久久| 在线免费观看日本欧美| 久久精品视频在线免费观看| 午夜精品免费在线| 99国内精品久久| 久久久久久一二三区| 亚洲电影视频在线| 99久免费精品视频在线观看| 精品少妇一区二区三区视频免付费 | 欧美日韩美少妇| 国产欧美一区二区精品久导航 | 久久久777精品电影网影网| 亚洲一二三专区| 色综合久久久久| 久久久久久电影| 精品一区二区三区在线播放|