SpringBoot集成MaxCompute的示例代碼
1、SDK方式集成
使用odps-sdk-core集成, 官方文檔地址MaxCompute Java SDK介紹
1.1、依賴引入odps-sdk-core
<properties> <java.version>1.8</java.version> <!--maxCompute sdk 版本號--> <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version> </properties> <dependencies> <!--max compute sdk--> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core</artifactId> <version>${max-compute-sdk.version}</version> </dependency> </dependencies>
1.2、編寫連接工具類
編寫MaxComputeSdkUtil以SDK方式連接MaxCompute
1.2.1、重要類和方法說明
1、連接參數類:
@Data public class MaxComputeSdkConnParam { /**阿裡雲accessId 相當於用戶名 */ private String aliyunAccessId; /**阿裡雲accessKey 相當於密碼 */ private String aliyunAccessKey; /**阿裡雲maxCompute服務接口地址 默認是http://service.odps.aliyun.com/api*/ private String maxComputeEndpoint; /**項目名稱*/ private String projectName; }
2、查詢表元數據信息實體
主要是字段:tableName, comment。還可以自己添加其他字段
@Data @NoArgsConstructor @AllArgsConstructor public class TableMetaInfo { /**表名稱*/ private String tableName; /**表註釋*/ private String comment; }
3、公共方法(初始化)
/**默認的odps接口地址 在Odps中也可以看到該變量*/ private static final String defaultEndpoint = "http://service.odps.aliyun.com/api"; /**開啟全表掃描的配置*/ private static final String FULL_SCAN_CONFIG = "odps.sql.allow.fullscan"; /**分頁查詢sql模板*/ private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;"; /**分頁查詢統計數量模板SQL*/ private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;"; /**sdk的odps客戶端*/ private final Odps odps; /**odps連接參數*/ private final MaxComputeSdkConnParam connParam; public MaxComputeSdkUtil(MaxComputeSdkConnParam param){ this.connParam = param; // 構建odps客戶端 this.odps = buildOdps(); } /** * 構建odps客戶端 用於執行sql等操作 * @return odps客戶端 */ private Odps buildOdps() { // 阿裡雲賬號密碼 AccessId 和 AccessKey final String aliyunAccessId = connParam.getAliyunAccessId(); final String aliyunAccessKey = connParam.getAliyunAccessKey(); // 創建阿裡雲賬戶 final AliyunAccount aliyunAccount = new AliyunAccount(aliyunAccessId, aliyunAccessKey); // 使用阿裡雲賬戶創建odps客戶端 final Odps odps = new Odps(aliyunAccount); // 傳入瞭的話就是用傳入的 沒有傳入使用默認的 final String endpoint = connParam.getMaxComputeEndpoint(); try { odps.setEndpoint(ObjectUtils.isEmpty(endpoint) ? defaultEndpoint : endpoint); } catch (Exception e) { // 端點格式不正確 throw new BizException(ResultCode.MAX_COMPUTE_ENDPOINT_ERR); } // 設置項目 odps.setDefaultProject(connParam.getProjectName()); return odps; }
4、查詢表信息
/** * 獲取表信息 */ public List<TableMetaInfo> getTableInfos(){ final Tables tables = odps.tables(); List<TableMetaInfo> resultTables = new ArrayList<>(); try { for (Table table : tables) { // tableName final String name = table.getName(); // 描述 final String comment = table.getComment(); final TableMetaInfo info = new TableMetaInfo(name, comment); resultTables.add(info); } } catch (Exception e) { e.printStackTrace(); final String errMsg = ObjectUtils.isEmpty(e.getMessage()) ? "" : e.getMessage(); if (errMsg.contains("ODPS-0410051:Invalid credentials")){ throw new BizException(ResultCode.MAX_COMPUTE_UNAME_ERR); } if (errMsg.contains("ODPS-0410042:Invalid signature value")){ throw new BizException(ResultCode.MAX_COMPUTE_PWD_ERR); } if (errMsg.contains("ODPS-0420095: Access Denied")){ throw new BizException(ResultCode.MAX_COMPUTE_PROJECT_ERR); } } return resultTables; }
5、執行SQL封裝
/** * 執行sql查詢 * @param querySql 查詢sql * @param fullScan 是否開啟全表掃描 如果查詢多個分區數據,需要開啟全表掃描 * @return List<Map<String, Object>> */ public List<Map<String, Object>> queryData(String querySql, boolean fullScan){ try { // 配置全表掃描嗎 configFullScan(fullScan); // 使用任務執行SQL final Instance instance = SQLTask.run(odps, querySql); // 等待執行成功 instance.waitForSuccess(); // 封裝返回結果 List<Record> records = SQLTask.getResult(instance); // 結果轉換為Map return buildMapByRecords(records); } catch (OdpsException e) { e.printStackTrace(); throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR); } } /** * 開啟和移除全表掃描配置 * @param fullScan 是否全表掃描 */ private void configFullScan(boolean fullScan) { if (fullScan){ // 開啟全表掃描配置 Map<String, String> config = new HashMap<>(); log.info("===>>開啟全表掃描, 查詢多個分區數據"); config.put(FULL_SCAN_CONFIG, "true"); odps.setGlobalSettings(config); }else { // 移除全表掃描配置 odps.getGlobalSettings().remove(FULL_SCAN_CONFIG); } } /** * 將List<Record>準換為List<Map></> * @param records sql查詢結果 * @return 返回結果 */ private List<Map<String, Object>> buildMapByRecords(List<Record> records) { List<Map<String, Object>> listMap = new ArrayList<>(); for (Record record : records) { Column[] columns = record.getColumns(); Map<String, Object> map = new LinkedHashMap<>(); for (Column column : columns) { String name = column.getName(); Object value = record.get(name); // maxCompute裡面的空返回的是使用\n if ("\\N".equalsIgnoreCase(String.valueOf(value))) { map.put(name, ""); } else { map.put(name, value); } } listMap.add(map); } return listMap; }
6、分頁查詢分裝
/** * 執行sql查詢【分頁查詢】 * @param querySql 查詢sql * @param page 頁碼 從1開始 第n頁傳n * @param size 每頁記錄數 * @param fullScan 是否開啟全表掃描 如果查詢多個分區數據,需要開啟全表掃描 * @return List<Map<String, Object>> */ public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size, boolean fullScan){ // 重寫SQl,添加limit offset, limit // 1、替換分號 querySql = querySql.replaceAll(";", ""); // 2、格式化SQL Integer offset = (page - 1 ) * size; // 得到執行sql final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size); log.info("=======>>>執行分頁sql為:{}", execSql); // 調用執行SQL數據 return queryData(execSql, fullScan); } /** * 執行分頁查詢 * @param querySql 分頁查詢sql * @param page 頁碼 從1開始 第n頁傳n * @param size 每頁記錄數 * @return 分頁查詢結果 */ public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){ // 1、替換分號 querySql = querySql.replaceAll(";", ""); String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql); log.info("=======>>>執行分頁統計總數sql為:{}", countSql); // 查詢總數 final List<Map<String, Object>> countMap = queryData(countSql, false); if (CollectionUtils.isEmpty(countMap)){ return new PageResult<>(0L, new ArrayList<>()); } long count = 0L; for (Object value : countMap.get(0).values()) { count = Long.parseLong(String.valueOf(value)); } if (count == 0){ return new PageResult<>(0L, new ArrayList<>()); } // 執行分頁查詢 開啟全表掃描 final List<Map<String, Object>> resultList = queryData(querySql, page, size, true); return new PageResult<>(count, resultList); } /** * 執行分頁查詢 * @param querySql 分頁查詢sql * @param page 頁碼 從1開始 第n頁傳n * @param size 每頁記錄數 * @return 分頁查詢結果 */ public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){ final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size); List<T> rows = new ArrayList<>(); for (Map<String, Object> row : result.getRows()) { final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz); rows.add(t); } return new PageResult<>(result.getTotal(), rows); }
1.2.2 工具類測試
使用測試數據測試工具類
public static void main(String[] args) { // 構建連接參數 final MaxComputeSdkConnParam connParam = new MaxComputeSdkConnParam(); connParam.setAliyunAccessId("您的阿裡雲賬號accessId"); connParam.setAliyunAccessKey("您的阿裡雲賬號accessKey"); connParam.setProjectName("項目名"); // 實例化工具類 final MaxComputeSdkUtil sdkUtil = new MaxComputeSdkUtil(connParam); // 查詢所有表 final List<TableMetaInfo> tableInfos = sdkUtil.getTableInfos(); for (TableMetaInfo tableInfo : tableInfos) { System.out.println(tableInfo.getTableName()); } // 分頁查詢數據 final PageResult<Map<String, Object>> page = sdkUtil.pageQueryMap("select * from ods_cust;", 2, 10); System.out.println(page.getTotal()); for (Map<String, Object> map : page.getRows()) { System.out.println(JSONObject.toJSONString(map)); } }
1.2.3 為什麼要開啟全表掃描
maxCompute存在使用限制如下:
當使用select語句時,屏顯最多隻能顯示10000行結果。當select語句作為子句時則無此限制,select子句會將全部結果返回給上層查詢。
select語句查詢分區表時默認禁止全表掃描。
自2018年1月10日20:00:00後,在新創建的項目上執行SQL語句時,默認情況下,針對該項目裡的分區表不允許執行全表掃描操作。在查詢分區表數據時必須指定分區,由此減少SQL的不必要I/O,從而減少計算資源的浪費以及按量計費模式下不必要的計算費用。
如果您需要對分區表進行全表掃描,可以在全表掃描的SQL語句前加上命令set odps.sql.allow.fullscan=true;,並和SQL語句一起提交執行。假設sale_detail表為分區表,需要同時執行如下語句進行全表查詢:
2、JDBC方式集成
使用odps-jdbc集成, 官方文檔地址MaxCompute Java JDBC介紹
<properties> <java.version>1.8</java.version> <!--maxCompute-jdbc-版本號--> <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version> </properties> <dependencies> <!--max compute jdbc--> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-jdbc</artifactId> <version>${max-compute-jdbc.version}</version> <classifier>jar-with-dependencies</classifier> </dependency> </dependencies>
2.2、編寫連接工具類
編寫MaxComputeSdkUtil以JDBC方式連接MaxCompute
2.2.1、重要類和方法說明
1、連接參數類:
@Data public class MaxComputeJdbcConnParam { /**阿裡雲accessId 相當於用戶名 */ private String aliyunAccessId; /**阿裡雲accessKey 相當於密碼 */ private String aliyunAccessKey; /** maxcompute_endpoint */ private String endpoint; /**項目名稱*/ private String projectName; }
2、公共方法(初始化)
/**JDBC 驅動名稱*/ private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver"; private static final String SELECT_ALL_TABLE_SQL = "select table_name, table_comment from Information_Schema.TABLES"; private static final String SELECT_FIELD_BY_TABLE_SQL = "select column_name, column_comment from Information_Schema.COLUMNS where table_name = '%s'"; /**分頁查詢sql模板*/ private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;"; /**分頁查詢統計數量模板SQL*/ private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;"; /**連接*/ private final Connection conn; /** * 連接參數 */ private final MaxComputeJdbcConnParam connParam; public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) { this.connParam = connParam; this.conn = buildConn(); } /** * 創建連接 * @return 數據庫連接 */ private Connection buildConn() { try { Class.forName(DRIVER_NAME); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR); } try { // JDBCURL連接模板 String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true"; // 使用驅動管理器連接獲取連接 return DriverManager.getConnection( String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()), connParam.getAliyunAccessId(), connParam.getAliyunAccessKey()); } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR); } }
3、查詢表信息
/** * 獲取表信息 * @return 表信息列表 */ public List<TableMetaInfo> getTableInfos(){ List<TableMetaInfo> resultList = new ArrayList<>(); Statement statement = null; ResultSet resultSet = null; try { // 創建statement 使用SQL直接查詢 statement = conn.createStatement(); // 執行查詢語句 resultSet = statement.executeQuery(SELECT_ALL_TABLE_SQL); while (resultSet.next()){ final String tableName = resultSet.getString("table_name"); final String tableComment = resultSet.getString("table_comment"); final TableMetaInfo info = new TableMetaInfo(tableName, tableComment); resultList.add(info); } return resultList; } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR); } finally { // 關閉resultSet closeResultSet(resultSet); // 關閉statement closeStatement(statement); } }
4、執行SQL封裝
/** * 執行sql查詢 * @param querySql 查詢sql * @return List<Map<String, Object>> */ public List<Map<String, Object>> queryData(String querySql){ List<Map<String, Object>> resultList = new ArrayList<>(); Statement statement = null; ResultSet resultSet = null; try { // 創建statement statement = conn.createStatement(); // 執行查詢語句 resultSet = statement.executeQuery(querySql); // 構建結果返回 buildMapByRs(resultList, resultSet); return resultList; } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR); } finally { // 關閉resultSet closeResultSet(resultSet); // 關閉statement closeStatement(statement); } } /** * 將ResultSet轉換為List<Map<String, Object>> * @param resultList 轉換的集合 * @param resultSet ResultSet * @throws SQLException e */ private void buildMapByRs(List<Map<String, Object>> resultList, ResultSet resultSet) throws SQLException { // 獲取元數據 ResultSetMetaData metaData = resultSet.getMetaData(); while (resultSet.next()) { // 獲取列數 int columnCount = metaData.getColumnCount(); Map<String, Object> map = new HashMap<>(); for (int i = 0; i < columnCount; i++) { String columnName = metaData.getColumnName(i + 1); Object object = resultSet.getObject(columnName); // maxCompute裡面的空返回的是使用\n if ("\\N".equalsIgnoreCase(String.valueOf(object))) { map.put(columnName, ""); } else { map.put(columnName, object); } } resultList.add(map); } } private void closeStatement(Statement statement){ if (statement != null){ try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } } private void closeResultSet(ResultSet resultSet){ if (resultSet != null){ try { resultSet.close(); } catch (SQLException e) { e.printStackTrace(); } } }
5、分頁查詢分裝
/** * 執行sql查詢 * @param querySql 查詢sql * @return List<Map<String, Object>> */ public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size){ List<Map<String, Object>> resultList = new ArrayList<>(); Statement statement = null; ResultSet resultSet = null; try { // 1、替換分號 querySql = querySql.replaceAll(";", ""); // 創建statement statement = conn.createStatement(); // 2、格式化SQL int offset = (page - 1 ) * size; final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size); log.info("=======>>>執行分頁sql為:{}", execSql); // 執行查詢語句 resultSet = statement.executeQuery(execSql); // 構建結果返回 buildMapByRs(resultList, resultSet); return resultList; } catch (SQLException e) { e.printStackTrace(); throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR); } finally { // 關閉resultSet closeResultSet(resultSet); // 關閉statement closeStatement(statement); } } /** * 執行分頁查詢 * @param querySql 分頁查詢sql * @param page 頁碼 從1開始 第n頁傳n * @param size 每頁記錄數 * @return 分頁查詢結果 */ public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){ // 1、替換分號 querySql = querySql.replaceAll(";", ""); String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql); log.info("=======>>>執行分頁統計總數sql為:{}", countSql); // 查詢總數 final List<Map<String, Object>> countMap = queryData(countSql); if (CollectionUtils.isEmpty(countMap)){ return new PageResult<>(0L, new ArrayList<>()); } long count = 0L; for (Object value : countMap.get(0).values()) { count = Long.parseLong(String.valueOf(value)); } if (count == 0){ return new PageResult<>(0L, new ArrayList<>()); } // 執行分頁查詢 開啟全表掃描 final List<Map<String, Object>> resultList = queryData(querySql, page, size); return new PageResult<>(count, resultList); } /** * 執行分頁查詢 * @param querySql 分頁查詢sql * @param page 頁碼 從1開始 第n頁傳n * @param size 每頁記錄數 * @return 分頁查詢結果 */ public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){ final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size); List<T> rows = new ArrayList<>(); for (Map<String, Object> row : result.getRows()) { final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz); rows.add(t); } return new PageResult<>(result.getTotal(), rows); }
2.2.2 工具類測試
使用測試數據測試工具類
public static void main(String[] args) { final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam(); connParam.setAliyunAccessId("您的阿裡雲賬號accessId"); connParam.setAliyunAccessKey("您的阿裡雲賬號accessKey"); connParam.setProjectName("項目名"); connParam.setEndpoint("http://service.cn-hangzhou.maxcompute.aliyun.com/api"); final MaxComputeJdbcUtil jdbcUtil = new MaxComputeJdbcUtil(connParam); // 獲取表信息 final List<TableMetaInfo> tableInfos = jdbcUtil.getTableInfos(); for (TableMetaInfo tableInfo : tableInfos) { System.out.println(tableInfo); } // 獲取字段信息 final String tableName = tableInfos.get(new Random().nextInt(tableInfos.size())).getTableName(); final List<TableColumnMetaInfo> fields = jdbcUtil.getFieldByTableName(tableName); for (TableColumnMetaInfo field : fields) { System.out.println(field.getFieldName() + "-" + field.getComment()); } // 執行查詢 final List<Map<String, Object>> list = jdbcUtil.queryData("select * from ods_cust;"); for (Map<String, Object> map : list) { System.out.println(JSONObject.toJSONString(map)); } // 執行分頁查詢 final List<Map<String, Object>> list2 = jdbcUtil.queryData("select * from ods_cust;", 2, 10); for (Map<String, Object> map : list2) { System.out.println(JSONObject.toJSONString(map)); } // 執行分頁查詢 並返回count final PageResult<Map<String, Object>> list3 = jdbcUtil.pageQueryMap("select * from ods_cust;", 2, 10); System.out.println(list3.getTotal()); for (Map<String, Object> map : list3.getRows()) { System.out.println(JSONObject.toJSONString(map)); } jdbcUtil.close(); }
項目地址
springboot集成maxCompute
到此這篇關於SpringBoot集成MaxCompute的文章就介紹到這瞭,更多相關SpringBoot集成MaxCompute內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Java原生操作JDBC連接以及原理詳解
- 關於Java 項目封裝sqlite連接池操作持久化數據的方法
- 在Jpa框架下拼接原生sql 並執行的操作
- Redis中pop出隊列多個元素思考
- IDEA 鏈接Mysql數據庫並執行查詢操作的完整代碼