SpringBoot集成SFTP客戶端實現文件上傳下載實例

背景

在項目開發中,一般文件存儲很少再使用SFTP服務,但是也不排除合作夥伴使用SFTP來存儲項目中的文件或者通過SFTP來實現文件數據的交互。

我遇到的項目中,就有銀行和保險公司等合作夥伴通過SFTP服務來實現與我們項目的文件數據的交互。

為瞭能夠順利地完成與友商的SFTP服務的連通,我們需要在自己的項目中實現一套SFTP客戶端工具。一般我們會采用Jsch來實現SFTP客戶端。

依賴

<!--執行遠程操作-->
<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jsch</artifactId>
    <version>0.1.55</version>
</dependency>
     <!--鏈接池-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>

首先我們一定要引入jsch依賴,這個是我們實現SFTP客戶端的基石;其次我們引入瞭鏈接池工具,為瞭避免每次執行SFTP命令都要重新創建鏈接,我們使用池化的方式優化瞭比較消耗資源的創建操作。

創建工具類

為瞭更好的使用SFTP工具,我們把jsch中關於SFTP的相關功能提煉出來,做瞭一次簡單的封裝,做成瞭我們可以直接使用的工具類。

裡面隻有兩類方法:

1.創建Session與開啟Session;

session創建好後,還不能創建channel,需要開啟session後才能創建channel;

2.創建channel與開啟channel;

channel也是一樣,創建好的channel需要開啟後才能真正地執行命令;

public class JschUtil {
  /**
   * 創建session
   *
   * @param userName       用戶名
   * @param password       密碼
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密鑰文件
   * @param passphrase     口令
   * @return
   * @throws AwesomeException
   */
  public static Session createSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException {
    return createSession(new JSch(), userName, password, host, port, privateKeyFile, passphrase);
  }
  /**
   * 創建session
   *
   * @param jSch
   * @param userName       用戶名
   * @param password       密碼
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密鑰
   * @param passphrase     口令
   * @return
   * @throws AwesomeException
   */
  public static Session createSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException {
    try {
      if (!StringUtils.isEmpty(privateKeyFile)) {
        // 使用密鑰驗證方式,密鑰可以是有口令的密鑰,也可以是沒有口令的密鑰
        if (!StringUtils.isEmpty(passphrase)) {
          jSch.addIdentity(privateKeyFile, passphrase);
        } else {
          jSch.addIdentity(privateKeyFile);
        }
      }
      // 獲取session
      Session session = jSch.getSession(userName, host, port);
      if (!StringUtils.isEmpty(password)) {
        session.setPassword(password);
      }
      // 不校驗域名
      session.setConfig("StrictHostKeyChecking", "no");
      return session;
    } catch (Exception e) {
      throw new AwesomeException(500, "create session fail");
    }
  }
  /**
   * 創建session
   *
   * @param jSch
   * @param userName 用戶名
   * @param password 密碼
   * @param host     域名
   * @param port     端口
   * @return
   * @throws AwesomeException
   */
  public static Session createSession(JSch jSch, String userName, String password, String host, int port) throws AwesomeException {
    return createSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
  }
  /**
   * 創建session
   *
   * @param jSch
   * @param userName 用戶名
   * @param host     域名
   * @param port     端口
   * @return
   * @throws AwesomeException
   */
  private Session createSession(JSch jSch, String userName, String host, int port) throws AwesomeException {
    return createSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
  }
  /**
   * 開啟session鏈接
   *
   * @param jSch
   * @param userName       用戶名
   * @param password       密碼
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密鑰
   * @param passphrase     口令
   * @param timeout        鏈接超時時間
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException {
    Session session = createSession(jSch, userName, password, host, port, privateKeyFile, passphrase);
    try {
      if (timeout >= 0) {
        session.connect(timeout);
      } else {
        session.connect();
      }
      return session;
    } catch (Exception e) {
      throw new AwesomeException(500, "session connect fail");
    }
  }
  /**
   * 開啟session鏈接
   *
   * @param userName       用戶名
   * @param password       密碼
   * @param host           域名
   * @param port           端口
   * @param privateKeyFile 密鑰
   * @param passphrase     口令
   * @param timeout        鏈接超時時間
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException {
    Session session = createSession(userName, password, host, port, privateKeyFile, passphrase);
    try {
      if (timeout >= 0) {
        session.connect(timeout);
      } else {
        session.connect();
      }
      return session;
    } catch (Exception e) {
      throw new AwesomeException(500, "session connect fail");
    }
  }
  /**
   * 開啟session鏈接
   *
   * @param jSch
   * @param userName 用戶名
   * @param password 密碼
   * @param host     域名
   * @param port     端口
   * @param timeout  鏈接超時時間
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(JSch jSch, String userName, String password, String host, int port, int timeout) throws AwesomeException {
    return openSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 開啟session鏈接
   *
   * @param userName 用戶名
   * @param password 密碼
   * @param host     域名
   * @param port     端口
   * @param timeout  鏈接超時時間
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(String userName, String password, String host, int port, int timeout) throws AwesomeException {
    return openSession(userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 開啟session鏈接
   *
   * @param jSch
   * @param userName 用戶名
   * @param host     域名
   * @param port     端口
   * @param timeout  鏈接超時時間
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(JSch jSch, String userName, String host, int port, int timeout) throws AwesomeException {
    return openSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 開啟session鏈接
   *
   * @param userName 用戶名
   * @param host     域名
   * @param port     端口
   * @param timeout  鏈接超時時間
   * @return
   * @throws AwesomeException
   */
  public static Session openSession(String userName, String host, int port, int timeout) throws AwesomeException {
    return openSession(userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
  }
  /**
   * 創建指定通道
   *
   * @param session
   * @param channelType
   * @return
   * @throws AwesomeException
   */
  public static Channel createChannel(Session session, ChannelType channelType) throws AwesomeException {
    try {
      if (!session.isConnected()) {
        session.connect();
      }
      return session.openChannel(channelType.getValue());
    } catch (Exception e) {
      throw new AwesomeException(500, "open channel fail");
    }
  }
  /**
   * 創建sftp通道
   *
   * @param session
   * @return
   * @throws AwesomeException
   */
  public static ChannelSftp createSftp(Session session) throws AwesomeException {
    return (ChannelSftp) createChannel(session, ChannelType.SFTP);
  }
  /**
   * 創建shell通道
   *
   * @param session
   * @return
   * @throws AwesomeException
   */
  public static ChannelShell createShell(Session session) throws AwesomeException {
    return (ChannelShell) createChannel(session, ChannelType.SHELL);
  }
  /**
   * 開啟通道
   *
   * @param session
   * @param channelType
   * @param timeout
   * @return
   * @throws AwesomeException
   */
  public static Channel openChannel(Session session, ChannelType channelType, int timeout) throws AwesomeException {
    Channel channel = createChannel(session, channelType);
    try {
      if (timeout >= 0) {
        channel.connect(timeout);
      } else {
        channel.connect();
      }
      return channel;
    } catch (Exception e) {
      throw new AwesomeException(500, "connect channel fail");
    }
  }
  /**
   * 開啟sftp通道
   *
   * @param session
   * @param timeout
   * @return
   * @throws AwesomeException
   */
  public static ChannelSftp openSftpChannel(Session session, int timeout) throws AwesomeException {
    return (ChannelSftp) openChannel(session, ChannelType.SFTP, timeout);
  }
  /**
   * 開啟shell通道
   *
   * @param session
   * @param timeout
   * @return
   * @throws AwesomeException
   */
  public static ChannelShell openShellChannel(Session session, int timeout) throws AwesomeException {
    return (ChannelShell) openChannel(session, ChannelType.SHELL, timeout);
  }
  enum ChannelType {
    SESSION("session"),
    SHELL("shell"),
    EXEC("exec"),
    X11("x11"),
    AGENT_FORWARDING("[email protected]"),
    DIRECT_TCPIP("direct-tcpip"),
    FORWARDED_TCPIP("forwarded-tcpip"),
    SFTP("sftp"),
    SUBSYSTEM("subsystem");
    private final String value;
    ChannelType(String value) {
      this.value = value;
    }
    public String getValue() {
      return this.value;
    }
  }
}

SFTP鏈接池化

我們通過實現BasePooledObjectFactory類來池化通道ChannelSftp。這並不是真正池化的代碼,下面的代碼隻是告知池化管理器如何創建對象和銷毀對象。

static class SftpFactory extends BasePooledObjectFactory<ChannelSftp> implements AutoCloseable {
    private Session session;
    private SftpProperties properties;
    // 初始化SftpFactory
    // 裡面主要是創建目標session,後續可用通過這個session不斷地創建ChannelSftp。
    SftpFactory(SftpProperties properties) throws AwesomeException {
      this.properties = properties;
      String username = properties.getUsername();
      String password = properties.getPassword();
      String host = properties.getHost();
      int port = properties.getPort();
      String privateKeyFile = properties.getPrivateKeyFile();
      String passphrase = properties.getPassphrase();
      session = JschUtil.createSession(username, password, host, port, privateKeyFile, passphrase);
    }
    // 銷毀對象,主要是銷毀ChannelSftp
    @Override
    public void destroyObject(PooledObject<ChannelSftp> p) throws Exception {
      p.getObject().disconnect();
    }
    // 創建對象ChannelSftp
    @Override
    public ChannelSftp create() throws Exception {
      int timeout = properties.getTimeout();
      return JschUtil.openSftpChannel(this.session, timeout);
    }
    // 包裝創建出來的對象
    @Override
    public PooledObject<ChannelSftp> wrap(ChannelSftp channelSftp) {
      return new DefaultPooledObject<>(channelSftp);
    }
    // 驗證對象是否可用
    @Override
    public boolean validateObject(PooledObject<ChannelSftp> p) {
      return p.getObject().isConnected();
    }
    // 銷毀資源,關閉session
    @Override
    public void close() throws Exception {
      if (Objects.nonNull(session)) {
        if (session.isConnected()) {
          session.disconnect();
        }
        session = null;
      }
    }
  }

為瞭實現真正的池化操作,我們還需要以下代碼:

1.我們需要在SftpClient對象中創建一個GenericObjectPool對象池,這個才是真正的池子,它負責創建和存儲所有的對象。

2.我們還需要提供資源銷毀的功能,也就是實現AutoCloseable,在服務停止時,需要把相關的資源銷毀。

public class SftpClient implements AutoCloseable {
  private SftpFactory sftpFactory;
  GenericObjectPool<ChannelSftp> objectPool;
  // 構造方法1
  public SftpClient(SftpProperties properties, GenericObjectPoolConfig<ChannelSftp> poolConfig) throws AwesomeException {
    this.sftpFactory = new SftpFactory(properties);
    objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
  }
  // 構造方法2
  public SftpClient(SftpProperties properties) throws AwesomeException {
    this.sftpFactory = new SftpFactory(properties);
    SftpProperties.PoolConfig config = properties.getPool();
    // 默認池化配置
    if (Objects.isNull(config)) {
      objectPool = new GenericObjectPool<>(this.sftpFactory);
    } else {
      // 自定義池化配置
      GenericObjectPoolConfig<ChannelSftp> poolConfig = new GenericObjectPoolConfig<>();
      poolConfig.setMaxIdle(config.getMaxIdle());
      poolConfig.setMaxTotal(config.getMaxTotal());
      poolConfig.setMinIdle(config.getMinIdle());
      poolConfig.setTestOnBorrow(config.isTestOnBorrow());
      poolConfig.setTestOnCreate(config.isTestOnCreate());
      poolConfig.setTestOnReturn(config.isTestOnReturn());
      poolConfig.setTestWhileIdle(config.isTestWhileIdle());
      poolConfig.setBlockWhenExhausted(config.isBlockWhenExhausted());
      poolConfig.setMaxWait(Duration.ofMillis(config.getMaxWaitMillis()));
      poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(config.getTimeBetweenEvictionRunsMillis()));
      objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
    }
  }
  
  // 銷毀資源
    @Override
  public void close() throws Exception {
    // 銷毀鏈接池
    if (Objects.nonNull(this.objectPool)) {
      if (!this.objectPool.isClosed()) {
        this.objectPool.close();
      }
    }
    this.objectPool = null;
    // 銷毀sftpFactory
    if (Objects.nonNull(this.sftpFactory)) {
      this.sftpFactory.close();
    }
  }
}

SFTP鏈接池的使用

我們已經對鏈接池進行瞭初始化,下面我們就可以從鏈接池中獲取我們需要的ChannelSftp來實現文件的上傳下載瞭。

下面實現瞭多種文件上傳和下載的方式:

1.直接把本地文件上傳到SFTP服務器的指定路徑;

2.把InputStream輸入流提交到SFTP服務器指定路徑中;

3.可以針對以上兩種上傳方式進行進度的監測;

4.把SFTP服務器中的指定文件下載到本地機器上;

5.把SFTP服務器˙中的文件寫入指定的輸出流;

6.針對以上兩種下載方式,監測下載進度;

  /**
   * 上傳文件
   *
   * @param srcFilePath
   * @param targetDir
   * @param targetFileName
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName) throws AwesomeException {
    return uploadFile(srcFilePath, targetDir, targetFileName, null);
  }
  /**
   * 上傳文件
   *
   * @param srcFilePath
   * @param targetDir
   * @param targetFileName
   * @param monitor
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      // 從鏈接池獲取對象
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目標文件夾
      if (!exist(channelSftp, targetDir)) {
        mkdirs(channelSftp, targetDir);
      }
      channelSftp.cd(targetDir);
      // 上傳文件
      if (Objects.nonNull(monitor)) {
        channelSftp.put(srcFilePath, targetFileName, monitor);
      } else {
        channelSftp.put(srcFilePath, targetFileName);
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        // 返還對象給鏈接池
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 上傳文件到目標文件夾
   *
   * @param in
   * @param targetDir
   * @param targetFileName
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(InputStream in, String targetDir, String targetFileName) throws AwesomeException {
    return uploadFile(in, targetDir, targetFileName, null);
  }
  /**
   * 上傳文件,添加進度監視器
   *
   * @param in
   * @param targetDir
   * @param targetFileName
   * @param monitor
   * @return
   * @throws AwesomeException
   */
  public boolean uploadFile(InputStream in, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目標文件夾
      if (!exist(channelSftp, targetDir)) {
        mkdirs(channelSftp, targetDir);
      }
      channelSftp.cd(targetDir);
      if (Objects.nonNull(monitor)) {
        channelSftp.put(in, targetFileName, monitor);
      } else {
        channelSftp.put(in, targetFileName);
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 下載文件
   *
   * @param remoteFile
   * @param targetFilePath
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, String targetFilePath) throws AwesomeException {
    return downloadFile(remoteFile, targetFilePath, null);
  }
  /**
   * 下載目標文件到本地
   *
   * @param remoteFile
   * @param targetFilePath
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, String targetFilePath, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目標文件夾
      if (!exist(channelSftp, remoteFile)) {
        // 不用下載瞭
        return false;
      }
      File targetFile = new File(targetFilePath);
      try (FileOutputStream outputStream = new FileOutputStream(targetFile)) {
        if (Objects.nonNull(monitor)) {
          channelSftp.get(remoteFile, outputStream, monitor);
        } else {
          channelSftp.get(remoteFile, outputStream);
        }
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 下載文件
   *
   * @param remoteFile
   * @param outputStream
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, OutputStream outputStream) throws AwesomeException {
    return downloadFile(remoteFile, outputStream, null);
  }
  /**
   * 下載文件
   *
   * @param remoteFile
   * @param outputStream
   * @param monitor
   * @return
   * @throws AwesomeException
   */
  public boolean downloadFile(String remoteFile, OutputStream outputStream, SftpProgressMonitor monitor) throws AwesomeException {
    ChannelSftp channelSftp = null;
    try {
      channelSftp = this.objectPool.borrowObject();
      // 如果不存在目標文件夾
      if (!exist(channelSftp, remoteFile)) {
        // 不用下載瞭
        return false;
      }
      if (Objects.nonNull(monitor)) {
        channelSftp.get(remoteFile, outputStream, monitor);
      } else {
        channelSftp.get(remoteFile, outputStream);
      }
      return true;
    } catch (Exception e) {
      throw new AwesomeException(500, "upload file fail");
    } finally {
      if (Objects.nonNull(channelSftp)) {
        this.objectPool.returnObject(channelSftp);
      }
    }
  }
  /**
   * 創建文件夾
   *
   * @param channelSftp
   * @param dir
   * @return
   */
  protected boolean mkdirs(ChannelSftp channelSftp, String dir) {
    try {
      String pwd = channelSftp.pwd();
      if (StringUtils.contains(pwd, dir)) {
        return true;
      }
      String relativePath = StringUtils.substringAfter(dir, pwd);
      String[] dirs = StringUtils.splitByWholeSeparatorPreserveAllTokens(relativePath, "/");
      for (String path : dirs) {
        if (StringUtils.isBlank(path)) {
          continue;
        }
        try {
          channelSftp.cd(path);
        } catch (SftpException e) {
          channelSftp.mkdir(path);
          channelSftp.cd(path);
        }
      }
      return true;
    } catch (Exception e) {
      return false;
    }
  }
  /**
   * 判斷文件夾是否存在
   *
   * @param channelSftp
   * @param dir
   * @return
   */
  protected boolean exist(ChannelSftp channelSftp, String dir) {
    try {
      channelSftp.lstat(dir);
      return true;
    } catch (Exception e) {
      return false;
    }
  }

集成到SpringBoot中

我們可以通過java config的方式,把我們已經實現好的SftpClient類實例化到Spring IOC容器中來管理,以便讓開發人員在整個項目中通過@Autowired的方式就可以直接使用。

配置

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * @author zouwei
 * @className SftpProperties
 * @date: 2022/8/19 下午12:12
 * @description:
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "sftp.config")
public class SftpProperties {
  // 用戶名
  private String username;
  // 密碼
  private String password;
  // 主機名
  private String host;
  // 端口
  private int port;
  // 密鑰
  private String privateKeyFile;
  // 口令
  private String passphrase;
  // 通道鏈接超時時間
  private int timeout;
  // 鏈接池配置
  private PoolConfig pool;
  @Data
  public static class PoolConfig {
    //最大空閑實例數,空閑超過此值將會被銷毀淘汰
    private int maxIdle;
    // 最小空閑實例數,對象池將至少保留2個空閑對象
    private int minIdle;
    //最大對象數量,包含借出去的和空閑的
    private int maxTotal;
    //對象池滿瞭,是否阻塞獲取(false則借不到直接拋異常)
    private boolean blockWhenExhausted;
    // BlockWhenExhausted為true時生效,對象池滿瞭阻塞獲取超時,不設置則阻塞獲取不超時,也可在borrowObject方法傳遞第二個參數指定本次的超時時間
    private long maxWaitMillis;
    // 創建對象後是否驗證對象,調用objectFactory#validateObject
    private boolean testOnCreate;
    // 借用對象後是否驗證對象 validateObject
    private boolean testOnBorrow;
    // 歸還對象後是否驗證對象 validateObject
    private boolean testOnReturn;
    // 定時檢查期間是否驗證對象 validateObject
    private boolean testWhileIdle;
    //定時檢查淘汰多餘的對象, 啟用單獨的線程處理
    private long timeBetweenEvictionRunsMillis;
    //jmx監控,和springboot自帶的jmx沖突,可以選擇關閉此配置或關閉springboot的jmx配置
    private boolean jmxEnabled;
  }
}

java Bean註入

import com.example.awesomespring.exception.AwesomeException;
import com.example.awesomespring.sftp.SftpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author zouwei
 * @className SftpConfig
 * @date: 2022/8/19 下午12:12
 * @description:
 */
@Configuration
public class SftpConfig {
  @Autowired
  private SftpProperties properties;
  // 創建SftpClient對象
  @Bean(destroyMethod = "close")
  @ConditionalOnProperty(prefix = "sftp.config")
  public SftpClient sftpClient() throws AwesomeException {
    return new SftpClient(properties);
  }
}

通過以上代碼,我們就可以在項目的任何地方直接使用SFTP客戶端來上傳和下載文件瞭。

更多關於SpringBoot SFTP文件上傳下載的資料請關註WalkonNet其它相關文章!

推薦閱讀: