Springboot使用influxDB時序數據庫的實現
項目中需要存放大量設備日志,且需要對其進行簡單的數據分析,信息提取工作.
結合眾多考量因素,項目決定使用時序數據庫中的領頭羊InfluxDB.
引入依賴
項目中使用influxdb-java,在pom文件中添加如下依賴(github地址:https://github.com/influxdata/influxdb-java):
<dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.15</version> </dependency>
application.yaml文件配置如下所示(請按照實際情況填寫):
spring: influx: url: * password: admin user: 123 database: log_management
配置
(1) 創建配置類
@Configuration public class InfluxDbConfig { @Value("${spring.influx.url:''}") private String influxDBUrl; @Value("${spring.influx.user:''}") private String userName; @Value("${spring.influx.password:''}") private String password; @Value("${spring.influx.database:''}") private String database; @Bean public InfluxDbUtils influxDbUtils() { return new InfluxDbUtils(userName, password, influxDBUrl, database, ""); } }
@Data public class InfluxDbUtils { private String userName; private String password; private String url; public String database; private String retentionPolicy; // InfluxDB實例 private InfluxDB influxDB; // 數據保存策略 public static String policyNamePix = "logRetentionPolicy_"; public InfluxDbUtils(String userName, String password, String url, String database, String retentionPolicy) { this.userName = userName; this.password = password; this.url = url; this.database = database; this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy; this.influxDB = influxDbBuild(); } /** * 連接數據庫 ,若不存在則創建 * * @return influxDb實例 */ private InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(url, userName, password); } try { createDB(database); influxDB.setDatabase(database); } catch (Exception e) { log.error("create influx db failed, error: {}", e.getMessage()); } finally { influxDB.setRetentionPolicy(retentionPolicy); } influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); return influxDB; } }
構建實體類
InfluxDB中,measurement對應於傳統關系型數據庫中的table(database為配置文件中的log_management).
InfluxDB裡存儲的數據稱為時間序列數據,時序數據有零個或多個數據點.
數據點包括time(一個時間戳),measurement(例如logInfo),零個或多個tag,其對應於level,module,device_id),至少一個field(即日志內容,msg=something error).
InfluxDB會根據tag數值建立時間序列(因此tag數值不能選取諸如UUID作為特征值,易導致時間序列過多,導致InfluxDB崩潰),並建立相應索引,以便優化諸如查詢速度.
@Builder @Data @Measurement(name = "logInfo") public class LogInfo { // Column中的name為measurement中的列名 // 此外,需要註意InfluxDB中時間戳均是以UTC時保存,在保存以及提取過程中需要註意時區轉換 @Column(name = "time") private String time; // 註解中添加tag = true,表示當前字段內容為tag內容 @Column(name = "module", tag = true) private String module; @Column(name = "level", tag = true) private String level; @Column(name = "device_id", tag = true) private String deviceId; @Column(name = "msg") private String msg; }
保存數據
以下代碼為單條日志保存,influxdb-java亦支持批量保存(因為與InfluxDB通訊均是通過http,因此建議批量保存以減少性能損耗).
LogInfo logInfo = LogInfo.builder() .level(jsonObject.getString("level")) .module(module) .deviceId(deviceId) .msg(jsonObject.getString("msg")) .build(); Point point = Point.measurementByPOJO(logInfo.getClass()) .addFieldsFromPOJO(logInfo) .time(jsonObject.getLong("time"), TimeUnit.MILLISECONDS) .build(); // 出於業務考量,設備可以設置不同的保存策略(策略名為固定前綴+設備ID) influxDB.write(influxDBUtils.database, InfluxDbUtils.policyNamePix + deviceId, point);
查詢數據
因為代碼與業務耦合比較厲害,因此此處僅截選做概要示范.
// InfluxDB支持分頁查詢,因此可以設置分頁查詢條件 String pageQuery = " LIMIT " + request.getPageSize() + " OFFSET " + ((request.getPageNum() - 1) * request.getPageSize()); // 此處查詢所有內容,如果 String queryCmd = "SELECT * FROM " // 查詢指定設備下的日志信息 // 要指定從 RetentionPolicyName(保存策略前綴+設備ID).measurement(logInfo) 中查詢指定數據) + InfluxDbUtils.policyNamePix + request.getDeviceId() + "." + "logInfo" // 添加查詢條件(註意查詢條件選擇tag值,選擇field數值會嚴重拖慢查詢速度) + queryCondition // 查詢結果需要按照時間排序 + " ORDER BY time DESC" // 添加分頁查詢條件 + pageQuery;
選擇時序數據庫,不建議使用刪除以及更新操作,因此不做介紹.
可以通過創建或者RetentionPolicy,來添加或者更新數據的刪除時間.
到此這篇關於Springboot使用influxDB時序數據庫的實現的文章就介紹到這瞭,更多相關Springboot使用influxDB內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- docker環境搭建JMeter+Grafana+influxdb可視化性能監控平臺的教程
- 借助Docker搭建JMeter+Grafana+Influxdb監控平臺的詳細教程
- 支持SpEL表達式的自定義日志註解@SysLog介紹
- JPA如何設置表名和實體名,表字段與實體字段的對應
- SpringBoot整合Mongodb實現增刪查改的方法