背景
Springboot项目,有个需求,需要提供接口,接口调用方每一次调用时,会保存或者更新大量数据,接口需要满足以下要求:
- 数据保存要保证数据原子性:要么全部保存成功,要么全部不保存。
- 保证接口性能。
实践发现,即使使用批量保存,接口耗时也久,所以需要开启多线程来保存。现在的问题是,在开启多线程保存的情况下,如何保证数据的原子性。
使用声明式事务出现的问题
具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Transactional(rollbackFor = Exception.class) public boolean saveUser() { Long userId = 1L; CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> { User user = new User(); user.setId(userId); user.setName("wls"); user.setEmail("1396523950@qq.com"); save(user); }, threadPoolExecutor); CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> { UserLog userLog = new UserLog(); userLog.setUserId(userId); userLogService.save(userLog); int i = 1 / 0; }, threadPoolExecutor); CompletableFuture.allOf(f1, f2).join(); return true; }
|
此时可以发现 user保存成功,userLog未保存成功,说明整个方法的事务并未保证原子性。
解决思路
- 开启多线程,每个线程都是使用独立的DB连接。否则由于数据库是串行阻塞操作,最终还是会变成排队操作数据库。
- 依赖spring事务异常回滚机制。
- 有个统一的标识来标识“是否有线程操作失败”。
- 线程如果出现异常:先捕获异常,将标识设置为失败,然后继续抛出异常。
- 线程如果没有异常,在执行的最后,判断标识是失败,也就是“有其他线程有执行失败”,就自定义抛出异常来回滚。
- 通过锁来保证:所有的线程都操作完之后,一起判断标识是否成功;确保不会出现“还有线程的业务未执行完成,其他线程就已经结束工作”。
最终代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| @Slf4j @Service public class UserService extends ServiceImpl<UserMapper, User> {
@Resource private ThreadPoolExecutor threadPoolExecutor;
@Resource private DataSourceTransactionManager dataSourceTransactionManager;
@Resource private UserLogService userLogService;
public boolean saveUser() { CyclicBarrier cb = new CyclicBarrier(2); AtomicBoolean flag = new AtomicBoolean(false); Long userId = 1L; CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> { DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition(); TransactionStatus transaction = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition); try { User user = new User(); user.setId(userId); user.setName("wls"); user.setEmail("1396523950@qq.com"); save(user); cb.await(); log.info("f1: "+flag.get()); if (flag.get()) { dataSourceTransactionManager.rollback(transaction); return; } dataSourceTransactionManager.commit(transaction); } catch (Exception e) { flag.set(true); dataSourceTransactionManager.rollback(transaction); try { cb.await(); } catch (InterruptedException | BrokenBarrierException ex) { throw new RuntimeException(ex); } throw new RuntimeException(e); }
}, threadPoolExecutor); CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> { DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition(); TransactionStatus transaction = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition); try { UserLog userLog = new UserLog(); userLog.setUserId(userId); userLogService.save(userLog); int i = 1 / 0; cb.await(); log.info("f2: "+flag.get()); if (flag.get()) { dataSourceTransactionManager.rollback(transaction); return; } dataSourceTransactionManager.commit(transaction); } catch (Exception e) { flag.set(true); dataSourceTransactionManager.rollback(transaction); try { cb.await(); } catch (InterruptedException | BrokenBarrierException ex) { throw new RuntimeException(ex); } throw new RuntimeException(e); } }, threadPoolExecutor); CompletableFuture.allOf(f1, f2).join(); return true; } }
|
注意:
使用CyclicBarrier
和手动事务时需要控制任务的超时时间。