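Log retrieval with Elasticsearch + Logstash and Java code
To keep project logs from leaking, the data is not displayed through Kibana.
1. Environment preparation
1.1 Create a regular user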
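# Create the user
useradd queylog
# Set the password
passwd queylog
# Grant sudo rights: first locate the sudoers file
whereis sudoers
# Make the file writable
chmod -v u+w /etc/sudoers
# Edit the file and add an entry for the user, for example: queylog ALL=(ALL) ALL
vi /etc/sudoers
# Remove the write permission again
chmod -v u-w /etc/sudoers
# The first time sudo is used, the following notice is shown:
We trust you have received the usual lecture from the local System Administrator. It usually boils down to these three things:
#1) Respect the privacy of others.
#2) Think before you type.
#3) With great power comes great responsibility.
The user is now created.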
1.2 Install the JDK
su queylog
cd /home/queylog
# Unpack jdk-8u191-linux-x64.tar.gz
tar -zxvf jdk-8u191-linux-x64.tar.gz
sudo mv jdk1.8.0_191 /opt/jdk1.8
# Edit /etc/profile and append the export lines below
sudo vi /etc/profile
export JAVA_HOME=/opt/jdk1.8
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
# Reload the profile
source /etc/profile
# Check the JDK version
java -version
1.3 Firewall settings
# Allow the specified IP
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="172.16.110.55" accept'
# Reload the rules
firewall-cmd --reload
2. Install Elasticsearch
2.1 Elasticsearch configuration
Note: Elasticsearch must be started as a regular (non-root) user; otherwise it fails with an error.
su queylog
cd /home/queylog
# Unpack elasticsearch-6.5.4.tar.gz
tar -zxvf elasticsearch-6.5.4.tar.gz
sudo mv elasticsearch-6.5.4 /opt/elasticsearch
# Edit the Elasticsearch configuration file
vi /opt/elasticsearch/config/elasticsearch.yml
# Name of the Elasticsearch cluster
cluster.name: elastic
# Service address
network.host: 192.168.8.224
# Service port
http.port: 9200
# Switch to root
su root
# Append the following to /etc/security/limits.conf
vi /etc/security/limits.conf
* hard nofile 655360
* soft nofile 131072
* hard nproc 4096
* soft nproc 2048
# Append the following to /etc/sysctl.conf
vi /etc/sysctl.conf
vm.max_map_count=655360
fs.file-max=655360
# After saving, reload the kernel parameters
sysctl -p
# Switch back to the regular user
su queylog
# Start Elasticsearch
/opt/elasticsearch/bin/elasticsearch
# Test
curl http://192.168.8.224:9200
# The console prints output like the following. cluster_name reflects cluster.name;
# this sample appears to have been captured with the default name.
{
  "name" : "L_dA6oi",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "eS7yP6fVTvC8KMhLutOz6w",
  "version" : {
    "number" : "6.5.4",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "d2ef93d",
    "build_date" : "2018-12-17T21:17:40.758843Z",
    "build_snapshot" : false,
    "lucene_version" : "7.5.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}
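The same check can be made from Java, which is handy because the Java client configured later in this article must use the cluster name that the node actually reports. The following is only a minimal sketch: the class `ClusterNameCheck` and the method `extractClusterName` are hypothetical helpers written for this example, and the regular expression is a shortcut that assumes the JSON shape shown in the curl output rather than a real JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ClusterNameCheck {
    // Matches "cluster_name" : "<value>" with optional whitespace around the colon.
    private static final Pattern CLUSTER_NAME =
            Pattern.compile("\"cluster_name\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the cluster_name value from the root-endpoint JSON, if present. */
    static Optional<String> extractClusterName(String json) {
        Matcher m = CLUSTER_NAME.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical response body; in practice this is the output of the curl call.
        String sample = "{ \"name\" : \"L_dA6oi\", \"cluster_name\" : \"elastic\" }";
        String expected = "elastic"; // the value of cluster.name in elasticsearch.yml
        boolean matches = extractClusterName(sample).map(expected::equals).orElse(false);
        System.out.println("cluster name matches: " + matches);
    }
}
```

If the names differ, the node is probably still running with an older or default configuration.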
2.2 Manage Elasticsearch as a service
# Switch to root
su root
# Write the service unit file
vi /usr/lib/systemd/system/elasticsearch.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
Environment=ES_HOME=/opt/elasticsearch
Environment=ES_PATH_CONF=/opt/elasticsearch/config
Environment=PID_DIR=/opt/elasticsearch/config
EnvironmentFile=/etc/sysconfig/elasticsearch
WorkingDirectory=/opt/elasticsearch
User=queylog
Group=queylog
ExecStart=/opt/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid
# StandardOutput is configured to redirect to journalctl since
# some error messages may be logged in standard output before
# elasticsearch logging system is initialized. Elasticsearch
# stores its logs in /var/log/elasticsearch and does not use
# journalctl by default. If you also want to enable journalctl
# logging, you can simply remove the "quiet" option from ExecStart.
StandardOutput=journal
StandardError=inherit
# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65536
# Specifies the maximum number of process
LimitNPROC=4096
# Specifies the maximum size of virtual memory
LimitAS=infinity
# Specifies the maximum file size
LimitFSIZE=infinity
# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0
# SIGTERM signal is used to stop the Java process
KillSignal=SIGTERM
# Send the signal only to the JVM rather than its control group
KillMode=process
# Java process is never killed
SendSIGKILL=no
# When a JVM receives a SIGTERM signal it exits with code 143
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target
# Write the environment file referenced by EnvironmentFile above
vi /etc/sysconfig/elasticsearch
################################
# Elasticsearch
################################
# Elasticsearch home directory
ES_HOME=/opt/elasticsearch
# Elasticsearch Java path (the JDK installed in section 1.2)
JAVA_HOME=/opt/jdk1.8
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib
# Elasticsearch configuration directory
ES_PATH_CONF=/opt/elasticsearch/config
# Elasticsearch PID directory
PID_DIR=/opt/elasticsearch/config
################################
# Elasticsearch Service
################################
# SysV init.d
# The number of seconds to wait before checking if elasticsearch started successfully as a daemon process
ES_STARTUP_SLEEP_TIME=5
################################
# Elasticsearch Properties
################################
# Specifies the maximum file descriptor number that can be opened by this process
# When using Systemd, this setting is ignored and the LimitNOFILE defined in
# /usr/lib/systemd/system/elasticsearch.service takes precedence
#MAX_OPEN_FILES=65536
# The maximum number of bytes of memory that may be locked into RAM
# Set to "unlimited" if you use the 'bootstrap.memory_lock: true' option
# in elasticsearch.yml.
# When using Systemd, LimitMEMLOCK must be set in a unit file such as
# /etc/systemd/system/elasticsearch.service.d/override.conf.
#MAX_LOCKED_MEMORY=unlimited
# Maximum number of VMA (Virtual Memory Areas) a process can own
# When using Systemd, this setting is ignored and the 'vm.max_map_count'
# property is set at boot time in /usr/lib/sysctl.d/elasticsearch.conf
#MAX_MAP_COUNT=262144
# Reload the systemd units
systemctl daemon-reload
# Switch to the regular user
su queylog
# Start Elasticsearch
sudo systemctl start elasticsearch
# Enable start on boot
sudo systemctl enable elasticsearch
3. Install Logstash
3.1 Logstash configuration
su queylog
cd /home/queylog
# Unpack logstash-6.5.4.tar.gz
tar -zxvf logstash-6.5.4.tar.gz
sudo mv logstash-6.5.4 /opt/logstash
# Edit the Logstash configuration file
vi /opt/logstash/config/logstash.yml
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: changeme
xpack.monitoring.elasticsearch.url: ["http://192.168.8.224:9200"]
# Create logstash.conf in the bin directory
vi /opt/logstash/bin/logstash.conf
input {
  # Use a file as the source
  file {
    # Path of the log file
    path => "/opt/tomcat/logs/catalina.out"
    start_position => "beginning" # (end, beginning)
    type => "isp"
  }
}
#filter {
  # Define the data format: parse the log with a regular expression (filter and collect the log as needed)
  #grok {
  #  match => { "message" => "%{IPV4:clientIP}\|%{GREEDYDATA:request}\|%{NUMBER:duration}" }
  #}
  # Convert field types as needed
  #mutate { convert => { "duration" => "integer" } }
#}
# Define the output
output {
  elasticsearch {
    hosts => "192.168.8.224:9200" # Elasticsearch address; 9200 is the default HTTP port
    index => "ind"
    document_type => "isp"
  }
}
# Give the user ownership of the Logstash directory
sudo chown -R queylog:queylog /opt/logstash
# Start Logstash
/opt/logstash/bin/logstash -f /opt/logstash/bin/logstash.conf
# Once Logstash is installed, configured and running, check that the index has been created in Elasticsearch
curl http://192.168.8.224:9200/_cat/indices
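The commented-out grok pattern above is meant to split a pipe-delimited log line into a client IP, a request, and a duration, and the mutate step then converts the duration to an integer. As a rough illustration of what those fields would contain, here is a sketch in plain Java regex. It only approximates grok's IPV4, GREEDYDATA and NUMBER patterns; the class name `PipeLogLineParser`, the sample line, and the simplified sub-patterns are assumptions made for this example and are not Logstash code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PipeLogLineParser {
    // Simplified stand-ins for grok's IPV4, GREEDYDATA and NUMBER patterns.
    // The pipes are escaped so they match literally instead of acting as alternation.
    private static final Pattern LINE = Pattern.compile(
            "(?<clientIP>\\d{1,3}(?:\\.\\d{1,3}){3})\\|(?<request>.*)\\|(?<duration>\\d+)");

    /** Returns the captured fields, or an empty map when the line does not match. */
    static Map<String, Object> parse(String line) {
        Map<String, Object> fields = new LinkedHashMap<>();
        Matcher m = LINE.matcher(line);
        if (m.matches()) {
            fields.put("clientIP", m.group("clientIP"));
            fields.put("request", m.group("request"));
            // Mirrors the effect of mutate { convert => { "duration" => "integer" } }
            fields.put("duration", Integer.valueOf(m.group("duration")));
        }
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical log line in the clientIP|request|duration layout.
        System.out.println(parse("172.16.110.55|GET /index|35"));
    }
}
```

Lines that do not follow the layout produce an empty map here; in Logstash a failed grok match tags the event instead of dropping it.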
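4. Java code
First, an exception that came up earlier when integrating Elasticsearch and Redis in Spring Boot, and how it was resolved.
After going through the available material, the following explanation of the cause seems the most reasonable.
Cause: another part of the program, here the Redis client, already uses Netty. When the Elasticsearch transport client is instantiated, it tries to set Netty's number of available processors. Because Netty has already been initialized elsewhere and guards against this value being set a second time, the first instantiation fails with an illegal state exception.
Solution
Add the following to the Spring Boot startup class, before the application is started: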
System.setProperty("es.set.netty.runtime.available.processors", "false");
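The property only helps if it is set before the Elasticsearch transport client is created, so the first statement of `main` is a safe place for it. A minimal sketch follows, assuming the startup class is the `CbeiIspApplication` referenced by the test class in section 4.3. The helper class name `NettyProcessorsWorkaround` is hypothetical, and the Spring call is left as a comment so that the snippet compiles without Spring on the classpath.

```java
final class NettyProcessorsWorkaround {
    static final String KEY = "es.set.netty.runtime.available.processors";

    /** Tells the Elasticsearch client not to set Netty's processor count itself. */
    static void apply() {
        System.setProperty(KEY, "false");
    }

    public static void main(String[] args) {
        // Must run before any Elasticsearch client bean is instantiated.
        apply();
        // In the real startup class the next statement would be:
        // SpringApplication.run(CbeiIspApplication.class, args);
        System.out.println(KEY + "=" + System.getProperty(KEY));
    }
}
```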
4.1 Add the pom dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
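4.2 Update the configuration file
# Must match cluster.name in elasticsearch.yml
spring.data.elasticsearch.cluster-name=elastic
# The REST API uses port 9200
# Java programs (transport client) use port 9300
spring.data.elasticsearch.cluster-nodes=192.168.8.224:9300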
4.3 The interface and its implementation classes
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;

// Maps documents of type "isp" in index "ind", the index and type written by Logstash
@Document(indexName = "ind", type = "isp")
public class Bean {

    @Field
    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Bean{message='" + message + '\'' + '}';
    }
}
import java.util.Map;

public interface IElasticSearchService {

    Map<String, Object> search(String keywords, Integer currentPage, Integer pageSize) throws Exception;

    // Escape characters that have a special meaning in query-string syntax
    default String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); ++i) {
            char c = s.charAt(i);
            if (c == '\\' || c == '+' || c == '-' || c == '!' || c == '(' || c == ')' || c == ':'
                    || c == '^' || c == '[' || c == ']' || c == '"' || c == '{' || c == '}' || c == '~'
                    || c == '*' || c == '?' || c == '|' || c == '&' || c == '/') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.toString();
    }
}
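To see what the escape method above does to a typical log search term, the logic can be run on its own. This is a standalone sketch: the class `QueryEscapeDemo` is hypothetical, and it looks characters up in a string of the same special characters instead of using the long `if` chain, which is a stylistic substitution rather than the interface's code.

```java
final class QueryEscapeDemo {
    // The same characters the escape method above treats as special.
    private static final String SPECIAL = "\\+-!():^[]\"{}~*?|&/";

    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (char c : s.toCharArray()) {
            if (SPECIAL.indexOf(c) >= 0) {
                sb.append('\\'); // prefix each special character with a backslash
            }
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // A search term containing query-syntax characters (':', '/', '?').
        // prints java.lang.NullPointerException\: \/api\/user\?id=1
        System.out.println(escape("java.lang.NullPointerException: /api/user?id=1"));
    }
}
```

Without escaping, characters such as `:` and `/` would be interpreted as query-string operators, so a pasted stack-trace line could fail to parse or match the wrong documents.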
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Elasticsearch service implementation
 */
@Service
public class ElasticSearchServiceImpl implements IElasticSearchService {

    private static final Logger log = LoggerFactory.getLogger(ElasticSearchServiceImpl.class);

    @Autowired
    ElasticsearchTemplate elasticsearchTemplate;

    @Resource
    HighlightResultHelper highlightResultHelper;

    @Override
    public Map<String, Object> search(String keywords, Integer currentPage, Integer pageSize) {
        // The caller passes a 1-based page number; PageRequest is 0-based
        currentPage = Math.max(currentPage - 1, 0);

        // Highlight the matched keywords in the message field
        HighlightBuilder.Field message = new HighlightBuilder.Field("message")
                .fragmentOffset(80000)
                .numOfFragments(0)
                .requireFieldMatch(false)
                .preTags("<span style='color:red'>")
                .postTags("</span>");

        // Create the query builder
        NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        queryBuilder.withPageable(PageRequest.of(currentPage, pageSize));
        // Filter: when keywords is not empty, escape it and match it against the message field
        if (StringUtils.hasText(keywords)) {
            boolQueryBuilder.must(QueryBuilders.queryStringQuery(escape(keywords)).field("message"));
        }
        queryBuilder.withQuery(boolQueryBuilder);
        queryBuilder.withHighlightFields(message);
        log.info("Query: {}", queryBuilder.build().getQuery().toString());

        // Run the query
        AggregatedPage<Bean> result =
                elasticsearchTemplate.queryForPage(queryBuilder.build(), Bean.class, highlightResultHelper);

        // Unpack the result
        long total = result.getTotalElements();
        int totalPage = result.getTotalPages();
        List<Bean> blogList = result.getContent();
        Map<String, Object> map = new HashMap<>();
        map.put("total", total);
        map.put("totalPage", totalPage);
        map.put("pageSize", pageSize);
        map.put("currentPage", currentPage + 1);
        map.put("blogList", blogList);
        return map;
    }
}
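The service accepts a 1-based page number from the caller while `PageRequest` is 0-based, which is why the code subtracts one before querying and adds one back when filling the result map. The arithmetic can be sketched in isolation. The class and method names below are hypothetical, and the total-page formula is ordinary ceiling division, given here as an assumption about what `getTotalPages` reports rather than a copy of Spring's code.

```java
final class PageMath {
    /** Converts a 1-based page number to a 0-based index, clamping at 0. */
    static int toZeroBased(int currentPage) {
        return Math.max(currentPage - 1, 0);
    }

    /** Number of pages needed for totalHits at the given page size (ceiling division). */
    static long totalPages(long totalHits, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        return (totalHits + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        System.out.println(toZeroBased(1));     // first page maps to index 0
        System.out.println(totalPages(25, 10)); // 25 hits at 10 per page need 3 pages
    }
}
```

Clamping at zero means a request for page 0 or a negative page returns the first page instead of throwing.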
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.beanutils.PropertyUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.SearchResultMapper;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

/**
 * Elasticsearch highlight mapping
 */
@Component
public class HighlightResultHelper implements SearchResultMapper {

    private static final Logger log = LoggerFactory.getLogger(HighlightResultHelper.class);

    @Override
    public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
        List<T> results = new ArrayList<>();
        for (SearchHit hit : response.getHits()) {
            T result = mapSearchHit(hit, clazz);
            // Hits without a source cannot be mapped and are skipped
            if (result != null) {
                results.add(result);
            }
        }
        return new AggregatedPageImpl<T>(results, pageable, response.getHits().getTotalHits(),
                response.getAggregations(), response.getScrollId());
    }

    // Maps one hit to clazz and overwrites each highlighted field with its highlighted text
    public <T> T mapSearchHit(SearchHit searchHit, Class<T> clazz) {
        if (searchHit == null || !StringUtils.hasText(searchHit.getSourceAsString())) {
            return null;
        }
        T result = JSONObject.parseObject(searchHit.getSourceAsString(), clazz);
        for (HighlightField field : searchHit.getHighlightFields().values()) {
            try {
                PropertyUtils.setProperty(result, field.getName(), concat(field.fragments()));
            } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
                log.error("Failed to set highlighted field: {}", e.getMessage(), e);
            }
        }
        return result;
    }

    // Joins the highlight fragments into a single string
    private String concat(Text[] texts) {
        StringBuilder sb = new StringBuilder();
        for (Text text : texts) {
            sb.append(text.toString());
        }
        return sb.toString();
    }
}
import com.alibaba.fastjson.JSON;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = CbeiIspApplication.class)
public class ElasticSearchServiceTest {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchServiceTest.class);

    @Autowired
    private IElasticSearchService elasticSearchService;

    // JUnit 4 test methods must be public void; letting the exception propagate makes failures visible
    @Test
    public void getLog() throws Exception {
        Map<String, Object> search = elasticSearchService.search("Exception", 1, 10);
        logger.info(JSON.toJSONString(search));
    }
}
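That is all for today. This article only briefly introduced how to use Elasticsearch together with Logstash. If anything in it is inaccurate, comments pointing it out are welcome.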