在一些并发请求的时候,需要保证数据的准确性,在同一时刻只能允许一个请求对同一条数据进行修改操作。在之前的单机应用当中,很容易想到利用jdk的锁来实现,例如 synchronized或者lock 。但是在如今业务复杂的分布式系统中jdk的锁并不适用,所以必须要要用分布式锁。

常见的几种分布式锁的实现方案,Redis、Mysql 、zookeeper;

本文主要讲的是如何使用zk实现分布式锁:

     ZooKeeper是一个开源的分布式协调服务,他为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名空间服务,配置服务和分布式锁等分布式基础服务。 

    ZooKeeper的数据模型是内存中的一个ZNode数,由斜杠(/)进行分割的路径,就是一个ZNode,每个ZNode上除了保存自己的数据内容,还保存一系列属性信息。

    ZooKeeper中的数据节点分为两种:持久节点和临时节点。所谓的持久节点是指一旦这个ZNode创建成功,除非主动进行ZNode的移除操作,节点会一直保存在ZooKeeper上;而临时节点的生命周期是跟客户端的(Session)会话相关联的,一旦客户端会话失效,这个会话上的所有临时节点都会被自动移除。

具体思路:

1、首先zookeeper中我们可以创建一个/distributed_lock持久化节点
2、然后再在/distributed_lock节点下创建自己的临时顺序节点,比如:/distributed_lock/task_00000000008
3、获取所有的/distributed_lock下的所有子节点,并排序
4、判读自己创建的节点是否最小值(第一位)
5、如果是,则获取得到锁,执行自己的业务逻辑,最后删除这个临时节点。
6、如果不是最小值,则需要监听自己创建节点前一位节点的数据变化,并阻塞。
7、当前一位节点被删除时,我们需要通过递归来判断自己创建的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)

一、导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>zk_lock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zk_lock</name>
<description>zk_lock</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency><dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>

主要包括了zk客户端的依赖,mybatis-plus的依赖。

二、数据的配置

1
2
3
4
5
#数据库
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db3?characterEncoding=utf8&verifyServerCertificate=false&useSSL=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root

三、具体代码实现

分布式锁工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.example.zk_lock.util;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* @author wls
*/
@Slf4j
public class DistributedLockUtil {

/**
* 常量
*/
private static final String CONNECTION_STRING = "localhost:2181";
/**
* 父节点
*/
private static final String LOCK_NODE = "/distributed_lock";
private static final String CHILDREN_NODE = "/lock_";

public static DistributedLockUtil distributedLockUtil;
public static ZkClient zkClient;

/**
* 创建zkClient
*/
public DistributedLockUtil() {
distributedLockUtil = this;
// 连接到Zookeeper
zkClient = new ZkClient(new ZkConnection(CONNECTION_STRING));

//创建节点
if (!zkClient.exists(LOCK_NODE)) {
zkClient.create(LOCK_NODE, "分布式锁节点", CreateMode.PERSISTENT);
}
}

/**
* 获取锁
*
* @return 锁名字
*/
public static String getLock() {
try {
// 1.在Zookeeper指定节点下创建临时顺序节点
String lockName = zkClient.createEphemeralSequential(LOCK_NODE + CHILDREN_NODE, "");
// 尝试获取锁
acquireLock(lockName);
return lockName;
} catch (Exception e) {
e.printStackTrace();
}

return null;
}

/**
* 尝试获取锁
*
* @throws InterruptedException 异常
*/
public static void acquireLock(String lockName) throws InterruptedException {
// 2.获取lock节点下的所有子节点
List<String> childrenList = zkClient.getChildren(LOCK_NODE);
// 3.对子节点进行排序,获取最小值
childrenList.sort(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
}
});
// 4.判断当前创建的节点是否在第一位
int lockPosition = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);
if (lockPosition < 0) {
// 不存在该节点
throw new ZkNodeExistsException("不存在的节点:" + lockName);
}
if (lockPosition == 0) {
// 获取到锁
log.info("获取到锁:" + lockName);
return;
}
// 未获取到锁,阻塞
log.info("...... 未获取到锁,阻塞等待 。。。。。。");
// 5.如果未获取得到锁,监听当前创建的节点前一位的节点
final CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
/**
* 当被删除时的监听事件
* @param dataPath 节点
* @throws Exception 异常
*/
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 6.前一个节点被删除,当不保证轮到自己
log.info("。。。。。。前一个节点被删除 。。。。。。");
acquireLock(lockName);
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) {
//节点被改变
}
};
try {
//监听前一个节点
zkClient.subscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener);
//阻塞
latch.await();
} finally {
log.info("。。。。。取消订阅。。。。。。");
//取消监听
zkClient.unsubscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener);
}

}

/**
* 释放锁(删除节点)
*
* @param lockName 锁名字
*/
public static void releaseLock(String lockName) {
zkClient.delete(lockName);
}

public static void closeZkClient() {
zkClient.close();
}
}



具体业务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 库存递减
*
* @param id id
* @param num 数量
* @return
*/
@Override
public boolean killGoods(Long id, Integer num) {

String lock = DistributedLockUtil.getLock();
if (Objects.nonNull(lock)) {
Goods goods = this.getById(id);
if (goods.getQuantity() <= 0) {
//库存数量不足,释放锁
DistributedLockUtil.releaseLock(lock);
return false;
}
log.info("库存数量======" + goods.getQuantity());
//将库存减操作
goods.setQuantity(goods.getQuantity() - 1);
this.updateById(goods);
DistributedLockUtil.releaseLock(lock);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
**
* @author wenlinshan
* @version 1.0
* @date 2021/6/16 11:06
* @desc
*/
@RequestMapping
@RestController
public class GoodsController {
@Resource
private GoodsService goodsService;

@GetMapping("test")
public String createOrderTest() {
if (!goodsService.killGoods(1405065181720055809L, 1)) {
return "库存不足";
}
return "创建订单成功";
}

@GetMapping("close")
public String closeZk(){
DistributedLockUtil.closeZkClient();
return "关闭成功";
}

}

至此一个简单的分布式锁的demo已经实现。
除此之外可以使用现成的框架curator来使用分布式锁。