婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁 > 知識庫 > nginx lua集成kafka的實現方法

nginx lua集成kafka的實現方法

熱門標簽:百度地圖標注公司位置要多少錢 虛假地圖標注 地圖標注如何改成微信號 承德地圖標注公司 400電話號碼辦理多少錢 山東企業外呼系統公司 靈圖uu電子寵物店地圖標注 濮陽好的聯通400電話申請 地圖標注黃河的位置

第一步:進入opresty目錄

[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x 2 root root  4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000  4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000  4096 Nov 13 2017 bundle
-rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure
-rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 COPYRIGHT
drwxr-xr-x 6 root root  4096 Jul 26 11:33 luajit
drwxr-xr-x 6 root root  4096 Aug 1 08:14 lualib
-rw-r--r-- 1 root root  5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root  4096 Jul 26 11:35 nginx
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 patches
drwxr-xr-x 44 root root  4096 Jul 26 11:33 pod
-rw-rw-r-- 1 1000 1000  3689 Nov 13 2017 README.markdown
-rw-rw-r-- 1 1000 1000  8690 Nov 13 2017 README-win32.txt
-rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x 5 root root  4096 Jul 26 11:33 site
drwxr-xr-x 2 root root  4096 Aug 1 10:54 testlua
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 util
[root@node03 openresty]# 

說明:接下來我們關注兩個目錄 lualib 和 nginx

1.lualib: 是存放opresty所需要的集成軟件包的

2.nginx: 是nginx服務目錄

接下來,我們進入lualib目錄一看究竟:

[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root  4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root  4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root  4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root  4096 Aug 1 10:34 resty

這里我們看到了redis和ngx集成軟件包,說明我們可以之間使用nginx和redis而無需導入任何依賴包!!!!

下面看看resty里面有些說明呢????

[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 core
-rw-r--r-- 1 root root  596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka  #這是我們自己導入的
drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root  616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root  236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root  698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket

這里我們看到了熟悉的mysql.lua和redis.lua,好了其他的先不要管

注意:這里的 kafka 這個包是沒有的,說明opnresty么有集成kafka。此處我已經提前導入啦kafka集成包

我們看看kafka里面多有哪些包:

[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
-rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
-rw-r--r-- 1 root root  710 Aug 1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
-rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
-rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
-rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua

附上 kafka 集成包:kafka_jb51.rar

第二步:創建kafka測試lua文件

1.退回到openresty

[root@node03 kafka]# cd /export/servers/openresty/

2.創建測試文件

[root@node03 openresty]# mkdir -r testlua
#這里文件名自己取,文件位置自己定,但必須找得到

這里文件名自己取,文件位置自己定,但必須找得到!!!!!!!!!!!下面會用到!!!!!!!!!!

3.進入剛剛創建的文件夾并創建kafkalua.lua腳本文件

創建文件:vim kafkalua.lua或者touch kafkalua.lua

[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua

kafkalua.lua:

--測試語句可以不用
ngx.say('hello kafka file configuration successful!!!!!!')

--數據采集閾值限制,如果lua采集超過閾值,則不采集
local DEFAULT_THRESHOLD = 100000
-- kafka分區數
local PARTITION_NUM = 6
-- kafka主題名稱
local TOPIC = 'B2CDATA_COLLECTION1'
-- 輪詢器共享變量KEY值
local POLLING_KEY = "POLLING_KEY"
-- kafka集群(定義kafka broker地址,ip需要和kafka的host.name配置一致)
local function partitioner(key, num, correlation_id)
  return tonumber(key)
end
--kafka broker列表
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
--kafka參數,
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- 共享內存計數器,用于kafka輪詢使用
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
  pollingVal = 1
  shared_data:set(POLLING_KEY, pollingVal)
end
--獲取每一條消息的計數器,對PARTITION_NUM取余數,均衡分區
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- 并發控制
local isGone = true
--獲取ngx.var.connections_active進行過載保護,即如果當前活躍連接數超過閾值進行限流保護
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
  isGone = false
end
-- 數據采集
if isGone then

  local time_local = ngx.var.time_local
  if time_local == nil then
    time_local = ""
  end

  local request = ngx.var.request
  if request == nil then
    request = ""
  end

  local request_method = ngx.var.request_method
  if request_method == nil then
    request_method = ""
  end

  local content_type = ngx.var.content_type
  if content_type == nil then
    content_type = ""
  end
  ngx.req.read_body()
  local request_body = ngx.var.request_body
  if request_body == nil then
    request_body = ""
  end

  local http_referer = ngx.var.http_referer
  if http_referer == nil then
    http_referer = ""
  end

  local remote_addr = ngx.var.remote_addr
  if remote_addr == nil then
    remote_addr = ""
  end

  local http_user_agent = ngx.var.http_user_agent
  if http_user_agent == nil then
    http_user_agent = ""
  end

  local time_iso8601 = ngx.var.time_iso8601
  if time_iso8601 == nil then
    time_iso8601 = ""
  end

  local server_addr = ngx.var.server_addr
  if server_addr == nil then
    server_addr = ""
  end

  local http_cookie = ngx.var.http_cookie
  if http_cookie == nil then
    http_cookie = ""
  end
--封裝數據
  local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;
--引入kafka的producer
local producer = require "resty.kafka.producer"
--創建producer
local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
--發送數據
local ok, err = bp:send(TOPIC, partitions, message)
--打印錯誤日志
  if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
  end
end

第三步:修改nginx配置文件nginx.conf

1.進入ngin/conf目錄

[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

2.修改nginx.conf

[root@node03 conf]# vim nginx.conf

    #1.說明找到第一個server
    #2.在server上面添加兩行代碼如下
    #3.在server里面添加kafka相關的代碼如下
    
    
#------------------添加的代碼---------------------------------------
 #開啟共享字典,設置內存大小為10M,供每個nginx的線程消費
 lua_shared_dict shared_data 10m;
 #配置本地域名解析
 resolver 127.0.0.1;
#------------------添加的代碼---------------------------------------

 server {
    listen    80;
    server_name localhost;

    #charset koi8-r;

    #access_log logs/host.access.log main;
    location / {
      root  html;
      index index.html index.htm;
    }

    #------------------添加的代碼---------------------------------------
    location /kafkalua { #這里的kafkalua就是工程名字,不加默認為空
      #開啟nginx監控
      stub_status on;
      #加載lua文件
      default_type text/html;
      #指定kafka的lua文件位置,就是我們剛才創建的kafkalua.lua(前面已經強調要記住的!!!!)
      content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------添加的代碼---------------------------------------
}

說明:location /kafkalua{...}這里的kafkalua是工程名,可以隨意取也可以不取,但是必須要記住!!!

看到我們上面配置了兩個location,第一個為location /{...}第二個為location /kafkalua{...}那么他們有什么區別呢???先向下看,迷霧將會慢慢揭開。

第四步:啟動nginx

1.進入nginx/sbin

[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx

2.測試配置文件是否正確

[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
#看到已經成功啦

3.啟動nginx

[root@node03 sbin]# nginx
#不顯示任何東西一般是成功啦

4.查看nginx是否啟動成功

[root@node03 sbin]# ps -ef | grep nginx
root    3730   1 0 09:24 ?    00:00:00 nginx: master process nginx
nobody   3731  3730 0 09:24 ?    00:00:20 nginx: worker process is shutting down
nobody   5766  3730 0 12:17 ?    00:00:00 nginx: worker process
root    5824  3708 0 12:24 pts/1  00:00:00 grep nginx
#看到有兩個nginx進程,表示成功le

5.瀏覽器訪問nginx

在瀏覽器輸入:node03/kafkalua

說明:如何么有配置hosts則輸入openresty所在設備的地址如:192.168.52.120/kafkalua

在瀏覽器輸入:node03/或者 192.168.52.120/

再在瀏覽器輸入:node03:80/kafkalua 和 node03:80/試試 搬來nginx.conf來看看:

node03:80/kafkalua 這里的nide03是服務器的別名或者之間寫文服務器地址,80是【listen 80;】配置的監聽端口,80端口可以省略不寫,如果這寫成【listen 8088;】那么瀏覽器需輸入 node03:8088/kafkalua (這里不能省略8088),kafkalua是工程名。

 server {
    listen    80;
    server_name localhost;

    #charset koi8-r;

    #access_log logs/host.access.log main;
    location / {
      root  html;
      index index.html index.htm;
    }

    #------------------添加的代碼---------------------------------------
    location /kafkalua { #這里的kafkalua就是工程名字,不加默認為空
      #開啟nginx監控
      stub_status on;
      #加載lua文件
      default_type text/html;
      #指定kafka的lua文件位置,就是我們剛才創建的kafkalua.lua(前面已經強調要記住的!!!!)
      content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }

第五步:創建測試爬蟲程序

1.創建maven工程導入依賴

 <dependencies>
    <dependency>
      <groupId>org.jsoup</groupId>
      <artifactId>jsoup</artifactId>
      <version>1.11.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.4</version>
    </dependency>
  </dependencies>

2.偽爬蟲程序

public class SpiderGoAirCN {
  private static String basePath = "http://node03/kafkalua";
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 50000; i++) {
      // 請求查詢信息
      spiderQueryao();
      // 請求html
      spiderHtml();
      // 請求js
      spiderJs();
      // 請求css
      spiderCss();
      // 請求png
      spiderPng();
      // 請求jpg
      spiderJpg();
      Thread.sleep(100);
    }
  }

  /**
   * 
   * @throws Exception
   */
  public static void spiderQueryao() throws Exception {
    // 1.指定目標網站   ^.*/B2C40/query/jaxb/direct/query.ao.*$
    String url = basePath + "/B2C40/query/jaxb/direct/query.ao";
    // 2.發起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設置請求參數
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
          "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1="
            + getGoTime() + "&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.80");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "243.45.78.132");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
            + getGoTime()
            + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
            + getGoTime() + ")");
    // 4.設置請求參數
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }

  public static void spiderHtml() throws Exception {
    // 1.指定目標網站     ^.*html.*$
    String url = basePath + "/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0";
    // 2.發起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設置請求參數
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設置請求參數
    // httpPost.setEntity(new StringEntity(
    // "depcity=CAN&arrcity=WUH&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preUrl=&isMember="));
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }

  public static void spiderJs() throws Exception {

    // 1.指定目標網站
    String url = basePath +"/B2C40/dist/main/modules/common/requireConfig.js";
    // 2.發起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設置請求參數
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設置請求參數
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }

  public static void spiderCss() throws Exception {

    // 1.指定目標網站
    String url = basePath +"/B2C40/dist/main/css/flight.css";
    // 2.發起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設置請求參數
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader("Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設置請求參數
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }

  public static void spiderPng() throws Exception {

    // 1.指定目標網站
    String url =basePath + "/B2C40/dist/main/images/common.png";
    // 2.發起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設置請求參數
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設置請求參數
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }

  public static void spiderJpg() throws Exception {

    // 1.指定目標網站
    String url = basePath +"/B2C40/dist/main/images/loadingimg.jpg";
    // 2.發起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設置請求參數
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設置請求參數
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }

  public static String getLocalDateTime() {
    DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00",
        Locale.ENGLISH);
    String nowAsISO = df.format(new Date());
    return nowAsISO;

  }

  public static String getISO8601Timestamp() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }

  public static String getGoTime() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }

  public static String getBackTime() {
    Date date = new Date();// 取時間
    Calendar calendar = new GregorianCalendar();
    calendar.setTime(date);
    calendar.add(calendar.DATE, +1);// 把日期往前減少一天,若想把日期向后推一天則將負數改為正數
    date = calendar.getTime();
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
    String dateString = formatter.format(date);
    return dateString;
  }
}

第六步:啟動kafka

1.創建主題topic

[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 
--replication-factor 3 --create --topic B2CDATA_COLLECTION1

2.開啟kafka消費者

[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 
--topic B2CDATA_COLLECTION1

第七步:開啟爬蟲程序并觀察結果

1.啟動爬蟲程序

2.觀察消費者窗口如下

第八步:啟動kafka-manager觀察

1.啟動kafka-manager

[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager
-rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root 1383 May 1 06:27 log-config
-rw-r--r-- 1 root root  105 May 1 06:27 log-config.bat
[root@node01 bin]# 

#啟動
[root@node01 bin]# ./kafka-manager 

啟動后的窗口:

2.瀏覽器訪問

瀏覽器輸入:node01:9000

kafka manager使用不做講解,觀察B2CDATA_COLLECTION1主題消費情況:

​ 有三個分區,每個分區消費的消息差多說明成功啦,

​ 如果不一樣,則是kafkalua.lua 腳本中沒有配置分區策略,默認分區會導致 數據傾斜 我們需配置自己的分區策略!

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

標簽:安康 泰安 福州 德宏 淮安 鷹潭 上海 樂山

巨人網絡通訊聲明:本文標題《nginx lua集成kafka的實現方法》,本文關鍵詞  nginx,lua,集成,kafka,的,實現,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《nginx lua集成kafka的實現方法》相關的同類信息!
  • 本頁收集關于nginx lua集成kafka的實現方法的相關信息資訊供網民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    国产98色在线|日韩| 亚洲日本丝袜连裤袜办公室| 91国偷自产一区二区三区观看| 国产精品白丝jk黑袜喷水| 奇米一区二区三区av| 午夜影院久久久| 日韩在线卡一卡二| 日韩avvvv在线播放| 麻豆精品新av中文字幕| 国产一区不卡精品| 成人一区二区三区视频在线观看| 国产精品性做久久久久久| 国产91在线观看| 91日韩在线专区| 欧美日韩中字一区| 欧美日韩aaa| 精品国产乱码久久| 欧美激情在线看| 伊人婷婷欧美激情| 日本少妇一区二区| 国产精品一区二区久久不卡 | 色综合久久综合中文综合网| 色一情一乱一乱一91av| 欧美日韩国产系列| 久久久蜜桃精品| 亚洲精品日日夜夜| 美女视频黄久久| av在线不卡免费看| 欧美一区二区视频在线观看| 久久久国产精品午夜一区ai换脸| |精品福利一区二区三区| 午夜电影一区二区| 国产传媒欧美日韩成人| 欧美午夜一区二区三区免费大片| 日韩一级大片在线观看| 中文字幕一区在线观看| 免费一区二区视频| 91美女片黄在线| 久久综合九色综合欧美98 | 一区二区三区小说| 免费成人在线视频观看| 91老司机福利 在线| 日韩视频在线你懂得| 亚洲免费观看高清完整版在线观看| 日韩精品电影在线| 色婷婷精品大在线视频| 久久久综合网站| 日韩黄色一级片| 色综合久久六月婷婷中文字幕| 日韩一区二区三区免费看 | 99视频国产精品| 精品国产免费人成电影在线观看四季 | 国模大尺度一区二区三区| 在线观看国产日韩| 国产精品视频你懂的| 老司机一区二区| 91麻豆精品国产91久久久久 | 中文字幕一区二区视频| 精品一区二区三区免费观看 | 亚洲欧美精品午睡沙发| 国产又粗又猛又爽又黄91精品| 欧美亚洲日本国产| 中文字幕一区二区三| 国产精品1024久久| 精品国产91乱码一区二区三区 | 精品成人一区二区三区| 午夜精品久久久久影视| 色婷婷综合久久久久中文一区二区 | 91视频精品在这里| 国产精品丝袜一区| 懂色av中文字幕一区二区三区| 欧美成人午夜电影| 看片的网站亚洲| 精品国产乱码91久久久久久网站| 激情五月播播久久久精品| 精品久久国产字幕高潮| 久久国产日韩欧美精品| 日韩精品一区二区三区swag| 蜜桃视频免费观看一区| 日韩欧美黄色影院| 国精品**一区二区三区在线蜜桃| 日韩精品一区二区三区中文精品| 日本最新不卡在线| 精品国产亚洲一区二区三区在线观看| 老司机精品视频线观看86| 日韩久久久久久| 国产精品一色哟哟哟| 国产精品午夜春色av| 99久久精品久久久久久清纯| 亚洲啪啪综合av一区二区三区| 在线视频欧美区| 日韩高清不卡在线| 精品国产乱码久久久久久图片| 国产高清精品在线| 亚洲图片激情小说| 欧美日本在线看| 麻豆成人久久精品二区三区小说| 精品国产免费一区二区三区四区| 成人一区二区三区在线观看| 亚洲日本在线天堂| 欧美狂野另类xxxxoooo| 国产一区二区三区免费播放| 国产精品麻豆久久久| 欧美在线一区二区三区| 蜜臀a∨国产成人精品| 欧美激情一区二区在线| 欧美日韩激情一区| 国产一区二区伦理| 亚洲黄色在线视频| 日韩精品一区在线观看| 成人av影视在线观看| 亚洲午夜免费视频| 国产三级久久久| 欧美日韩成人综合在线一区二区| 国产精品一区二区无线| 亚洲成a人v欧美综合天堂下载 | 亚洲精品高清在线| 日韩欧美国产午夜精品| av福利精品导航| 精品在线视频一区| 午夜国产精品影院在线观看| 中文字幕一区二区三区四区| 欧美videos中文字幕| 欧洲激情一区二区| 成人av网站在线观看免费| 美女高潮久久久| 亚洲香肠在线观看| 最新日韩av在线| 久久九九久久九九| 日韩欧美国产一区二区在线播放| 91久久香蕉国产日韩欧美9色| 国产成人高清在线| 国内成人自拍视频| 精品在线免费观看| 奇米一区二区三区av| 亚洲国产中文字幕在线视频综合| 国产亚洲精久久久久久| 欧美tickling网站挠脚心| 欧美日韩精品三区| 在线观看亚洲专区| 91啪九色porn原创视频在线观看| 国产91精品精华液一区二区三区 | 一区二区久久久久久| 中文字幕亚洲在| 国产精品国产精品国产专区不蜜| xnxx国产精品| 久久久国产午夜精品| 久久午夜电影网| 久久精品一区二区三区四区| 久久女同性恋中文字幕| 精品黑人一区二区三区久久| 欧美变态口味重另类| 26uuu精品一区二区| 国产日韩在线不卡| 国产精品久久一级| 亚洲老司机在线| 亚洲一区二区三区激情| 午夜国产不卡在线观看视频| 午夜日韩在线观看| 久久99精品国产.久久久久| 狠狠色丁香久久婷婷综合_中| 国产一区视频在线看| 成人白浆超碰人人人人| 一本色道久久综合亚洲aⅴ蜜桃| 91网上在线视频| 欧美男女性生活在线直播观看| 91精品国产综合久久精品麻豆| 欧美一级搡bbbb搡bbbb| 久久综合色婷婷| 中文字幕一区二区三区不卡| 亚洲国产成人精品视频| 久久国内精品自在自线400部| 国产一区二区三区综合| 99久久婷婷国产综合精品 | 精东粉嫩av免费一区二区三区| 国产丶欧美丶日本不卡视频| 色综合色综合色综合色综合色综合| 欧美三级三级三级爽爽爽| 日韩欧美国产系列| 国产精品视频免费看| 亚洲综合另类小说| 久久99九九99精品| 91亚洲永久精品| 91精品国产欧美一区二区18 | 美女精品一区二区| 99re热视频这里只精品| 欧美二区在线观看| 中文字幕精品一区二区三区精品| 亚洲国产日韩一级| 国产aⅴ综合色| 日韩一区二区在线免费观看| 国产精品国产三级国产aⅴ无密码| 天堂资源在线中文精品| 成人动漫在线一区| 精品免费视频.| 偷窥少妇高潮呻吟av久久免费| 风间由美一区二区av101| 欧美精品1区2区3区| 中文字幕在线播放不卡一区| 韩国精品免费视频|