Java中ShardingSphere 數據分片的實現
前言
其實很多人對分庫分表多少都有點恐懼,其實我也是,總覺得這玩意是運維幹的、數據量上來瞭或者sql過於復雜、一些數據分片的中間件支持的也不是很友好、配置繁瑣等多種問題。
我們今天用ShardingSphere 給大傢演示數據分片,包括分庫分表、隻分表不分庫進行說明。
下一節有時間的話在講講讀寫分離吧。
github地址:https://github.com/362460453/boot-sharding-JDBC
ShardingSphere介紹
ShardingSphere是一套開源的分佈式數據庫中間件解決方案組成的生態圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(計劃中)這3款相互獨立的產品組成。 他們均提供標準化的數據分片、分佈式事務和數據庫治理功能,可適用於如Java同構、異構語言、容器、雲原生等各種多樣化的應用場景。
ShardingSphere的功能能幫助我們做什麼
- 數據分片
- 讀寫分離
- 編排治理
- 分佈式事務
2016年初Sharding-JDBC被開源,這個產品是當當的,加入瞭Apache 後改名為 ShardingSphere 。他是我們應用和數據庫之間的中間層,雖代碼入侵性很強,但不會對現有業務邏輯進行改變。
更多文檔請點擊官網:https://shardingsphere.apache.org/document/current/en/overview/
為什麼不用mycat
大傢如果去查相關資料會知道,mycat和ShardingSphere是同類型的中間件,主要的功能,數據分片和讀寫分離兩個都能去做,但是姿勢卻有很大的差別, 從字面意義上看Sharding 含義是分片、碎片的意思,所以不難理解ShardingSphere 對數據分片有很強對能力,對於99%對sql都是支持的,官網也有sql支持的相關內容,大傢詳細閱讀,隻有 類似sum 這種函數不支持,而且對 ORM框架和常用數據庫基本都兼容,所以個人建議如果你們做數據分片,也就是是分庫分表對話,強烈建議選擇ShardingSphere,因為我私下也和一些朋友交流過,mycat 的數據分片對多表查詢不是很友好,而且用 mycat 要有很強的運維來做,還有一點就是mycat 都是靠xml配置的,沒有代碼入侵,所以這也算是他的優點吧。如果你們隻做讀寫分離對話,那麼我建議用mycat,是沒問題的。
實踐前的準備工作
啟動你的mysql,創建兩個數據庫,分別叫 sharding_master 和 sharding_salve分別在這兩個數據庫執行如下sql
CREATE TABLE IF NOT EXISTS `t_order_0` ( `order_id` INT NOT NULL, `user_id` INT NOT NULL, PRIMARY KEY (`order_id`) ); CREATE TABLE IF NOT EXISTS `t_order_1` ( `order_id` INT NOT NULL, `user_id` INT NOT NULL, PRIMARY KEY (`order_id`) );
做完以上兩步結果如下
代碼案例
環境
工具 | 版本 |
jdk |
1.8.0_144 |
springboot | 2.0.4.RELEASE |
sharding | 1.3.1 |
mysql | 5.7 |
創建一個springboot工程,我們使用 JdbcTemplate 框架,如果用mybatis也是無影響的。
pom引用依賴如下
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <druid.version>1.0.26</druid.version> <sharding.jdbc.core.version>1.3.3</sharding.jdbc.core.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>sharding-jdbc-core</artifactId> <version>${sharding.jdbc.core.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> </dependencies>
application.yml 配置如下
server: port: 8050 sharding: jdbc: driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/sharding_master?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false username: root password: 123456 filters: stat maxActive: 100 initialSize: 1 maxWait: 15000 minIdle: 1 timeBetweenEvictionRunsMillis: 30000 minEvictableIdleTimeMillis: 180000 validationQuery: SELECT 'x' testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: false maxPoolPreparedStatementPerConnectionSize: 20 removeAbandoned: true removeAbandonedTimeout: 600 logAbandoned: false connectionInitSqls: url0: jdbc:mysql://localhost:3306/sharding_master?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false username0: root password0: 123456 url1: jdbc:mysql://localhost:3306/sharding_salve?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false username1: root password1: 123456
yml映射成Bean
@Data @ConfigurationProperties(prefix="sharding.jdbc") public class ShardDataSourceProperties { private String driverClassName; private String url; private String username; private String password; private String url0; private String username0; private String password0; private String url1; private String username1; private String password1; private String filters; private int maxActive; private int initialSize; private int maxWait; private int minIdle; private int timeBetweenEvictionRunsMillis; private int minEvictableIdleTimeMillis; private String validationQuery; private boolean testWhileIdle; private boolean testOnBorrow; private boolean testOnReturn; private boolean poolPreparedStatements; private int maxPoolPreparedStatementPerConnectionSize; private boolean removeAbandoned; private int removeAbandonedTimeout; private boolean logAbandoned; private List<String> connectionInitSqls; //省略geter setter
分庫策略
//通過實現SingleKeyDatabaseShardingAlgorithm接口實現分庫 public class ModuloDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm<Integer> { @Override public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) { for (String each : availableTargetNames) { if (each.endsWith(shardingValue.getValue() % 2 + "")) { return each; } } throw new IllegalArgumentException(); } @Override public Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(availableTargetNames.size()); for (Integer value : shardingValue.getValues()) { for (String targetName : availableTargetNames) { if (targetName.endsWith(value % 2 + "")) { result.add(targetName); } } } return result; } @Override public Collection<String> doBetweenSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(availableTargetNames.size()); Range<Integer> range = (Range<Integer>) shardingValue.getValueRange(); for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) { for (String each : availableTargetNames) { if (each.endsWith(i % 2 + "")) { result.add(each); } } } return result; } }
分表策略
public class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> { /** * 對於分片字段的等值操作 都走這個方法。(包括 插入 更新) * 如: * <p> * select * from t_order from t_order where order_id = 11 * └── SELECT * FROM t_order_1 WHERE order_id = 11 * select * from t_order from t_order where order_id = 44 * └── SELECT * FROM t_order_0 WHERE order_id = 44 * </P> */ @Override public String doEqualSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) { for (String each : tableNames) { if (each.endsWith(shardingValue.getValue() % 2 + "")) { return each; } } throw new IllegalArgumentException(); } /** * 對於分片字段的in操作,都走這個方法。 * select * from t_order from t_order where order_id in (11,44) * ├── SELECT * FROM t_order_0 WHERE order_id IN (11,44) * └── SELECT * FROM t_order_1 WHERE order_id IN (11,44) * select * from t_order from t_order where order_id in (11,13,15) * └── SELECT * FROM t_order_1 WHERE order_id IN (11,13,15) * select * from t_order from t_order where order_id in (22,24,26) * └──SELECT * FROM t_order_0 WHERE order_id IN (22,24,26) */ @Override public Collection<String> doInSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(tableNames.size()); for (Integer value : shardingValue.getValues()) { for (String tableName : tableNames) { if (tableName.endsWith(value % 2 + "")) { result.add(tableName); } } } return result; } /** * 對於分片字段的between操作都走這個方法。 * select * from t_order from t_order where order_id between 10 and 20 * ├── SELECT * FROM t_order_0 WHERE order_id BETWEEN 10 AND 20 * └── SELECT * FROM t_order_1 WHERE order_id BETWEEN 10 AND 20 */ @Override public Collection<String> doBetweenSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(tableNames.size()); Range<Integer> range = (Range<Integer>) shardingValue.getValueRange(); for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) { for (String each : tableNames) { if (each.endsWith(i % 2 + "")) { result.add(each); } } } return result; } }
對特定表和庫,進行特定的分庫分表規則
簡單說,就是分庫按照瞭user_id的奇偶區分,分表按照order_id 的奇偶區分,
如果你有多個表進行分片,就寫多個TableRule,
配置兩個數據源,分別是我在yml裡的配置,根據你的需求個性化配置就可以。
@Configuration @EnableConfigurationProperties(ShardDataSourceProperties.class) public class ShardDataSourceConfig { @Autowired private ShardDataSourceProperties shardDataSourceProperties; private DruidDataSource parentDs() throws SQLException { DruidDataSource ds = new DruidDataSource(); ds.setDriverClassName(shardDataSourceProperties.getDriverClassName()); ds.setUsername(shardDataSourceProperties.getUsername()); ds.setUrl(shardDataSourceProperties.getUrl()); ds.setPassword(shardDataSourceProperties.getPassword()); ds.setFilters(shardDataSourceProperties.getFilters()); ds.setMaxActive(shardDataSourceProperties.getMaxActive()); ds.setInitialSize(shardDataSourceProperties.getInitialSize()); ds.setMaxWait(shardDataSourceProperties.getMaxWait()); ds.setMinIdle(shardDataSourceProperties.getMinIdle()); ds.setTimeBetweenEvictionRunsMillis(shardDataSourceProperties.getTimeBetweenEvictionRunsMillis()); ds.setMinEvictableIdleTimeMillis(shardDataSourceProperties.getMinEvictableIdleTimeMillis()); ds.setValidationQuery(shardDataSourceProperties.getValidationQuery()); ds.setTestWhileIdle(shardDataSourceProperties.isTestWhileIdle()); ds.setTestOnBorrow(shardDataSourceProperties.isTestOnBorrow()); ds.setTestOnReturn(shardDataSourceProperties.isTestOnReturn()); ds.setPoolPreparedStatements(shardDataSourceProperties.isPoolPreparedStatements()); ds.setMaxPoolPreparedStatementPerConnectionSize( shardDataSourceProperties.getMaxPoolPreparedStatementPerConnectionSize()); ds.setRemoveAbandoned(shardDataSourceProperties.isRemoveAbandoned()); ds.setRemoveAbandonedTimeout(shardDataSourceProperties.getRemoveAbandonedTimeout()); ds.setLogAbandoned(shardDataSourceProperties.isLogAbandoned()); ds.setConnectionInitSqls(shardDataSourceProperties.getConnectionInitSqls()); return ds; } private DataSource ds0() throws SQLException { DruidDataSource ds = parentDs(); ds.setUsername(shardDataSourceProperties.getUsername0()); ds.setUrl(shardDataSourceProperties.getUrl0()); ds.setPassword(shardDataSourceProperties.getPassword0()); return ds; } private DataSource ds1() throws SQLException { DruidDataSource ds = parentDs(); ds.setUsername(shardDataSourceProperties.getUsername1()); ds.setUrl(shardDataSourceProperties.getUrl1()); ds.setPassword(shardDataSourceProperties.getPassword1()); return ds; } private DataSourceRule dataSourceRule() throws SQLException { Map<String, DataSource> dataSourceMap = new HashMap<>(2); dataSourceMap.put("ds_0", ds0()); dataSourceMap.put("ds_1", ds1()); DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap); return dataSourceRule; } //對order對策略 private TableRule orderTableRule() throws SQLException { TableRule orderTableRule = TableRule.builder("t_order").actualTables(Arrays.asList("t_order_0", "t_order_1")) .dataSourceRule(dataSourceRule()).build(); return orderTableRule; } //分庫分表策略 private ShardingRule shardingRule() throws SQLException { ShardingRule shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule()) .tableRules(Arrays.asList(orderTableRule(), orderItemTableRule())) .databaseShardingStrategy( new DatabaseShardingStrategy("user_id", new ModuloDatabaseShardingAlgorithm())) .tableShardingStrategy(new TableShardingStrategy("order_id", new ModuloTableShardingAlgorithm())) .build(); return shardingRule; } @Bean public DataSource dataSource() throws SQLException { return ShardingDataSourceFactory.createDataSource(shardingRule()); } @Bean public PlatformTransactionManager transactionManager() throws SQLException { return new DataSourceTransactionManager(dataSource()); } }
我們需要從controller調用接口進行對數據的增加和查詢
下面所有的類都是用來模擬請求進行測試
@RestController @RequestMapping("/order") public class OrderController { @Autowired private OrderDao orderDao; @RequestMapping(path = "/createOrder/{userId}/{orderId}", method = {RequestMethod.GET}) public String createOrder(@PathVariable("userId") Integer userId, @PathVariable("orderId") Integer orderId) { Order order = new Order(); order.setOrderId(orderId); order.setUserId(userId); orderDao.createOrder(order); return "success"; } @RequestMapping(path = "/{userId}", method = {RequestMethod.GET}) public List<Order> getOrderListByUserId(@PathVariable("userId") Integer userId) { return orderDao.getOrderListByUserId(userId); } } --------------------------------------------------- public interface OrderDao { List<Order> getOrderListByUserId(Integer userId); void createOrder(Order order); } --------------------------------------------------- @Service public class OrderDaoImpl implements OrderDao { @Autowired JdbcTemplate jdbcTemplate; @Override public List<Order> getOrderListByUserId(Integer userId) { StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder .append("select order_id, user_id from t_order where user_id=? "); return jdbcTemplate.query(sqlBuilder.toString(), new Object[]{userId}, new int[]{Types.INTEGER}, new BeanPropertyRowMapper<Order>( Order.class)); } @Override public void createOrder(Order order) { StringBuffer sb = new StringBuffer(); sb.append("insert into t_order(user_id, order_id)"); sb.append("values("); sb.append(order.getUserId()).append(","); sb.append(order.getOrderId()); sb.append(")"); jdbcTemplate.update(sb.toString()); } } --------------------------------------------------- public class Order implements Serializable { private int userId; private int orderId; --------------------------------------------------- @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
測試
啟動項目,訪問:http://localhost:8050/order/createOrder/1/1
更換參數多次訪問,可以插入多條記錄,觀察你的數據庫入庫情況,已經按照我們制定的分庫分表策略進行劃分瞭。
需要註意的是
shareding是不支持jdbctemplate的批量修改操作的。
表名前不要加上庫名,原生的情況加庫名,不加庫名其實是一樣的,但使用shareding的表就會報錯。
如果想進行隻分表不分庫的話
- 註釋掉 ModuloDatabaseShardingAlgorithm 類
- 還有ShardDataSourceConfig.shardingRule() 中的分庫策略那行代碼
- 還有相關數據源配置改成 1 個
到此這篇關於Java中ShardingSphere 數據分片的實現的文章就介紹到這瞭,更多相關ShardingSphere 數據分片內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- ShardingSphere jdbc實現分庫分表核心概念詳解
- Java ShardingJDBC實戰演練
- Sharding-Jdbc 自定義復合分片的實現(分庫分表)
- SpringBoot整合sharding-jdbc實現自定義分庫分表的實踐
- 使用sharding-jdbc實現水平分表的示例代碼