在一些并发请求的时候,需要保证数据的准确性,在同一时刻只能允许一个请求对同一条数据进行修改操作。在之前的单机应用当中,很容易想到利用jdk的锁来实现,例如 synchronized或者lock 。但是在如今业务复杂的分布式系统中jdk的锁并不适用,所以必须要要用分布式锁。
常见的几种分布式锁的实现方案,Redis、Mysql 、zookeeper;
本文主要讲的是如何使用zk实现分布式锁:
ZooKeeper是一个开源的分布式协调服务,他为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名空间服务,配置服务和分布式锁等分布式基础服务。
ZooKeeper的数据模型是内存中的一个ZNode数,由斜杠(/)进行分割的路径,就是一个ZNode,每个ZNode上除了保存自己的数据内容,还保存一系列属性信息。
ZooKeeper中的数据节点分为两种:持久节点和临时节点。所谓的持久节点是指一旦这个ZNode创建成功,除非主动进行ZNode的移除操作,节点会一直保存在ZooKeeper上;而临时节点的生命周期是跟客户端的(Session)会话相关联的,一旦客户端会话失效,这个会话上的所有临时节点都会被自动移除。
具体思路:
1、首先zookeeper中我们可以创建一个/distributed_lock持久化节点
2、然后再在/distributed_lock节点下创建自己的临时顺序节点,比如:/distributed_lock/task_00000000008
3、获取所有的/distributed_lock下的所有子节点,并排序
4、判读自己创建的节点是否最小值(第一位)
5、如果是,则获取得到锁,执行自己的业务逻辑,最后删除这个临时节点。
6、如果不是最小值,则需要监听自己创建节点前一位节点的数据变化,并阻塞。
7、当前一位节点被删除时,我们需要通过递归来判断自己创建的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)
一、导入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.0.RELEASE</version> <relativePath/> </parent> <groupId>com.example</groupId> <artifactId>zk_lock</artifactId> <version>0.0.1-SNAPSHOT</version> <name>zk_lock</name> <description>zk_lock</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.9</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency><dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.22</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency> </dependencies>
|
主要包括了zk客户端的依赖,mybatis-plus的依赖。
二、数据的配置
1 2 3 4 5
| spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/db3?characterEncoding=utf8&verifyServerCertificate=false&useSSL=true&serverTimezone=Asia/Shanghai spring.datasource.username=root spring.datasource.password=root
|
三、具体代码实现
分布式锁工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
| package com.example.zk_lock.util;
import lombok.extern.slf4j.Slf4j; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.zookeeper.CreateMode;
import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch;
@Slf4j public class DistributedLockUtil {
private static final String CONNECTION_STRING = "localhost:2181";
private static final String LOCK_NODE = "/distributed_lock"; private static final String CHILDREN_NODE = "/lock_";
public static DistributedLockUtil distributedLockUtil; public static ZkClient zkClient;
public DistributedLockUtil() { distributedLockUtil = this; zkClient = new ZkClient(new ZkConnection(CONNECTION_STRING));
if (!zkClient.exists(LOCK_NODE)) { zkClient.create(LOCK_NODE, "分布式锁节点", CreateMode.PERSISTENT); } }
public static String getLock() { try { String lockName = zkClient.createEphemeralSequential(LOCK_NODE + CHILDREN_NODE, ""); acquireLock(lockName); return lockName; } catch (Exception e) { e.printStackTrace(); }
return null; }
public static void acquireLock(String lockName) throws InterruptedException { List<String> childrenList = zkClient.getChildren(LOCK_NODE); childrenList.sort(new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]); } }); int lockPosition = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]); if (lockPosition < 0) { throw new ZkNodeExistsException("不存在的节点:" + lockName); } if (lockPosition == 0) { log.info("获取到锁:" + lockName); return; } log.info("...... 未获取到锁,阻塞等待 。。。。。。"); final CountDownLatch latch = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() {
@Override public void handleDataDeleted(String dataPath) throws Exception { log.info("。。。。。。前一个节点被删除 。。。。。。"); acquireLock(lockName); latch.countDown(); } @Override public void handleDataChange(String dataPath, Object data) { } }; try { zkClient.subscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener); latch.await(); } finally { log.info("。。。。。取消订阅。。。。。。"); zkClient.unsubscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener); }
}
public static void releaseLock(String lockName) { zkClient.delete(lockName); }
public static void closeZkClient() { zkClient.close(); } }
|
具体业务实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@Override public boolean killGoods(Long id, Integer num) {
String lock = DistributedLockUtil.getLock(); if (Objects.nonNull(lock)) { Goods goods = this.getById(id); if (goods.getQuantity() <= 0) { DistributedLockUtil.releaseLock(lock); return false; } log.info("库存数量======" + goods.getQuantity()); goods.setQuantity(goods.getQuantity() - 1); this.updateById(goods); DistributedLockUtil.releaseLock(lock); return true; } return false; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| ** * @author wenlinshan * @version 1.0 * @date 2021/6/16 11:06 * @desc */ @RequestMapping @RestController public class GoodsController { @Resource private GoodsService goodsService;
@GetMapping("test") public String createOrderTest() { if (!goodsService.killGoods(1405065181720055809L, 1)) { return "库存不足"; } return "创建订单成功"; }
@GetMapping("close") public String closeZk(){ DistributedLockUtil.closeZkClient(); return "关闭成功"; }
}
|
至此一个简单的分布式锁的demo已经实现。
除此之外可以使用现成的框架curator来使用分布式锁。