java批量操作数据库优化
一、背景
最近收到数据库CPU告警,跟踪后发现是有小伙伴在循环操作数据库,每个数据都要查询+更新,看着血压有点高~
伪代码:
java复制代码Map<String, GetTaskListResponse> taskList = ; // 接口查询一个列表 taskList.forEach((taskId, task) -> { Task originalTask = taskMapper .selectOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, taskId)); ... // 存在判断+计算 taskMapper.updateById(originalTask); });
分析cpu飙升过程:如果循环操作10万次 【查询+更新】,会有大量请求数据库的操作。其次,请求总时间会很久,在调用日志里面看到一次请求大于60s(这个慢请求是定时器发出的,所以一开始没有重视),触发一次超时重试,会再次发起请求,最终会导致数据库频繁请求,cpu升高。
与数据库的交互
应用连接到数据库之后,与数据库之间的交互基本时间开销有:连接、网络、io,如果操作的数据量很大,还会有数据库的计算开销。上面案例中的数据表数据不多,而连接是有池化的,所以开销主要是网络和io,减少交互的次数,是最简单的优化方式。
考虑分治:改成分批次更新,比如操作10万个数据,一次查1000个数据出来,计算好,一次提交1000个更新sql,循环100次即可。如果数据是没有依赖关系的,代码里面可以用多线程,异步查询、计算及更新。当然,异步发送请求太多太快,小心数据库会撑不住,这个要观察。
sql 批量操作方法
常见的数据库,比如mysql,可以使用的批量技术有2种,ORM框架用的机制也是基于这2个:
多个sql一次提交;
多个sql合成单个sql。
(1)多个sql一次提交
用";"分割的多个sql:将多次提交变成一次提交。比如下面的sql,数据表为user:
sql复制代码CREATE TABLE `user` ( `id` int(11) NOT NULL, `name` varchar(255) DEFAULT "", `age` int(11) DEFAULT 0, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
批量sql:
sql复制代码-- 插入 INSERT INTO user(name, id) VALUES('hello',1); INSERT INTO user(name, id) VALUES('world',2); -- 更新 update user set name='hello1' where id=1; update user set name='world1' where id=2;
(2)合成sql
多个sql合成一个提交:
sql复制代码-- 插入: 这里用自增id做主键 INSERT INTO user(name, age) VALUES ('hello',10), ('world',20); -- 更新: 利用 case-when-then 合成 update user set name= case when id=1 then 'hello1' when id=2 then 'world1' end where id in (1,2); -- 这里也可以用or,但是可能会索引失效。
二、常见ORM的批量操作
1.JPA
批量更新、批量插入都是saveAll,demo:
java复制代码List<User> userList = // 获取用户列表 userRepository.saveAll(userList);
源码:
java复制代码// org.springframework.data.jpa.repository.support.SimpleJpaRepository @Transactional public <S extends T> List<S> saveAll(Iterable<S> entities) { Assert.notNull(entities, "Entities must not be null!"); List<S> result = new ArrayList(); Iterator var3 = entities.iterator(); while(var3.hasNext()) { S entity = var3.next(); result.add(this.save(entity)); } return result; }
这里可以看到这个saveAll是"假"的,实现方式是循环调用save(),大量数据提交的话,会有性能问题。
2.mybatis-plus 批量更新
(1)手动批量flush
java复制代码List<Task> taskList = // 获取任务列表 try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) { TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class); for (Task task : taskList) { taskMapper.updateById(task); } sqlSession.flushStatements();// 提交一批sql sqlSession.commit(); }
处理方式是多个sql一次提交。
(2)updateBatchById
来自mybatis-plus-extension-3.2.0.jar,下是调用demo:
java复制代码public interface UserService extends IService<User> { void updateBatchUserById(List<User> list); } @Service public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService { @Override public void updateBatchUserById(List<User> list) { this.updateBatchById(list); } }
ServiceImpl 里的 this.updateBatchById源码如下:
java复制代码@Transactional( rollbackFor = {Exception.class} ) public boolean updateBatchById(Collection<T> entityList, int batchSize) { ... String sqlStatement = this.sqlStatement(SqlMethod.UPDATE_BY_ID); // UPDATE_BY_ID("updateById", "根据ID 选择修改数据", "<script>\nUPDATE %s %s WHERE %s=#{%s} %s\n</script>"), SqlSession batchSqlSession = this.sqlSessionBatch(); try { int i = 0; // 分批 for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) { T anEntityList = var7.next(); ParamMap<T> param = new ParamMap(); param.put("et", anEntityList); // 这里有坑 anEntityList不能为null batchSqlSession.update(sqlStatement, param); if (i >= 1 && i % batchSize == 0) { batchSqlSession.flushStatements(); //提交sql } } batchSqlSession.flushStatements();//提交最后一批sql return true; } catch (Throwable var17) { // 异常转换 } finally { // 关闭会话 } }
思路其实和上面(1)的批量flush是一样的。也可以自定义sql:
xml复制代码<update id="batchUpdate" parameterType="java.util.List"> <foreach collection="list" item="item" separator=";"> update user set name = #{item.name} where id = #{item.id} </foreach> </update>
(3)更进一步:合成一个sql
提交的一批sql还能合并吗?使用case-when-then
xml复制代码<update id="updateBatch" parameterType="java.util.List" > update user <trim prefix="set" suffixOverrides=","> <trim prefix="name=case" suffix="end,"> <foreach collection="list" item="i" index="index"> <if test="i.name != null and i.name != ''"> when id=#{i.id} then #{i.name} </if> </foreach> </trim> </trim> where <foreach collection="list" separator="or" item="i" index="index" > id = #{i.id} </foreach> </update>
如果直接提交10万个更新的sql,也是可以的。有可能出的问题:
(1)jvm 字符串太大,OOM。
(2)数据库限制条数或者提交sql的数据大小。
参考:Mybatis中进行批量更新(updateBatch)
3.mybatis-plus 批量插入
(1)对象操作saveBatch
demo:
sql复制代码List<User> userList = new ArrayList<>(); userList.add(new User(1L, "Tom")); userList.add(new User(2L, "Jerry")); userList.add(new User(3L, "Mike")); int result = userMapper.saveBatch(userList);
源码:
java复制代码@Transactional( rollbackFor = {Exception.class} ) public boolean saveBatch(Collection<T> entityList, int batchSize) { String sqlStatement = this.sqlStatement(SqlMethod.INSERT_ONE);//INSERT_ONE("insert", "插入一条数据(选择字段插入)", "<script>\nINSERT INTO %s %s VALUES %s\n</script>"), SqlSession batchSqlSession = this.sqlSessionBatch(); Throwable var5 = null; try { int i = 0; // 分批 for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) { T anEntityList = var7.next(); batchSqlSession.insert(sqlStatement, anEntityList); if (i >= 1 && i % batchSize == 0) { batchSqlSession.flushStatements();// 提交sql } } batchSqlSession.flushStatements();// //提交最后一批sql return true; } catch (Throwable var16) { // 异常处理 } finally { // 关闭会话 } }
这里比较遗憾,mybatis-plus的插入不会合成一个sql,只是一次提交多个插入的sql。
#####(2)合成一个sql 脚本如下:
xml复制代码<insert id="batchInsert" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id" flushCache="true"> insert into user (name) values <foreach item="item" index="index" collection="list" separator=","> (#{item.name}) </foreach> </insert>
参考: 数据库批量插入这么讲究的么? MyBatis批量插入的五种方式,哪种最强
4.jdbcTemplate
(1)手动拼接
这种方式比较原始,需要把每个更新的insert/update sql语句写好,拼成一个大的sql提交。
(2)批量操作
demo如下:
java复制代码List<User> userList = // 获取用户列表 String sql = "update user set age=? where id=?"; jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { User user = userList.get(i); ps.setInt(1, user.gee()); ps.setLong(2, user.getId()); } @Override public int getBatchSize() { return userList.size(); } });
源码:
ini复制代码public int[] batchUpdate(String sql, BatchPreparedStatementSetter pss) throws DataAccessException { int[] result = (int[])this.execute(sql, (ps) -> { // execute执行sql try { ... if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) { int ixx = 0; while(true) { if (ixx < batchSize) { pss.setValues(ps, ixx); if (ipss == null || !ipss.isBatchExhausted(ixx)) { ps.addBatch(); // 添加 ++ixx; continue; } } // 获取预编译参数 int[] var11 = ps.executeBatch(); return var11; } } else { List<Integer> rowsAffected = new ArrayList(); for(int i = 0; i < batchSize; ++i) { pss.setValues(ps, i); if (ipss != null && ipss.isBatchExhausted(i)) { break; } rowsAffected.add(ps.executeUpdate()); // 添加 } int[] rowsAffectedArray = new int[rowsAffected.size()]; for(int ix = 0; ix < rowsAffectedArray.length; ++ix) { rowsAffectedArray[ix] = (Integer)rowsAffected.get(ix); } // 获取预编译参数 int[] var13 = rowsAffectedArray; return var13; } } finally { // 清理 } }); Assert.state(result != null, "No result array"); return result; }
看源码,原理一样的~最后都是拼接一批sql,JdbcTemplate只是提供了辅助拼接的工具。
三、总结
时刻记住,尽量不要循环操作数据库。大量操作数据库,要分批处理。
功能上线后要关注接口的性能,考虑把时间过长的请求纳入告警。
要加强代码review,代码静态分析工作。