Skip to content

Elastic知识体系

ELK系列

Centos7安装与配置ELK7.2

# 导入私钥
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# ElasticSearch
rpm -ivh elasticsearch-7.2.0-x86_64.rpm
systemctl enable elasticsearch.service
systemctl start elasticsearch.service

# Kibana
rpm -ivh kibana-7.2.0-x86_64.rpm
systemctl enable kibana.service
systemctl start kibana.service

# Logstash
rpm -ivh logstash-7.2.0.rpm
# 可以以服务启动logstash,但是不建议这么做

配置ElasticSearch

  • vim /etc/elasticsearch/elasticsearch.yml
# 基础的配置选项
cluster.name: bloodzer0 # 集群名字
node.name: node-1 # 节点名字
node.attr.rack: r1 # 节点属性

path.data: /var/lib/elasticsearch # 数据路径
path.logs: /var/log/elasticsearch # 日志路径

bootstrap.memory_lock: true # 内存锁定,禁用虚拟内存

network.host: 0.0.0.0 # 绑定主机
http.port: 9200 # 绑定端口

discovery.seed_hosts: ["elk-server"] # 发现主机
cluster.initial_master_nodes: ["node-1"] # 主节点

gateway.recover_after_nodes: 3 # 直到N个节点启动恢复集群
action.destructive_requires_name: true # 删除索引需要显示名称
  • vim /etc/elasticsearch/jvm.options
# 设置堆大小,一般不超过物理存储的50%
-Xms1g
-Xmx1g

# 堆转储路径
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/lib/elasticsearch

# jdk * GC logging

# JVM错误日志
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log

查看ElasticSearch集群

  • 集群概述:http://10.10.10.15:9200/
  • 集群健康性:http://10.10.10.15:9200/_cat/health?v
# green: 所有主分片和从分片都可用
# yellow: 所有主分片可用,但存在不可用的从分片
# red: 某些数据由于某种原因不可用
  • 查看索引:http://10.10.10.15:9200/_cat/indices?v

配置ElasticSearch Template

参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html

  • 创建Template
curl -X PUT "10.10.10.15:9200/_template/template_name" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["te*", "bar*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_source": {
      "enabled": false
    },
    "properties": {
      "host_name": {
        "type": "keyword"
      },
      "created_at": {
        "type": "date",
        "format": "EEE MMM dd HH:mm:ss Z yyyy"
      }
    }
  }
}
'
  • 修改Template:与创建template一致。
  • 删除Template
curl -X DELETE "10.10.10.15:9200/_template/template_name"

配置ElasticSearch Mapping

参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html

配置ElasticSearch索引

参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html

  • 关闭/开启索引
curl -X POST "10.10.10.15:9200/my_index/_close"

curl -X POST "10.10.10.15:9200/my_index/_open"
  • 定期清理索引
import os
import datetime

day_ago = (datetime.datetime.now() - datetime.timedelta(days=3)).strftime("%Y.%m.%d")
command = 'curl -XDELETE "http://10.10.10.15/index_name-%s"' % day_ago
os.system(command)

ElasticSearch常用插件

  • Head

配置Kibana

vim /etc/kibana/kibana.yml

server.port: 5601 # 绑定端口
server.host: "0.0.0.0" # 绑定主机
elasticsearch.hosts: ["http://10.10.10.15:9200"] # ElasticSearch地址

配置Kibana图表

参考:https://www.elastic.co/cn/products/kibana/features

Timelion

以下案例在wazuh入侵检测系统的日志中进行

.es(index=wazuh-alerts-3.x*,q='rule.description: "PAM: Login session opened."').label(登录),.es(index=wazuh-alerts-3.x*,q='rule.description: "PAM: Login session closed."').label(退出)

kibana_timelion_1

Kibana中的Grok Debugger

在Dev Tools中:http://10.10.10.15:5601/app/kibana#/dev_tools/grokdebugger

配置Logstash

node.name: test # 节点名称

path.data: /var/lib/logstash # 数据存储目录
pipeline.id: main # pipeline ID
pipeline.workers: 2 # 输出通道的worker数量,默认为CPU核数

pipeline.batch.size: 125 # 批处理数据的大小
pipeline.batch.delay: 50 # 批处理数据的延迟

config.reload.automatic: false # 是否自动重新加载被修改的配置文件
config.reload.interval: 3s # 配置文件检查间隔时间

测试Logstash

/usr/share/logstash/bin/logstash -e 'input { stdin {} } output { stdout {} }'

Logstash Filter

配置Logstash Patterns

Logstash Patterns目录:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/

  • Logstash在配置文件中使用自定义的Pattern
filter {
    grok {
        patterns_dir => ["patterns_dir"]
    }
}

使用geoip解析IP地址

filter {
    geoip {
        source => "remote_addr" # 指定用来解析的原IP字段
    }
}

使用codec/multiline合并多行数据

filter {
    multiline {
        pattern => "^%{TIMESTAMP_ISO8601}"
        negate => ture
        what => previous # 如果不是以时间开头的行合并到上一行
    }
}

# 还有另外一种写法
filter {
    multiline {
        pattern => "%{TIMESTAMP_ISO8601}"
        what => next # 如果是以时间开头的行,则开始新的一行
    }
}

ELK认证体系

Nginx HTTP Auth Basic

  • 系统安装Nginx
# 安装nginx与nginx认证模块
yum install epel-release -y
yum install nginx.x86_64 httpd-tools.x86_64 -y

  • Nginx配置文件如下:vim /etc/nginx/nginx.conf
server {
    listen 8080;
    server_name kibana;
    auth_basic "Restricted Access";
    auth_basic_user_file /etc/nginx/kibana-user;

    location / {
        proxy_pass http://10.10.10.15:5601;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
    }
}
  • 配置账号密码
# 设置账号密码
htpasswd -cm /etc/nginx/kibana-user username

# 重启nginx服务
systemctl restart nginx.service

Nginx Lua

SearchGuard

官方文档

Shield

官方文档

XPack

Beats系列

Centos7安装与配置Filebeat

rpm -ivh filebeat-7.2.0-x86_64.rpm

配置Filebeat

  • 删除不必要的字段:在Filebeat7.X的版本中,默认的@timestamp、type、host、agent、ecs这些字段是不能删除的。
processors:
- drop_fields:
  fields: ["field"]

Filebeat实现多行合并

- document_type: tomcat
  paths:
    - /var/log/java/log #日志文件地址
  input_type: log #从文件中读取
  tail_files: true #以文件末尾开始读取数据
  multiline:
    pattern: ^\d{4}
    match: after
    negate: true

解决ELK常见问题

ElasticSearch脑裂

参考资料

Elasticsearch史上最全最常用工具清单

Elasticsearch学习(1)—— 简介

ES脑裂问题分析及优化

Logstash最佳实践

如何在ELK中解析各类日志文件

ELK学习笔记之F5利用ELK进行应用数据挖掘系列(1)-HTTP

ELK学习笔记之F5利用EELK进行应用数据挖掘系列(2)-DNS