浅析MySQL的批量提交的优化

MySQL's Optimization with BatchUpdate

Posted by LiuShuo on December 18, 2018

Batch提交

Batch提交是用一个Connection来对若干个SQL进行一次性、批量提交给MySQL处理。但有一个问题是,如果某个SQL 挂了而其他的并不会受影响,这样就无法保证原子性,只能再封装一层Transaction来实现。 同时,Batch处理并不是每个Server都支持的,需要看具体的产品和版本号。

代码

我们通过观察具体的客户端实现代码来了解批量提交是如何实现的 spring-jdbc-5.1.3.RELEASE中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
	public int[] batchUpdate(String sql, SqlParameterSource[] batchArgs) {
		if (batchArgs.length == 0) {
			return new int[0];
		}

		ParsedSql parsedSql = getParsedSql(sql);
		PreparedStatementCreatorFactory pscf = getPreparedStatementCreatorFactory(parsedSql, batchArgs[0]);
        // 调用另外一个batchUpdate方法
		return getJdbcOperations().batchUpdate(
				pscf.getSql(),
				new BatchPreparedStatementSetter() {
					@Override
					public void setValues(PreparedStatement ps, int i) throws SQLException {
						Object[] values = NamedParameterUtils.buildValueArray(parsedSql, batchArgs[i], null);
						pscf.newPreparedStatementSetter(values).setValues(ps);
					}
					@Override
					public int getBatchSize() {
						return batchArgs.length;
					}
				});
	}

第二个batchUpdate方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
	public int[] batchUpdate(String sql, final BatchPreparedStatementSetter pss) throws DataAccessException {
		if (logger.isDebugEnabled()) {
			logger.debug("Executing SQL batch update [" + sql + "]");
		}

		// 调用execute方法,传入sql和回调接口
		int[] result = execute(sql, (PreparedStatementCallback<int[]>) ps -> {
			try {
				int batchSize = pss.getBatchSize();
				InterruptibleBatchPreparedStatementSetter ipss =
						(pss instanceof InterruptibleBatchPreparedStatementSetter ?
						(InterruptibleBatchPreparedStatementSetter) pss : null);
				if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
				    // 如果支持批处理操作则依次将每个SQL的参数进行填充
					for (int i = 0; i < batchSize; i++) {
						pss.setValues(ps, i);
						if (ipss != null && ipss.isBatchExhausted(i)) {
							break;
						}
						ps.addBatch();
					}
					// 执行批处理函数(底层交给jdbc driver的实现类去执行,如mysql-connector-java)
					return ps.executeBatch();
				}
				else {
					List<Integer> rowsAffected = new ArrayList<>();
					for (int i = 0; i < batchSize; i++) {
						pss.setValues(ps, i);
						if (ipss != null && ipss.isBatchExhausted(i)) {
							break;
						}
						rowsAffected.add(ps.executeUpdate());
					}
					int[] rowsAffectedArray = new int[rowsAffected.size()];
					for (int i = 0; i < rowsAffectedArray.length; i++) {
						rowsAffectedArray[i] = rowsAffected.get(i);
					}
					return rowsAffectedArray;
				}
			}
			finally {
				if (pss instanceof ParameterDisposer) {
					((ParameterDisposer) pss).cleanupParameters();
				}
			}
		});

		Assert.state(result != null, "No result array");
		return result;
	}

调用的execute方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
			throws DataAccessException {

		Assert.notNull(psc, "PreparedStatementCreator must not be null");
		Assert.notNull(action, "Callback object must not be null");
		if (logger.isDebugEnabled()) {
			String sql = getSql(psc);
			logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
		}

		Connection con = DataSourceUtils.getConnection(obtainDataSource());
		PreparedStatement ps = null;
		try {
			ps = psc.createPreparedStatement(con);
			applyStatementSettings(ps);
			T result = action.doInPreparedStatement(ps);
			handleWarnings(ps);
			return result;
		}
		catch (SQLException ex) {
			// Release Connection early, to avoid potential connection pool deadlock
			// in the case when the exception translator hasn't been initialized yet.
			if (psc instanceof ParameterDisposer) {
				((ParameterDisposer) psc).cleanupParameters();
			}
			String sql = getSql(psc);
			JdbcUtils.closeStatement(ps);
			ps = null;
			DataSourceUtils.releaseConnection(con, getDataSource());
			con = null;
			throw translateException("PreparedStatementCallback", sql, ex);
		}
		finally {
			if (psc instanceof ParameterDisposer) {
				((ParameterDisposer) psc).cleanupParameters();
			}
			JdbcUtils.closeStatement(ps);
			DataSourceUtils.releaseConnection(con, getDataSource());
		}
	}

上面是spring-jdbc的batchUpdate逻辑的实现,但是具体的业务执行是由Server来执行的,所以不同的Server的实现机制不同可能对性能有较大影响。 以MySQL为例,在mysql-connector-java-5.1.43的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
    public boolean canRewriteAsMultiValueInsertAtSqlLevel() throws SQLException {
        return this.parseInfo.canRewriteAsMultiValueInsert;
    }
    
    @Override
        protected long[] executeBatchInternal() throws SQLException {
            synchronized (checkClosed().getConnectionMutex()) {
    
                if (this.connection.isReadOnly()) {
                    throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
                            SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
                }
    
                if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
                    return new long[0];
                }
    
                // we timeout the entire batch, not individual statements
                int batchTimeout = this.timeoutInMillis;
                this.timeoutInMillis = 0;
    
                resetCancelledState();
    
                try {
                    statementBegins();
    
                    clearWarnings();
                    // 注意这里的条件,batchHasPlainStatements默认初始化就是false,而rewriteBatchedStatements
                    // 需要在jdbc url里制定,否则不会走合并SQL的逻辑,如jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&rewriteBatchedStatements=true
                    if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {
    
                        if (canRewriteAsMultiValueInsertAtSqlLevel()) {
                            return executeBatchedInserts(batchTimeout);
                        }
    
                        if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null
                                && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
                            return executePreparedBatchAsMultiStatement(batchTimeout);
                        }
                    }
    
                    return executeBatchSerially(batchTimeout);
                } finally {
                    this.statementExecuting.set(false);
    
                    clearBatch();
                }
            }
        }
    
    /**
     * Rewrites the already prepared statement into a multi-value insert
     * statement of 'statementsPerBatch' values and executes the entire batch
     * using this new statement.
     * 
     * @return update counts in the same fashion as executeBatch()
     * 
     * @throws SQLException
     */
    protected long[] executeBatchedInserts(int batchTimeout) throws SQLException {
        ......
    }

如何整合

根据DML的类型是insert、update还是delete,会走不同方法:

  • 如果是insert语句,满成条件情况下,会整合成:”insert into xxx_table values (xx),(yy),(zz)…”这样的语句。
  • 如果是update\delete语句,满成条件情况下,会整合成:”update t set … where id = 1; update t set … where id = 2; update t set … where id = 3 …“这样的语句。

总结

如果想要达到MySQL真正batchUpdate效果,需要有以下几个条件:

  • 需要在jdbcUrl后添加参数rewriteBatchedStatements=true
  • this.batchHasPlainStatements为false
  • 如果是update\delete 语句,还需要mysql版本>=4.1.0,并且batch的数量>3

References

  • https://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html
  • https://blog.csdn.net/qq271859852/article/details/79562262

本文首次发布于 LiuShuo’s Blog, 转载请保留原文链接.