SpringBoot集成MaxCompute的示例代碼

1、SDK方式集成

使用odps-sdk-core集成, 官方文檔地址MaxCompute Java SDK介紹

1.1、依賴引入odps-sdk-core

<properties>
    <java.version>1.8</java.version>
    <!--maxCompute sdk 版本號-->
    <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
</properties>

<dependencies>
  <!--max compute sdk-->
  <dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-core</artifactId>
    <version>${max-compute-sdk.version}</version>
</dependency>
</dependencies>

1.2、編寫連接工具類

編寫MaxComputeSdkUtil以SDK方式連接MaxCompute

1.2.1、重要類和方法說明

1、連接參數類:

@Data
public class MaxComputeSdkConnParam {
    /**阿裡雲accessId 相當於用戶名 */
    private String aliyunAccessId;
    /**阿裡雲accessKey 相當於密碼 */
    private String aliyunAccessKey;
    /**阿裡雲maxCompute服務接口地址 默認是http://service.odps.aliyun.com/api*/
    private String maxComputeEndpoint;
    /**項目名稱*/
    private String projectName;
}

2、查詢表元數據信息實體

主要是字段:tableName, comment。還可以自己添加其他字段

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableMetaInfo {
    /**表名稱*/
    private String tableName;
    /**表註釋*/
    private String comment;
}

3、公共方法(初始化)

/**默認的odps接口地址 在Odps中也可以看到該變量*/
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
/**開啟全表掃描的配置*/
private static final String FULL_SCAN_CONFIG = "odps.sql.allow.fullscan";
/**分頁查詢sql模板*/
private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";
/**分頁查詢統計數量模板SQL*/
private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";
/**sdk的odps客戶端*/
private final Odps odps;

/**odps連接參數*/
private final MaxComputeSdkConnParam connParam;

public MaxComputeSdkUtil(MaxComputeSdkConnParam param){
    this.connParam = param;
    // 構建odps客戶端
    this.odps = buildOdps();
}

/**
 * 構建odps客戶端 用於執行sql等操作
 * @return odps客戶端
 */
private Odps buildOdps() {
    // 阿裡雲賬號密碼  AccessId 和 AccessKey
    final String aliyunAccessId = connParam.getAliyunAccessId();
    final String aliyunAccessKey = connParam.getAliyunAccessKey();
    // 創建阿裡雲賬戶
    final AliyunAccount aliyunAccount = new AliyunAccount(aliyunAccessId, aliyunAccessKey);

    // 使用阿裡雲賬戶創建odps客戶端
    final Odps odps = new Odps(aliyunAccount);

    // 傳入瞭的話就是用傳入的 沒有傳入使用默認的
    final String endpoint = connParam.getMaxComputeEndpoint();
    try {
        odps.setEndpoint(ObjectUtils.isEmpty(endpoint) ? defaultEndpoint : endpoint);
    } catch (Exception e) {
        // 端點格式不正確
        throw new BizException(ResultCode.MAX_COMPUTE_ENDPOINT_ERR);
    }

    // 設置項目
    odps.setDefaultProject(connParam.getProjectName());
    return odps;
}

4、查詢表信息

/**
 * 獲取表信息
 */
public List<TableMetaInfo> getTableInfos(){
    final Tables tables = odps.tables();
    List<TableMetaInfo> resultTables = new ArrayList<>();
    try {
        for (Table table : tables) {
            // tableName
            final String name = table.getName();
            // 描述
            final String comment = table.getComment();
            final TableMetaInfo info = new TableMetaInfo(name, comment);
            resultTables.add(info);
        }
    } catch (Exception e) {
        e.printStackTrace();
        final String errMsg = ObjectUtils.isEmpty(e.getMessage()) ? "" : e.getMessage();
        if (errMsg.contains("ODPS-0410051:Invalid credentials")){
            throw new BizException(ResultCode.MAX_COMPUTE_UNAME_ERR);
        }
        if (errMsg.contains("ODPS-0410042:Invalid signature value")){
            throw new BizException(ResultCode.MAX_COMPUTE_PWD_ERR);
        }
        if (errMsg.contains("ODPS-0420095: Access Denied")){
            throw new BizException(ResultCode.MAX_COMPUTE_PROJECT_ERR);
        }
    }
    return resultTables;
}

5、執行SQL封裝

/**
 * 執行sql查詢
 * @param querySql 查詢sql
 * @param fullScan 是否開啟全表掃描 如果查詢多個分區數據,需要開啟全表掃描
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql, boolean fullScan){
    try {
        // 配置全表掃描嗎
        configFullScan(fullScan);
        // 使用任務執行SQL
        final Instance instance = SQLTask.run(odps, querySql);
        // 等待執行成功
        instance.waitForSuccess();
        // 封裝返回結果
        List<Record> records = SQLTask.getResult(instance);
        // 結果轉換為Map
        return buildMapByRecords(records);
    } catch (OdpsException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    }
}

/**
 * 開啟和移除全表掃描配置
 * @param fullScan 是否全表掃描
 */
private void configFullScan(boolean fullScan) {
        if (fullScan){
        // 開啟全表掃描配置
        Map<String, String> config = new HashMap<>();
        log.info("===>>開啟全表掃描, 查詢多個分區數據");
        config.put(FULL_SCAN_CONFIG, "true");
        odps.setGlobalSettings(config);
        }else {
        // 移除全表掃描配置
        odps.getGlobalSettings().remove(FULL_SCAN_CONFIG);
        }
    }

/**
 * 將List<Record>準換為List<Map></>
 * @param records sql查詢結果
 * @return 返回結果
 */
private List<Map<String, Object>> buildMapByRecords(List<Record> records) {
        List<Map<String, Object>> listMap = new ArrayList<>();
        for (Record record : records) {
        Column[] columns = record.getColumns();
        Map<String, Object> map = new LinkedHashMap<>();
        for (Column column : columns) {
        String name = column.getName();
        Object value = record.get(name);
        // maxCompute裡面的空返回的是使用\n
        if ("\\N".equalsIgnoreCase(String.valueOf(value))) {
        map.put(name, "");
        } else {
        map.put(name, value);
        }
        }
        listMap.add(map);
        }
        return listMap;
    }

6、分頁查詢分裝

/**
 * 執行sql查詢【分頁查詢】
 * @param querySql 查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @param fullScan 是否開啟全表掃描 如果查詢多個分區數據,需要開啟全表掃描
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size, boolean fullScan){
    // 重寫SQl,添加limit offset, limit
    // 1、替換分號
    querySql = querySql.replaceAll(";", "");
    // 2、格式化SQL
    Integer offset = (page - 1 ) * size;
    // 得到執行sql
    final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);
    log.info("=======>>>執行分頁sql為:{}", execSql);

    // 調用執行SQL數據
    return queryData(execSql, fullScan);
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){
    // 1、替換分號
    querySql = querySql.replaceAll(";", "");
    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);
    log.info("=======>>>執行分頁統計總數sql為:{}", countSql);
    // 查詢總數
    final List<Map<String, Object>> countMap = queryData(countSql, false);
    if (CollectionUtils.isEmpty(countMap)){
        return new PageResult<>(0L, new ArrayList<>());
    }

    long count = 0L;
    for (Object value : countMap.get(0).values()) {
        count = Long.parseLong(String.valueOf(value));
    }

    if (count == 0){
        return new PageResult<>(0L, new ArrayList<>());
    }

    // 執行分頁查詢 開啟全表掃描
    final List<Map<String, Object>> resultList = queryData(querySql, page, size, true);

    return new PageResult<>(count, resultList);
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){
    final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size);
    List<T> rows = new ArrayList<>();
    for (Map<String, Object> row : result.getRows()) {
        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
        rows.add(t);
    }
    return new PageResult<>(result.getTotal(), rows);
}

1.2.2 工具類測試

使用測試數據測試工具類

public static void main(String[] args) {
    // 構建連接參數
    final MaxComputeSdkConnParam connParam = new MaxComputeSdkConnParam();
    connParam.setAliyunAccessId("您的阿裡雲賬號accessId");
    connParam.setAliyunAccessKey("您的阿裡雲賬號accessKey");
    connParam.setProjectName("項目名");

    // 實例化工具類
    final MaxComputeSdkUtil sdkUtil = new MaxComputeSdkUtil(connParam);

    // 查詢所有表
    final List<TableMetaInfo> tableInfos = sdkUtil.getTableInfos();
    for (TableMetaInfo tableInfo : tableInfos) {
        System.out.println(tableInfo.getTableName());
    }

    // 分頁查詢數據
    final PageResult<Map<String, Object>> page = sdkUtil.pageQueryMap("select * from ods_cust;", 2, 10);
    System.out.println(page.getTotal());
    for (Map<String, Object> map : page.getRows()) {
        System.out.println(JSONObject.toJSONString(map));
    }
}

1.2.3 為什麼要開啟全表掃描

maxCompute存在使用限制如下:

當使用select語句時,屏顯最多隻能顯示10000行結果。當select語句作為子句時則無此限制,select子句會將全部結果返回給上層查詢。
select語句查詢分區表時默認禁止全表掃描。
自2018年1月10日20:00:00後,在新創建的項目上執行SQL語句時,默認情況下,針對該項目裡的分區表不允許執行全表掃描操作。在查詢分區表數據時必須指定分區,由此減少SQL的不必要I/O,從而減少計算資源的浪費以及按量計費模式下不必要的計算費用。

如果您需要對分區表進行全表掃描,可以在全表掃描的SQL語句前加上命令set odps.sql.allow.fullscan=true;,並和SQL語句一起提交執行。假設sale_detail表為分區表,需要同時執行如下語句進行全表查詢:

2、JDBC方式集成

使用odps-jdbc集成, 官方文檔地址MaxCompute Java JDBC介紹

<properties>
    <java.version>1.8</java.version>
    <!--maxCompute-jdbc-版本號-->
    <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
</properties>

<dependencies>
  <!--max compute jdbc-->
  <dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-jdbc</artifactId>
    <version>${max-compute-jdbc.version}</version>
    <classifier>jar-with-dependencies</classifier>
  </dependency>
</dependencies>

2.2、編寫連接工具類

編寫MaxComputeSdkUtil以JDBC方式連接MaxCompute

2.2.1、重要類和方法說明

1、連接參數類:

@Data
public class MaxComputeJdbcConnParam {
  /**阿裡雲accessId 相當於用戶名 */
  private String aliyunAccessId;
  /**阿裡雲accessKey 相當於密碼 */
  private String aliyunAccessKey;
  /** maxcompute_endpoint */
  private String endpoint;
  /**項目名稱*/
  private String projectName;
}

2、公共方法(初始化)

/**JDBC 驅動名稱*/
private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";

private static final String SELECT_ALL_TABLE_SQL = "select table_name, table_comment from Information_Schema.TABLES";

private static final String SELECT_FIELD_BY_TABLE_SQL = "select column_name, column_comment from Information_Schema.COLUMNS where table_name = '%s'";
/**分頁查詢sql模板*/
private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";
/**分頁查詢統計數量模板SQL*/
private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";
/**連接*/
private final Connection conn;

/**
 * 連接參數
 */
private final MaxComputeJdbcConnParam connParam;

public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {
    this.connParam = connParam;
    this.conn = buildConn();
}

/**
 * 創建連接
 * @return 數據庫連接
 */
private Connection buildConn() {
    try {
        Class.forName(DRIVER_NAME);
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);
    }

    try {
        // JDBCURL連接模板
        String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";
        // 使用驅動管理器連接獲取連接
        return DriverManager.getConnection(
                String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()),
                connParam.getAliyunAccessId(), connParam.getAliyunAccessKey());
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);
    }
}

3、查詢表信息

/**
 * 獲取表信息
 * @return 表信息列表
 */
public List<TableMetaInfo> getTableInfos(){
    List<TableMetaInfo> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 創建statement 使用SQL直接查詢
        statement = conn.createStatement();
        // 執行查詢語句
        resultSet = statement.executeQuery(SELECT_ALL_TABLE_SQL);
        while (resultSet.next()){
            final String tableName = resultSet.getString("table_name");
            final String tableComment = resultSet.getString("table_comment");
            final TableMetaInfo info = new TableMetaInfo(tableName, tableComment);
            resultList.add(info);
        }

        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 關閉resultSet
        closeResultSet(resultSet);
        // 關閉statement
        closeStatement(statement);
    }
}

4、執行SQL封裝

/**
 * 執行sql查詢
 * @param querySql 查詢sql
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql){
    List<Map<String, Object>> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 創建statement
        statement = conn.createStatement();

        // 執行查詢語句
        resultSet = statement.executeQuery(querySql);

        // 構建結果返回
        buildMapByRs(resultList, resultSet);

        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 關閉resultSet
        closeResultSet(resultSet);
        // 關閉statement
        closeStatement(statement);
    }
}

/**
 * 將ResultSet轉換為List<Map<String, Object>>
 * @param resultList 轉換的集合
 * @param resultSet ResultSet
 * @throws SQLException e
 */
private void buildMapByRs(List<Map<String, Object>> resultList, ResultSet resultSet) throws SQLException {
    // 獲取元數據
    ResultSetMetaData metaData = resultSet.getMetaData();
    while (resultSet.next()) {
        // 獲取列數
        int columnCount = metaData.getColumnCount();
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i < columnCount; i++) {
            String columnName = metaData.getColumnName(i + 1);
            Object object = resultSet.getObject(columnName);
            // maxCompute裡面的空返回的是使用\n
            if ("\\N".equalsIgnoreCase(String.valueOf(object))) {
                map.put(columnName, "");
            } else {
                map.put(columnName, object);
            }
        }
        resultList.add(map);
    }
}


private void closeStatement(Statement statement){
    if (statement != null){
        try {
            statement.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

private void closeResultSet(ResultSet resultSet){
    if (resultSet != null){
        try {
            resultSet.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

5、分頁查詢分裝

/**
 * 執行sql查詢
 * @param querySql 查詢sql
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size){
    List<Map<String, Object>> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 1、替換分號
        querySql = querySql.replaceAll(";", "");
        // 創建statement
        statement = conn.createStatement();
        // 2、格式化SQL
        int offset = (page - 1 ) * size;
        final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);
        log.info("=======>>>執行分頁sql為:{}", execSql);
        // 執行查詢語句
        resultSet = statement.executeQuery(execSql);

        // 構建結果返回
        buildMapByRs(resultList, resultSet);
        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 關閉resultSet
        closeResultSet(resultSet);
        // 關閉statement
        closeStatement(statement);
    }
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){
    // 1、替換分號
    querySql = querySql.replaceAll(";", "");
    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);
    log.info("=======>>>執行分頁統計總數sql為:{}", countSql);
    // 查詢總數
    final List<Map<String, Object>> countMap = queryData(countSql);
    if (CollectionUtils.isEmpty(countMap)){
        return new PageResult<>(0L, new ArrayList<>());
    }

    long count = 0L;
    for (Object value : countMap.get(0).values()) {
        count = Long.parseLong(String.valueOf(value));
    }

    if (count == 0){
        return new PageResult<>(0L, new ArrayList<>());
    }

    // 執行分頁查詢 開啟全表掃描
    final List<Map<String, Object>> resultList = queryData(querySql, page, size);

    return new PageResult<>(count, resultList);
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){
    final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size);
    List<T> rows = new ArrayList<>();
    for (Map<String, Object> row : result.getRows()) {
        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
        rows.add(t);
    }
    return new PageResult<>(result.getTotal(), rows);
}

2.2.2 工具類測試

使用測試數據測試工具類

public static void main(String[] args) {
    final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();
    connParam.setAliyunAccessId("您的阿裡雲賬號accessId");
    connParam.setAliyunAccessKey("您的阿裡雲賬號accessKey");
    connParam.setProjectName("項目名");
    connParam.setEndpoint("http://service.cn-hangzhou.maxcompute.aliyun.com/api");
    final MaxComputeJdbcUtil jdbcUtil = new MaxComputeJdbcUtil(connParam);

    // 獲取表信息
    final List<TableMetaInfo> tableInfos = jdbcUtil.getTableInfos();
    for (TableMetaInfo tableInfo : tableInfos) {
        System.out.println(tableInfo);
    }

    // 獲取字段信息
    final String tableName = tableInfos.get(new Random().nextInt(tableInfos.size())).getTableName();
    final List<TableColumnMetaInfo> fields = jdbcUtil.getFieldByTableName(tableName);
    for (TableColumnMetaInfo field : fields) {
        System.out.println(field.getFieldName() + "-" + field.getComment());
    }

    // 執行查詢
    final List<Map<String, Object>> list = jdbcUtil.queryData("select * from ods_cust;");
    for (Map<String, Object> map : list) {
        System.out.println(JSONObject.toJSONString(map));
    }

    // 執行分頁查詢
    final List<Map<String, Object>> list2 = jdbcUtil.queryData("select * from ods_cust;", 2, 10);
    for (Map<String, Object> map : list2) {
        System.out.println(JSONObject.toJSONString(map));
    }

    // 執行分頁查詢 並返回count
    final PageResult<Map<String, Object>> list3 = jdbcUtil.pageQueryMap("select * from ods_cust;", 2, 10);
    System.out.println(list3.getTotal());
    for (Map<String, Object> map : list3.getRows()) {
        System.out.println(JSONObject.toJSONString(map));
    }

    jdbcUtil.close();
}

項目地址

springboot集成maxCompute

到此這篇關於SpringBoot集成MaxCompute的文章就介紹到這瞭,更多相關SpringBoot集成MaxCompute內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: