当前位置: 首页 > news >正文

搭建网站是什么上海发布微博

搭建网站是什么,上海发布微博,企业品牌策划设计,济南网站开发定制简介 随着企业IT基础设施的不断扩大#xff0c;Linux服务器的数量也日益增多#xff0c;传统的单机日志管理方式已无法满足对日志数据集中管理、审计和分析的需求。尤其是在大型集群环境中#xff0c;如何高效地收集、存储和分析日志成为了一项重要的技术挑战。 背景 在实…简介 随着企业IT基础设施的不断扩大Linux服务器的数量也日益增多传统的单机日志管理方式已无法满足对日志数据集中管理、审计和分析的需求。尤其是在大型集群环境中如何高效地收集、存储和分析日志成为了一项重要的技术挑战。 背景 在实现对大型Linux集群的日志集中管理与审计特别是针对rsyslog日志的收集、操作命令日志以及登录日志的审计。通过部署集中的rsyslog服务端能够统一收集1300多台Linux服务器的系统日志确保日志数据的集中化管理。使用Filebeat作为日志收集工具将日志数据推送至Logstash进行清洗和转换最终存储到Elasticsearch中并通过Kibana实现实时数据可视化展示。这种方式不仅简化了日志管理流程还提高了系统的监控效率和安全性。 需求 统一管理 1300 台服务器的Linux系统日志能够及时发现问题和告警。 解决方案 集中管理通过统一的服务端收集所有Linux服务器的日志数据减少单独配置每台服务器的工作量。日志审计对操作命令日志、登录日志等进行审计确保系统行为的可追溯性。数据清洗与分析通过日志清洗与格式转换确保日志数据的标准化便于后续的分析和可视化展示。实时展示利用Kibana将清洗后的数据实时可视化帮助运维人员快速发现潜在问题。 整体架构 为实现这些目标我们设计了以下的系统架构 rsyslog作为日志收集的核心组件它负责将来自Linux系统的各种日志包括/var/log/messages等统一推送到中心化的日志服务端。Filebeat作为轻量级的日志收集器部署在各个Linux节点上负责将日志文件传输到Logstash。Logstash对收集到的日志进行清洗、解析和转换确保数据符合预定格式便于存入Elasticsearch。Elasticsearch存储经过清洗和转换的日志数据提供强大的全文搜索和数据查询功能。Kibana通过Kibana仪表盘实时展示存储在Elasticsearch中的日志数据帮助运维人员进行数据分析和可视化展示。 rsyslog汇总 rsyslog服务端 首先配置rsyslog服务器可以统一收集集群内部的日志。 # 加载本地系统日志模块例如通过 logger 命令发送的日志 $ModLoad imuxsock # 加载内核日志模块之前由 rklogd 处理 $ModLoad imklog # 加载 UDP 模块支持通过 UDP 协议接收日志 $ModLoad imudp # 配置 UDP 服务器在 514 端口接收日志 $UDPServerRun 514 # 加载 TCP 模块支持通过 TCP 协议接收日志 $ModLoad imtcp # 配置 TCP 服务器在 514 端口接收日志 $InputTCPServerRun 514 # 定义一个自定义的日志格式模板 myFormat $template myFormat,%timestamp:::date-rfc3339% %fromhost-ip% %HOSTNAME% [%programname%] %syslogseverity-text%:%msg%\n # 设置默认的文件格式为传统的 rsyslog 格式 $ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat # 加载 /etc/rsyslog.d/ 目录下的所有配置文件 $IncludeConfig /etc/rsyslog.d/*.conf # 配置收集 info 级别的日志排除 mail、authpriv、cron 类别的日志输出到 /var/log/messages 文件并使用 myFormat 格式 *.info;mail.none;authpriv.none;cron.none /var/log/messages;myFormat # 收集所有 authpriv 类别的日志通常是认证相关的日志输出到 /var/log/secure 文件并使用 myFormat 格式 authpriv.* /var/log/secure;myFormat # 收集所有 mail 类别的日志输出到 /var/log/maillog 文件使用异步写入- 表示异步 mail.* -/var/log/maillog # 收集所有 cron 类别的日志定时任务日志输出到 /var/log/cron 文件 cron.* /var/log/cron # 收集所有紧急级别emerg的日志将其通过系统消息发送 *.emerg :omusrmsg:* # 收集 uucp 和 news 类别的严重级别crit日志输出到 /var/log/spooler 文件 uucp,news.crit /var/log/spooler # 收集所有 local7 类别的日志输出到 /var/log/boot.log 文件 local7.* /var/log/boot.log 重启服务端 systemctl restart rsyslog修改前 修改后 rsyslog客户端 所有集群的客户端配置最后一行IP就可以把数据汇总在一切 [rootzabbix ~]# cat /etc/rsyslog.conf |tail -n 2 #authpriv.* 10.10.10.17 *.* 192.168.102.20 # rsyslog 服务端的IP 重启 systemctl restart rsyslog.service日志已经打印到rsyslog 服务端的 /var/log/messages /var/log/secure 等文件 filebeaet收集 在rsyslog 服务端的安装filebeaet并且使用如下配置启动 filebeat.config.modules:path: ${path.config}/modules.d/*.ymlreload.enabled: false filebeat.inputs: - type: logenabled: truetail_files: truepaths:- /var/log/messages output.logstash:hosts: [192.168.1.100:5514] # 把日志发送到logstatsh中logstatsh配置 [rootgame logstash]# cat config/rsyslog.conf input {beats {port 5514type syslog} }filter {grok {match { message %{TIMESTAMP_ISO8601:time} %{IP:client_ip} %{HOSTNAME:host_name} \[%{DATA:type}\] %{GREEDYDATA:info}}overwrite [message]}mutate { split [type,,] }mutate{add_field {types %{[type][1]} } }mutate{remove_field [ tags,agent,host,log,ecs,type ]}date {match [time, yyyy-MM-dd HH:mm:ss,SSS, UNIX]target timestamplocale cn} }output {stdout {codec rubydebug}elasticsearch {hosts [127.0.0.1:9200]index message} } 数据格式 2024-12-03T18:35:3008:00 192.168.102.30 master01 [kubelet] info: E1203 18:35:30.827608 1065 summary_sys_containers.go:83] Failed to get system container stats errfailed to get cgroup stats for \/system.slice/docker.service\: failed to get container info for \/system.slice/docker.service\: unknown container \/system.slice/docker.service\ containerName/system.slice/docker.serviceGROK解析 %{TIMESTAMP_ISO8601:time} %{IP:client_ip} %{HOSTNAME:host_name} \[%{DATA:type}\] %{GREEDYDATA:info}输出格式 {client_ip: 192.168.102.30,time: 2024-12-03T18:35:3008:00,type: kubelet,host_name: master01,info: info: E1203 18:35:30.827608 1065 summary_sys_containers.go:83] \Failed to get system container stats\ err\failed to get cgroup stats for \\\/system.slice/docker.service\\\: failed to get container info for \\\/system.slice/docker.service\\\: unknown container \\\/system.slice/docker.service\\\\ containerName\/system.slice/docker.service\\r }操作系统命令审计 Linux一般都是终端执行命令我们可以让命令写到message日志上在通过过滤获取操作命令记录。 Linux终端配置 [rootzabbix ~]# cat /etc/profile | tail -n 2 unset MAILCHECK export PROMPT_COMMAND{ msg$(history 1 | { read x y; echo $y; });logger -p local2.info euid$(whoami) $(who am i) pwd $msg; }日志格式 Oct 11 17:32:41 zabbix root: euidroot root pts/0 2021-10-11 15:13 (10.10.10.3) /root cat /etc/profile Oct 11 17:32:47 zabbix root: euidroot root pts/0 2021-10-11 15:13 (10.10.10.3) /root cat /etc/profile | tail -n 2 rsyslog服务端展示 filebeat配置 [rootlogserver01 filebeat]# cat system_messages.yml # Filebeat inputs filebeat.inputs: - type: logenabled: truetail_files: truepaths:- /var/log/messages logstatsh配置 [rootlogserver01 config]# cat system_userlog_FromKafkaInES.conf input{beats {host 172.17.9.200port 5046}# kafka{ # bootstrap_servers [172.17.8.232:6667] # topics [sys_os_exe] # codec json # group_id ELK_SYSTEM_EXE_GROUP # consumer_threads 3 # client_id logstash # decorate_events false # auto_offset_reset earliest # request_timeout_ms 300000 # session_timeout_ms 20000 # max_poll_interval_ms 600000 # } } filter{if ([message] ~ euid){grok{match {message ^(?exetime\d-\d-\d)(?:[^\d])(?hhmmss\d:\d:\d)(?:[^\d]\d:\d)(?:\s)(?deshost[^ ])(?:\s)(?name[^ ])(?:\s\[)(?loginuser[^ |\]]*)(?:\]\s[^ ]\seuid)(?exeuser[^ ])(?:\s)(?userinfo[^\(])(?:\s\()(?srchost[^\)])(?:\)\s)(?exepath[^ ])(\s)(?exeinfo.*)}}if _grokparsefailure in [tags] { drop { } }mutate{add_field [tmp_exeinfo,%{exeinfo}]}mutate{split [exetime,-]split [tmp_exeinfo, ]}mutate{add_field [indextime,%{[exetime][0]}%{[exetime][1]}]add_field [evtTime,%{[exetime][0]}-%{[exetime][1]}-%{[exetime][2]} %{hhmmss}]add_field [cmd,%{[tmp_exeinfo][0]}]}#Retention log insertion time to ES..............ruby { code event.set(inserttime, event.get(timestamp).time.to_i) }#replace InsertTime with evtTime yyyy-MM-dd HH:mm:ss eg:2020-06-29 09:24:29date{match [evtTime,yyyy-MM-dd HH:mm:ss]#kibana use this time....................target timestamp}mutate{replace [evtTime,%{evtTime} 0800]}date{match [evtTime,yyyy-MM-dd HH:mm:ss 0800]timezone UTC#log event time timestamp................target logtimestamp}#log event time long string......................ruby { code event.set(longtime, event.get(logtimestamp).time.to_i) }mutate{remove_field [ tmp_exeinfo,evtTime,host,ecs,log,hhmmss,input,agent,exetime ]}}else{drop{}} } output{stdout{codec rubydebug}if [indextime] !~ index{elasticsearch{hosts [http://172.17.9.176:9200]#hosts 172.17.9.176index sys_os_userlog_%{[indextime]}user *********password *********}} } 数据格式 2024-12-03T18:39:0508:00 192.168.102.30 master01 [root] info: euidroot root pts/0 2024-12-03 18:21 (192.168.96.19) /root [2024-12-03 18:39:05]ip a GROK解析 ^(?exetime\d-\d-\d)(?:[^\d])(?hhmmss\d:\d:\d)(?:[^\d]\d:\d)(?:\s)(?deshost[^ ])(?:\s)(?name[^ ])(?:\s\[)(?loginuser[^ |\]]*)(?:\]\s[^ ]\seuid)(?exeuser[^ ])(?:\s)(?userinfo[^\(])(?:\s\()(?srchost[^\)])(?:\)\s)(?exepath[^ ])(\s)(?exeinfo.*)输出格式 {loginuser: root,hhmmss: 18:39:05,exepath: /root,deshost: 192.168.102.30,srchost: 192.168.96.19,name: master01,exeinfo: [2024-12-03 18:39:05]ip a\r,exeuser: root,userinfo: root pts/0 2024-12-03 18:21,exetime: 2024-12-03 }系统用户登录审计 filebeat配置 [rootlogserver01 filebeat]# cat system_secure.yml # Filebeat inputs filebeat.inputs: - type: logenabled: truetail_files: truepaths:- /var/log/secure # Filebeat outppp_id: messuts output.logstash:hosts: [172.17.9.200:5045] logstatsh配置 [rootlogserver01 config]# cat system_login_FromKafkaInES.conf input{beats {host 172.17.9.200port 5045}# # kafka{ # bootstrap_servers [172.17.8.232:6667] # topics [sys_os_login] # codec json # group_id ELK_SYSTEM_LOGIN_GROUP # consumer_threads 3 # client_id logstash # decorate_events false # auto_offset_reset earliest # request_timeout_ms 300000 # session_timeout_ms 20000 # max_poll_interval_ms 600000 # } } filter{#login successed logif ([message] ~ Accepted){grok{match {message ^(?atime\d-\d-\d)(?:[^\d])(?hhmmss\d:\d:\d)(?:[^\d]\d:\d)(?:\s)(?deshost\d\.\d\.\d\.\d)(?:\s)(?name[^ ])(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?loginuser[^ ])(?:\sfrom\s)(?srchost[\d.])(?:\s\w\s\d\s)(?loginmode\w*)}}if _grokparsefailure in [tags] { drop { } }mutate{add_field [type,systemlogin]split [atime,-]}mutate{add_field [indextime,%{[atime][0]}%{[atime][1]}]add_field [evtTime,%{[atime][0]}-%{[atime][1]}-%{[atime][2]} %{hhmmss}]}#Retention log insertion time to ES..............ruby { code event.set(inserttime, event.get(timestamp).time.to_i) }#replace InsertTime with evtTime yyyy-MM-dd HH:mm:ss eg:2020-06-29 09:24:29date{match [evtTime,yyyy-MM-dd HH:mm:ss]#kibana use this time....................target timestamp}mutate{replace [evtTime,%{evtTime} 0800]}date{match [evtTime,yyyy-MM-dd HH:mm:ss 0800]timezone UTC#log event time timestamp................target logtimestamp}#log event time long string......................ruby { code event.set(longtime, event.get(logtimestamp).time.to_i) }mutate{remove_field [ evtTime,host,ecs,log,hhmmss,input,agent,atime ]}}#login failed logelse if ([message] ~ Failed password for){grok{match {message ^(?atime\d-\d-\d)(?:[^\d])(?hhmmss\d:\d:\d)(?:[^\d]\d:\d)(?:\s)(?deshost\d\.\d\.\d\.\d)(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?loginuser[^ ])(?:\sfrom\s)(?srchost[\d.])(?:\s\w\s\d\s)(?loginmode\w*)}}if _grokparsefailure in [tags] { drop { } }mutate{add_field [type,systemloginfailed]split [atime,-]}mutate{add_field [indextime,%{[atime][0]}%{[atime][1]}]add_field [evtTime,%{[atime][0]}-%{[atime][1]}-%{[atime][2]} %{hhmmss}]}#Retention log insertion time to ES..............ruby { code event.set(inserttime, event.get(timestamp).time.to_i) }#replace InsertTime with evtTime yyyy-MM-dd HH:mm:ss eg:2020-06-29 09:24:29date{match [evtTime,yyyy-MM-dd HH:mm:ss]#kibana use this time....................target timestamp}mutate{replace [evtTime,%{evtTime} 0800]}date{match [evtTime,yyyy-MM-dd HH:mm:ss 0800]timezone UTC#log event time timestamp................target logtimestamp}#log event time long string......................ruby { code event.set(longtime, event.get(logtimestamp).time.to_i) }mutate{remove_field [ evtTime,host,ecs,log,hhmmss,input,agent,atime ]}}#other logelse{drop{}} } output{if [type] systemlogin{if [indextime] !~ index{stdout{codec rubydebug}elasticsearch{hosts 172.17.9.176index sys_os_systemlogin_%{[indextime]}user elasticpassword f5OPbv6sqfstmc}}}else if [type] systemloginfailed{if [indextime] !~ index{stdout{codec rubydebug}elasticsearch{hosts 172.17.9.176index sys_os_systemloginfailed_%{[indextime]}user elasticpassword xxxxxxx}}} } 成功日志解析 日志格式 2024-12-03T18:45:0908:00 192.168.102.42 node03 [sshd] info: Accepted password for root from 192.168.96.19 port 14347 ssh2 GROK语法 %{TIMESTAMP_ISO8601:atime} %{IP:deshost} %{HOSTNAME:name} \[%{WORD:type}\] %{DATA:loglevel}: Accepted password for %{WORD:loginuser} from %{IP:srchost} port %{NUMBER:port} %{WORD:loginmode}日志格式 {loginuser: root,atime: 2024-12-03T18:45:0908:00,type: sshd,deshost: 192.168.102.42,srchost: 192.168.96.19,port: 14347,loglevel: info,name: node03,loginmode: ssh2 }失败日志格式 日志格式 2024-12-03T19:01:5708:00 192.168.102.42 node03 [sshd] info: Failed password for invalid user 123 from 192.168.96.19 port 12103 ssh2GROK语法 ^(?atime\d-\d-\d)(?:[^\d])(?hhmmss\d:\d:\d)(?:[^\d]\d:\d)(?:\s)(?deshost\d\.\d\.\d\.\d)(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?loginuser[^ ])(?:\sfrom\s)(?srchost[\d.])(?:\s\w\s\d\s)(?loginmode\w*)日志输出 {loginuser: 123,atime: 2024-12-03,hhmmss: 19:01:57,deshost: 192.168.102.42,srchost: 192.168.96.19,loginmode: ssh2 }
http://www.hyszgw.com/news/92952.html

相关文章:

  • 信息发布网站有哪些清理网站后台缓存
  • 网站关键词快排名wordpress php5.3.5
  • 蓬莱市建设局网站wordpress极简主题
  • 网站开发人员需求分析企业邮箱价格
  • 美橙极速建站系统域名解析错误是怎么回事
  • 电商型网站网站项目流程表
  • 无锡网站的优化北京推出“北京中轴线”
  • 财政局网站开发合同ui交互设计软件
  • 宝坻手机网站建设网络空间治理
  • 新手建站广告联盟赚钱域名服务器的主要功能是
  • python做网站点登入没反映wordpress公众号登陆
  • 网站修改报价个人网站的留言板怎么做
  • 网站内容更新教程申请个网站
  • 北京轨道交通建设管理有限公司网站注册网站怎么注册
  • 回忆网站模板番禺区手机版网站建设
  • 珠宝企业的门户网站开发百度网络推广怎么做
  • 网站开发 发布wordpress微信个人支付宝
  • 设计高端网站哪家好域名如何绑定网站
  • 网站开发工程师特点公司怎样制作网站
  • 最讨厌网站wordpress置顶重复了
  • 网站搭建的注意事项南海营销网站建设
  • 帮客户做网站wordpress过滤
  • 网站做系统叫什么软件有哪些做静态网站软件
  • 基于百度地图的网站开发山东平台网站建设平台
  • 好用的影视网站模板wordpress 插件 h5
  • 网站图片优化器做网站找哪个软件
  • 网站开发培训费用桂林做网站哪家公司好
  • 网站建设费怎么入账潜江招聘资讯网
  • 一级a做爰片免费网站丶郑州网站建设公司服务公司
  • 网站栏目结构哪些天津智能网站建设价位