引入maven依赖:
<dependency> <groupId>com.coreos</groupId> <artifactId>jetcd-core</artifactId> <version>0.0.2</version> </dependency>
分布式锁实现:
import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.coreos.jetcd.Client; import com.coreos.jetcd.Lease; import com.coreos.jetcd.Lock; import com.coreos.jetcd.data.ByteSequence; /** * Etcd Java客户端 Jetcd提供的Lock客户端实现分布式锁 */ public class EtcdDistributedLock { private static EtcdDistributedLock lock = null; private static Object mutex = new Object(); private Client client; // etcd客户端 private Lock lockClient; // etcd分布式锁客户端 private Lease leaseClient; // etcd租约客户端 private EtcdDistributedLock() { super(); // 创建Etcd客户端,本例中Etcd集群只有一个节点 this.client = Client.builder().endpoints("http://localhost:2379").build(); this.lockClient = client.getLockClient(); this.leaseClient = client.getLeaseClient(); } /** * 单例 */ public static EtcdDistributedLock getInstance() { synchronized (mutex) { // 互斥锁 if (null == lock) { lock = new EtcdDistributedLock(); } } return lock; } /** * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。 * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名 * @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁 * @return LockResult */ public LockResult lock(String lockName, long TTL) { LockResult lockResult = new LockResult(); /* 1.准备阶段 */ // 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效; // 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); // 初始化返回值lockResult lockResult.setIsLockSuccess(false); lockResult.setService(service); // 记录租约ID,初始值设为 0L Long leaseId = 0L; /* 2.创建租约 */ try { // 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定 leaseId = leaseClient.grant(TTL).get().getID(); lockResult.setLeaseId(leaseId); // 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定 long period = TTL - TTL / 5; service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException e) { System.err.println("[error]: Create lease failed:" + e); return lockResult; } System.out.println(System.currentTimeMillis() + "|[ lock]: "+Thread.currentThread().getName()+" start to lock."); /* 3.加锁操作 */ // 执行加锁操作,并为锁对应的key绑定租约 try { lockClient.lock(ByteSequence.fromString(lockName), leaseId).get(); } catch (InterruptedException | ExecutionException e1) { System.err.println("[error]: lock failed:" + e1); return lockResult; } System.out.println(System.currentTimeMillis() + "|[ lock]: "+Thread.currentThread().getName()+" lock successfully."); lockResult.setIsLockSuccess(true); return lockResult; } /** * 解锁操作,释放锁、关闭定时任务、解除租约 * * @param lockName:锁名 * @param lockResult:加锁操作返回的结果 */ public void unLock(String lockName, LockResult lockResult) { System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" start to unlock."); try { // 释放锁 lockClient.unlock(ByteSequence.fromString(lockName)).get(); // 关闭定时任务 lockResult.getService().shutdown(); // 删除租约 if (lockResult.getLeaseId() != 0L) { leaseClient.revoke(lockResult.getLeaseId()); } } catch (InterruptedException | ExecutionException e) { System.err.println("[error]: unlock failed: " + e); } System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" unlock successfully."); } /** * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效 */ static class KeepAliveTask implements Runnable { private Lease leaseClient; private long leaseId; KeepAliveTask(Lease leaseClient, long leaseId) { this.leaseClient = leaseClient; this.leaseId = leaseId; } @Override public void run() { // 续约一次 leaseClient.keepAliveOnce(leaseId); } } /** * 该class用于描述加锁的结果,同时携带解锁操作所需参数 */ static class LockResult { private boolean isLockSuccess; private long leaseId; private ScheduledExecutorService service; LockResult() { super(); } public void setIsLockSuccess(boolean isLockSuccess) { this.isLockSuccess = isLockSuccess; } public void setLeaseId(long leaseId) { this.leaseId = leaseId; } public void setService(ScheduledExecutorService service) { this.service = service; } public boolean getIsLockSuccess() { return this.isLockSuccess; } public long getLeaseId() { return this.leaseId; } public ScheduledExecutorService getService() { return this.service; } } /** * 测试分布式锁 * @param args */ public static void main(String[] args) { // 模拟分布式场景下,多个进程 “抢锁” for (int i = 0; i < 10; i++) { new MyThread().start(); } } static class MyThread extends Thread { @Override public void run() { String lockName = "/lock/mylock"; // 分布式锁名称 // 1. 加锁 LockResult lockResult = getInstance().lock(lockName, 30); if (lockResult.getIsLockSuccess()) { // 获得了锁 try { Thread.sleep(10000); // sleep 10秒,模拟执行相关业务 } catch (InterruptedException e) { System.out.println("[error]:" + e); } } // 2. 解锁 getInstance().unLock(lockName, lockResult); } } }
相关推荐
基于go+etcd实现分布式锁 clientv3文档example: 文档example client.go package etcdq import ( context v3 go.etcd.io/etcd/clientv3 time ) type Client interface { ReadItem(string) KV ReadItems(string...
节点等锁 由支持的Node.js分布式锁。安装npm install node-etcd-lock用法'use strict'const assert = require ( 'assert' )const Locker = require ( 'node-etcd-lock' )const locker = new Locker ( { address : '...
在分布式环境中实现Leader选举、...具体实现时,可以参考etcd的文档和API,它们提供了分布式锁和Leader选举的具体实现方法。例如,etcd的clientv3库提供了concurrency模块,其中包含了实现分布式锁和选举所需的原语。
通过分布式锁,leader选举和写屏障(write barriers)来实现可靠的分布式协作。etcd集群是为高可用,持久性数据存储和检索而准备。 "etcd"这个名字源于两个想法,即 unix “/etc” 文件夹和分布式系统"d"istibuted。 ...
3.不同分布式锁实现方案的优缺点你清楚了吗? 4.如果让你手撸实现Redis分布式锁,你可以做到吗? 2、分布式事务 1.你知道为什么CAP不能同时满足吗? 2.你了解不同分布式事务解决方案对应什么样的应用场景么 ? 3.你...
在分布式系统中,如何管理节点间的状态一直是一个难题,etcd像是专门为集群环境的服务发现和注册而设计,它提供了数据TTL失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群...
English | 水滴 Waterdrop是基于gin和grpc的高性能微服务框架。 水滴来自(三体问题)。 产品特点 ...RPC服务器:基于官方的并使用ETCD进行服务...分布式锁:分布式锁基于Redis和ETCD实现。 前者适用于最终一致的业务锁
接下来是中心键值存储,etcd,它提供共享配置、服务发现和基于 Raft 共识算法构建的集群范围的锁服务,以实现高可用性。 最后,fleet 是一个分布式的 init 系统,它将所有的东西联系在一起并提供低级别的工作负载...
即使是顶级Go开发的项目,比如Docker、Kubernetes、gRPC、etcd, 都是有经验丰富的Go开发专家锁开发,也踩过不少的并发的坑,而且依然源源不断的继续踩着,即便是标准库也是这样。 分析和总结并发编程中的陷阱,避免...
这里的所有接口在主要的Spring库中都有对应的接口,实现也大多是并行的(etcd是一个例外,因为缺乏对第三方库中v3的支持)。 1.0.1版本不赞成使用Spring Integration。 注意:对于Hazelcast,支持位于。 Spring ...