Skip to content

Commit 789e94d

Browse files
committed
Apply optimistic locking for MongoDB repository
Signed-off-by: Yanming Zhou <[email protected]>
1 parent b0eccd2 commit 789e94d

File tree

7 files changed

+256
-170
lines changed

7 files changed

+256
-170
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2023 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.springframework.beans.factory.InitializingBean;
2222
import org.springframework.jdbc.core.JdbcOperations;
23+
import org.springframework.jdbc.core.simple.JdbcClient;
2324
import org.springframework.util.Assert;
2425
import org.springframework.util.StringUtils;
2526

@@ -29,6 +30,7 @@
2930
*
3031
* @author Robert Kasanicky
3132
* @author Mahmoud Ben Hassine
33+
* @author Yanming Zhou
3234
*/
3335
public abstract class AbstractJdbcBatchMetadataDao implements InitializingBean {
3436

@@ -47,6 +49,8 @@ public abstract class AbstractJdbcBatchMetadataDao implements InitializingBean {
4749

4850
private JdbcOperations jdbcTemplate;
4951

52+
private JdbcClient jdbcClient;
53+
5054
protected String getQuery(String base) {
5155
return StringUtils.replace(base, "%PREFIX%", tablePrefix);
5256
}
@@ -66,12 +70,17 @@ public void setTablePrefix(String tablePrefix) {
6670

6771
public void setJdbcTemplate(JdbcOperations jdbcTemplate) {
6872
this.jdbcTemplate = jdbcTemplate;
73+
this.jdbcClient = JdbcClient.create(jdbcTemplate);
6974
}
7075

7176
protected JdbcOperations getJdbcTemplate() {
7277
return jdbcTemplate;
7378
}
7479

80+
protected JdbcClient getJdbcClient() {
81+
return jdbcClient;
82+
}
83+
7584
public int getClobTypeToUse() {
7685
return clobTypeToUse;
7786
}
@@ -83,6 +92,7 @@ public void setClobTypeToUse(int clobTypeToUse) {
8392
@Override
8493
public void afterPropertiesSet() throws Exception {
8594
Assert.state(jdbcTemplate != null, "JdbcOperations is required");
95+
Assert.state(jdbcClient != null, "JdbcClient is required");
8696
}
8797

8898
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.sql.PreparedStatement;
2525
import java.sql.ResultSet;
2626
import java.sql.SQLException;
27+
import java.sql.Types;
2728
import java.util.Collection;
2829
import java.util.HashMap;
2930
import java.util.Iterator;
@@ -43,6 +44,7 @@
4344
import org.springframework.core.serializer.Serializer;
4445
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
4546
import org.springframework.jdbc.core.RowMapper;
47+
import org.springframework.jdbc.core.simple.JdbcClient;
4648
import org.springframework.lang.NonNull;
4749
import org.springframework.util.Assert;
4850

@@ -65,7 +67,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
6567
private static final String FIND_JOB_EXECUTION_CONTEXT = """
6668
SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT
6769
FROM %PREFIX%JOB_EXECUTION_CONTEXT
68-
WHERE JOB_EXECUTION_ID = ?
70+
WHERE JOB_EXECUTION_ID = :executionId
6971
""";
7072

7173
private static final String INSERT_JOB_EXECUTION_CONTEXT = """
@@ -82,7 +84,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
8284
private static final String FIND_STEP_EXECUTION_CONTEXT = """
8385
SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT
8486
FROM %PREFIX%STEP_EXECUTION_CONTEXT
85-
WHERE STEP_EXECUTION_ID = ?
87+
WHERE STEP_EXECUTION_ID = :executionId
8688
""";
8789

8890
private static final String INSERT_STEP_EXECUTION_CONTEXT = """
@@ -98,12 +100,12 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
98100

99101
private static final String DELETE_STEP_EXECUTION_CONTEXT = """
100102
DELETE FROM %PREFIX%STEP_EXECUTION_CONTEXT
101-
WHERE STEP_EXECUTION_ID = ?
103+
WHERE STEP_EXECUTION_ID = :executionId
102104
""";
103105

104106
private static final String DELETE_JOB_EXECUTION_CONTEXT = """
105107
DELETE FROM %PREFIX%JOB_EXECUTION_CONTEXT
106-
WHERE JOB_EXECUTION_ID = ?
108+
WHERE JOB_EXECUTION_ID = :executionId
107109
""";
108110

109111
private Charset charset = StandardCharsets.UTF_8;
@@ -154,8 +156,10 @@ public ExecutionContext getExecutionContext(JobExecution jobExecution) {
154156
Long executionId = jobExecution.getId();
155157
Assert.notNull(executionId, "ExecutionId must not be null.");
156158

157-
try (Stream<ExecutionContext> stream = getJdbcTemplate().queryForStream(getQuery(FIND_JOB_EXECUTION_CONTEXT),
158-
new ExecutionContextRowMapper(), executionId)) {
159+
try (Stream<ExecutionContext> stream = getJdbcClient().sql(getQuery(FIND_JOB_EXECUTION_CONTEXT))
160+
.param("executionId", executionId)
161+
.query(new ExecutionContextRowMapper())
162+
.stream()) {
159163
return stream.findFirst().orElseGet(ExecutionContext::new);
160164
}
161165
}
@@ -165,8 +169,10 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) {
165169
Long executionId = stepExecution.getId();
166170
Assert.notNull(executionId, "ExecutionId must not be null.");
167171

168-
try (Stream<ExecutionContext> stream = getJdbcTemplate().queryForStream(getQuery(FIND_STEP_EXECUTION_CONTEXT),
169-
new ExecutionContextRowMapper(), executionId)) {
172+
try (Stream<ExecutionContext> stream = getJdbcClient().sql(getQuery(FIND_STEP_EXECUTION_CONTEXT))
173+
.param("executionId", executionId)
174+
.query(new ExecutionContextRowMapper())
175+
.stream()) {
170176
return stream.findFirst().orElseGet(ExecutionContext::new);
171177
}
172178
}
@@ -248,7 +254,7 @@ public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
248254
*/
249255
@Override
250256
public void deleteExecutionContext(JobExecution jobExecution) {
251-
getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION_CONTEXT), jobExecution.getId());
257+
getJdbcClient().sql(getQuery(DELETE_JOB_EXECUTION_CONTEXT)).param("executionId", jobExecution.getId()).update();
252258
}
253259

254260
/**
@@ -257,7 +263,9 @@ public void deleteExecutionContext(JobExecution jobExecution) {
257263
*/
258264
@Override
259265
public void deleteExecutionContext(StepExecution stepExecution) {
260-
getJdbcTemplate().update(getQuery(DELETE_STEP_EXECUTION_CONTEXT), stepExecution.getId());
266+
getJdbcClient().sql(getQuery(DELETE_STEP_EXECUTION_CONTEXT))
267+
.param("executionId", stepExecution.getId())
268+
.update();
261269
}
262270

263271
@Override
@@ -286,16 +294,13 @@ private void persistSerializedContext(final Long executionId, String serializedC
286294
longContext = null;
287295
}
288296

289-
getJdbcTemplate().update(getQuery(sql), ps -> {
290-
ps.setString(1, shortContext);
291-
if (longContext != null) {
292-
ps.setString(2, longContext);
293-
}
294-
else {
295-
ps.setNull(2, getClobTypeToUse());
296-
}
297-
ps.setLong(3, executionId);
298-
});
297+
getJdbcClient().sql(getQuery(sql))
298+
// @formatter:off
299+
.param(1, shortContext, Types.VARCHAR)
300+
.param(2, longContext, getClobTypeToUse())
301+
.param(3, executionId, Types.BIGINT)
302+
// @formatter:on
303+
.update();
299304
}
300305

301306
/**

0 commit comments

Comments
 (0)