课程标题:基于ELKKafka构建分布式ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a采集系统 1。传统日志采集存在哪些缺点 2。elkkafka日志采集的原理 3。基于dockercompose安装elkkafka环境 4。基于AOP并发队列实现日志的采集 分布式日志采集产生背景 在传统项目中,如果在生产环境中,有多台不同的服务器集群,如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常低下。 因此我们需要集中化的管理日志,ELK则应运而生。 传统方式服务器搜索日志命令:tail200f日志文件名称ELKKafka组成 ElkEElasticSeach(存储日志信息) lLogstash(搬运工) KKibana连接到我们ElasticSeach图形化界面查询日志 Elkkafka实现分布式日志采集 为什么需要将日志存储在ElasticSeach而不是MySQL中呢 ElasticSeach底层使用到倒排索引存储数据,在搜索日志效率比mysql要高的。elkkafka原理 1。springboot项目会基于aop的方式拦截系统中日志 请求与响应日志信息前置或者环绕通知; 2。将该日志投递到我们kafka中注意该过程一定要是异步的形式,如果是同步形式会影响到整体 接口的响应速度。 3。Logstash数据源kafka订阅kafka的主题获取日志消息内容 4。Logstash在将日志消息内容输出到es中存放 5。开发者使用Kibana连接到ElasticSeach查询存储日志内容。为什么ELK需要结合Kafka 如果只整合elk不结合kafka这样的话每个服务器节点上都会安装Logstash做读写日志IO操作,可能性能不是很好,而且比较冗余。 ELKKafka环境构建dockercompose构建ELKKafka环境 整个环境采用dockercompose来实现构建 注意:环境cpu多核内存4GB以上 kafka环境的安装: 1。使用dockercompose安装kafka 对dockercompose不熟悉可以查看:http:www。mayikt。comfrontcouinfo3010 dockercompose安装包https:note。youdao。comynoteshare1index。html?idb72a076dc75471fb86deba42924a0a3dtypenote docker相关学习文档:http:note。youdao。comnoteshare?ide99e3eba0355e7c9905d0395d20b9f27sub962FA9CD33974C77A948376958D4B3DD 2。dockercompose文件 3。mkdirdockerkakfa 4。cddockerkakfa 5。创建dockercompose。ymlversion:2services:zookeeper:image:wurstmeisterzookeeperports:2181:2181restart:alwayskafka:image:wurstmeisterkafka:2。122。3。0ports:9092:9092environment:KAFKAZOOKEEPERCONNECTzookeeper:2181KAFKAADVERTISEDLISTENERSPLAINTEXT:192。168。75。129:9092KAFKALISTENERSPLAINTEXT::9092volumes:varrundocker。sock:varrundocker。sockrestart:alwayskafkamanager:image:sheepkillerkafkamanager镜像:开源的web管理kafka集群的界面environment:ZKHOSTS:192。168。75。129修改:宿主机IPports:9001:9000暴露端口elasticsearch:image:daocloud。iolibraryelasticsearch:6。5。4restart:alwayscontainername:elasticsearchenvironment:ESJAVAOPTSXms512mXmx512mports:9200:9200kibana:image:daocloud。iolibrarykibana:6。5。4restart:alwayscontainername:kibanaports:5601:5601environment:elasticsearchurlhttp:192。168。75。129:9200dependson:elasticsearch dockerrunzookeeper容器 dockerrunkafka容器 dockerrunkafka容器ElasticSeach dockerrunKibana容器 dockerrunLogstash容器 使用容器编排技术 6。关闭防火墙 systemctlstopfirewalld serviceiptablesstop 7。dockercomposeup执行即可。 没有这个命令需要先安装dockercompose 注意:构建elkkafka环境过程中,需要非常多的依赖镜像。 如果es启动报错:无法启动大多数内存不足的原因 建议虚拟机内存4G以上 es启动报错:maxvirtualmemoryareasvm。maxcount(65530)istoo 解决步骤: 1。先切换到root用户; 2。执行命令: sysctlwvm。maxmapcount262144 可以查看结果: sysctlagrepvm。maxmapcount 会显示如下信息: vm。maxmapcount262144 注意: 上述方法修改之后,如果重启虚拟机将失效,所以: 一劳永逸的解决办法: 在etcsysctl。conf文件的最后添加一行代码: vm。maxmapcount262144 即可永久修改。验证elkkafka环境 dockerps 访问:zk192。168。75。143:2181 访问:eshttp:192。168。75。143:9200 访问:kibanahttp:192。168。75。143:5601appkibanadevtoolsconsole?g() 安装logstash上传logstash6。4。3。tar。gz到服务中tarzxvflogstash6。4。3。tar。gzcdlogstash6。4。3binlogstashplugininstalllogstashinputkafkabinlogstashplugininstalllogstashoutputelasticsearch 注意:安装 binlogstashplugininstalllogstashinputkafka binlogstashplugininstalllogstashoutputelasticsearch 本机电脑需要有JDK的环境,如果没有JDK环境直接安装logstashinputkafka或者logstashoutputelasticsearch会报错的 创建在logstashconfig目录创建kafka。confinput{kafka{bootstrapservers192。168。75。143:9092topicsmayiktlog}}filter{Onlymatcheddataaresendtooutput。}output{elasticsearch{actionindexTheoperationonEShosts192。168。75。143:9200ElasticSearchhost,canbearray。indexmylogsTheindextowritedatato。}} 进入logstashbin目录执行。logstashf。。configkafka。conf springboot项目整合elkkafka maven依赖dependenciesdependencygroupIdorg。springframework。bootgroupIdspringbootstarterwebartifactIdexclusionsexclusiongroupIdcom。fasterxml。jackson。coregroupIdjacksondatabindartifactIdexclusionexclusionsdependencydependencygroupIdorg。projectlombokgroupIdlombokartifactIdscopeprovidedscopedependencydependencygroupIdcom。alibabagroupIdfastjsonartifactIdversion1。2。66versiondependencydependencygroupIdorg。springframework。kafkagroupIdspringkafkaartifactIddependencydependencygroupIdorg。springframework。bootgroupIdspringbootstarteraopartifactIddependencydependencygroupIdcommonslanggroupIdcommonslangartifactIdversion2。6versiondependencydependenciesaop拦截系统日志importjava。net。InetAimportjava。net。UnknownHostEimportjava。text。SimpleDateFimportjava。util。Aimportjava。util。Dimportjavax。servlet。http。HttpServletRimportcom。alibaba。fastjson。JSONOimportcom。mayikt。container。LogCimportorg。aspectj。lang。JoinPimportorg。aspectj。lang。annotation。;importorg。springframework。beans。factory。annotation。Aimportorg。springframework。beans。factory。annotation。Vimportorg。springframework。stereotype。Cimportorg。springframework。util。concurrent。ListenableFimportorg。springframework。web。context。request。RequestContextHimportorg。springframework。web。context。request。ServletRequestAAspectComponentpublicclassAopLogAspect{Value({server。port})privateStringserverP申明一个切点里面是execution表达式Pointcut(execution(com。mayikt。api。service。。(。。)))privatevoidserviceAspect(){}AutowiredprivateLogContainerlogC请求method前打印内容Before(valueserviceAspect())publicvoidmethodBefore(JoinPointjoinPoint){ServletRequestAttributesrequestAttributes(ServletRequestAttributes)RequestContextHolder。getRequestAttributes();HttpServletRequestrequestrequestAttributes。getRequest();JSONObjectjsonObjectnewJSONObject();SimpleDateFormatdfnewSimpleDateFormat(yyyyMMddHH:mm:ss);设置日期格式jsonObject。put(requesttime,df。format(newDate()));jsonObject。put(requesturl,request。getRequestURL()。toString());jsonObject。put(requestmethod,request。getMethod());jsonObject。put(signature,joinPoint。getSignature());jsonObject。put(requestargs,Arrays。toString(joinPoint。getArgs()));IP地址信息jsonObject。put(ipaddres,getIpAddr(request):serverPort);JSONObjectrequestJsonObjectnewJSONObject();requestJsonObject。put(request,jsonObject);jsonObject。put(requesttime,df。format(newDate()));jsonObject。put(logtype,info);将ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a信息投递到kafka中StringlogrequestJsonObject。toJSONString();ListenableFutureSendResultString,ObjectsendkafkaTemplate。send(mayiktlog,ctx);logContainer。addLog(log);}在方法执行完结后打印返回内容AfterReturning(returningo,pointcutserviceAspect())publicvoidmethodAfterReturing(Objecto){ServletRequestAttributesrequestAttributes(ServletRequestAttributes)RequestContextHolder。getRequestAttributes();HttpServletRequestrequestrequestAttributes。getRequest();JSONObjectrespJSONObjectnewJSONObject();JSONObjectjsonObjectnewJSONObject();SimpleDateFormatdfnewSimpleDateFormat(yyyyMMddHH:mm:ss);设置日期格式jsonObject。put(responsetime,df。format(newDate()));jsonObject。put(responsecontent,JSONObject。toJSONString(o));IP地址信息jsonObject。put(ipaddres,getIpAddr(request):serverPort);jsonObject。put(logtype,info);respJSONObject。put(response,jsonObject);将ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a信息投递到kafka中kafkaTemplate。send(mayiktlog,respJSONObject。toJSONString());logContainer。put(respJSONObject。toJSONString());}异常通知parampointAfterThrowing(pointcutserviceAspect(),throwinge)publicvoidserviceAspect(JoinPointpoint,Exceptione){ServletRequestAttributesrequestAttributes(ServletRequestAttributes)RequestContextHolder。getRequestAttributes();HttpServletRequestrequestrequestAttributes。getRequest();JSONObjectjsonObjectnewJSONObject();SimpleDateFormatdfnewSimpleDateFormat(yyyyMMddHH:mm:ss);设置日期格式jsonObject。put(requesttime,df。format(newDate()));jsonObject。put(requesturl,request。getRequestURL()。toString());jsonObject。put(requestmethod,request。getMethod());jsonObject。put(signature,point。getSignature());jsonObject。put(requestargs,Arrays。toString(point。getArgs()));jsonObject。put(error,e。toString());IP地址信息jsonObject。put(ipaddres,getIpAddr(request):serverPort);jsonObject。put(logtype,info);JSONObjectrequestJsonObjectnewJSONObject();requestJsonObject。put(request,jsonObject);将ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a信息投递到kafka中StringlogrequestJsonObject。toJSONString();logContainer。addLog(log);}publicstaticStringgetIpAddr(HttpServletRequestrequest){XForwardedFor(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。StringipAddressrequest。getHeader(xforwardedfor);if(ipAddressnullipAddress。length()0unknown。equalsIgnoreCase(ipAddress)){ipAddressrequest。getHeader(ProxyClientIP);}if(ipAddressnullipAddress。length()0unknown。equalsIgnoreCase(ipAddress)){ipAddressrequest。getHeader(WLProxyClientIP);}if(ipAddressnullipAddress。length()0unknown。equalsIgnoreCase(ipAddress)){ipAddressrequest。getRemoteAddr();if(ipAddress。equals(127。0。0。1)ipAddress。equals(0:0:0:0:0:0:0:1)){根据网卡取本机配置的IPInetAtry{inetInetAddress。getLocalHost();}catch(UnknownHostExceptione){e。printStackTrace();}ipAddressinet。getHostAddress();}}对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照,分割if(ipAddress!nullipAddress。length()15){。。。。length()15if(ipAddress。indexOf(,)0){ipAddressipAddress。substring(0,ipAddress。indexOf(,));}}returnipA}}配置文件内容spring:application:服务的名称name:mayiktelkkafkajackson:dateformat:yyyyMMddHH:mm:sskafka:bootstrapservers:192。168。75。143:9092指定kafkaserver的地址,集群配多个,中间,逗号隔开producer:keyserializer:org。apache。kafka。common。serialization。StringSerializervalueserializer:org。apache。kafka。common。serialization。StringSerializerconsumer:groupid:defaultconsumergroup群组IDenableautocommit:trueautocommitinterval:1000keydeserializer:org。apache。kafka。common。serialization。StringDeserializervaluedeserializer:org。apache。kafka。common。serialization。StringDeserializerserver:port:9000