`

etcd分布式锁实现

 
阅读更多

引入maven依赖:

  <dependency>
	    <groupId>com.coreos</groupId>
	    <artifactId>jetcd-core</artifactId>
	    <version>0.0.2</version>
	</dependency>

 

分布式锁实现:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.Lease;
import com.coreos.jetcd.Lock;
import com.coreos.jetcd.data.ByteSequence;

/**
 * Etcd Java客户端 Jetcd提供的Lock客户端实现分布式锁
 */
public class EtcdDistributedLock {
	
	private static EtcdDistributedLock lock = null;
	private static Object mutex = new Object();
	private Client client; // etcd客户端
	private Lock lockClient; // etcd分布式锁客户端
	private Lease leaseClient; // etcd租约客户端

	private EtcdDistributedLock() {
		super();
		// 创建Etcd客户端,本例中Etcd集群只有一个节点
		this.client = Client.builder().endpoints("http://localhost:2379").build();
		this.lockClient = client.getLockClient();
		this.leaseClient = client.getLeaseClient();
	}

	/**
	 * 单例
	 */
	public static EtcdDistributedLock getInstance() {
		synchronized (mutex) { // 互斥锁
			if (null == lock) {
				lock = new EtcdDistributedLock();
			}
		}
		return lock;
	}

	/**
	 * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。
	 * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名
	 * @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁
	 * @return LockResult
	 */
	public LockResult lock(String lockName, long TTL) {
		LockResult lockResult = new LockResult();
		
		/* 1.准备阶段 */
		// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;
		// 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁
		ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

		// 初始化返回值lockResult
		lockResult.setIsLockSuccess(false);
		lockResult.setService(service);

		// 记录租约ID,初始值设为 0L
		Long leaseId = 0L;

		/* 2.创建租约 */
		try {
			// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定
			leaseId = leaseClient.grant(TTL).get().getID();
			lockResult.setLeaseId(leaseId);

			// 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定
			long period = TTL - TTL / 5;
			service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS);
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: Create lease failed:" + e);
			return lockResult;
		}
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" start to lock.");

		/* 3.加锁操作 */
		// 执行加锁操作,并为锁对应的key绑定租约
		try {
			lockClient.lock(ByteSequence.fromString(lockName), leaseId).get();
		} catch (InterruptedException | ExecutionException e1) {
			System.err.println("[error]: lock failed:" + e1);
			return lockResult;
		}
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" lock successfully.");

		lockResult.setIsLockSuccess(true);
		return lockResult;
	}

	/**
	 * 解锁操作,释放锁、关闭定时任务、解除租约
	 * 
	 * @param lockName:锁名
	 * @param lockResult:加锁操作返回的结果
	 */
	public void unLock(String lockName, LockResult lockResult) {
		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" start to unlock.");
		try {
			// 释放锁
			lockClient.unlock(ByteSequence.fromString(lockName)).get();
			// 关闭定时任务
			lockResult.getService().shutdown();
			// 删除租约
			if (lockResult.getLeaseId() != 0L) {
				leaseClient.revoke(lockResult.getLeaseId());
			}
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: unlock failed: " + e);
		}

		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" unlock successfully.");
	}

	/**
	 * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效
	 */
	static class KeepAliveTask implements Runnable {
		private Lease leaseClient;
		private long leaseId;

		KeepAliveTask(Lease leaseClient, long leaseId) {
			this.leaseClient = leaseClient;
			this.leaseId = leaseId;
		}

		@Override
		public void run() {
			// 续约一次
			leaseClient.keepAliveOnce(leaseId);
		}
	}

	/**
	 * 该class用于描述加锁的结果,同时携带解锁操作所需参数
	 */
	static class LockResult {
		private boolean isLockSuccess;
		private long leaseId;
		private ScheduledExecutorService service;

		LockResult() {
			super();
		}

		public void setIsLockSuccess(boolean isLockSuccess) {
			this.isLockSuccess = isLockSuccess;
		}

		public void setLeaseId(long leaseId) {
			this.leaseId = leaseId;
		}

		public void setService(ScheduledExecutorService service) {
			this.service = service;
		}

		public boolean getIsLockSuccess() {
			return this.isLockSuccess;
		}

		public long getLeaseId() {
			return this.leaseId;
		}

		public ScheduledExecutorService getService() {
			return this.service;
		}
	}

	/**
	 * 测试分布式锁
	 * @param args
	 */
	public static void main(String[] args) {
		// 模拟分布式场景下,多个进程 “抢锁”
		for (int i = 0; i < 10; i++) {
			new MyThread().start();
		}
	}

	static class MyThread extends Thread {
		@Override
		public void run() {
			String lockName = "/lock/mylock"; // 分布式锁名称
			
			// 1. 加锁
			LockResult lockResult = getInstance().lock(lockName, 30);
			if (lockResult.getIsLockSuccess()) { // 获得了锁
				try {
					Thread.sleep(10000); // sleep 10秒,模拟执行相关业务
				} catch (InterruptedException e) {
					System.out.println("[error]:" + e);
				}
			}

			// 2. 解锁
			getInstance().unLock(lockName, lockResult);
		}
	}
}

 

 

 

分享到:
评论

相关推荐

    【代码】基于etcd的分布式队列(golang版)

    基于go+etcd实现分布式锁 clientv3文档example: 文档example client.go package etcdq import ( context v3 go.etcd.io/etcd/clientv3 time ) type Client interface { ReadItem(string) KV ReadItems(string...

    node-etcd-lock:由etcd v3支持的Node.js分布式锁

    节点等锁 由支持的Node.js分布式锁。安装npm install node-etcd-lock用法'use strict'const assert = require ( 'assert' )const Locker = require ( 'node-etcd-lock' )const locker = new Locker ( { address : '...

    在分布式环境中Leader选举互斥锁和读写锁该如何实现

    在分布式环境中实现Leader选举、...具体实现时,可以参考etcd的文档和API,它们提供了分布式锁和Leader选举的具体实现方法。例如,etcd的clientv3库提供了concurrency模块,其中包含了实现分布式锁和选举所需的原语。

    etcd故障处理文档.docx

    通过分布式锁,leader选举和写屏障(write barriers)来实现可靠的分布式协作。etcd集群是为高可用,持久性数据存储和检索而准备。 "etcd"这个名字源于两个想法,即 unix “/etc” 文件夹和分布式系统"d"istibuted。 ...

    26章全Java主流分布式解决方案多场景设计与实战

    3.不同分布式锁实现方案的优缺点你清楚了吗? 4.如果让你手撸实现Redis分布式锁,你可以做到吗? 2、分布式事务 1.你知道为什么CAP不能同时满足吗? 2.你了解不同分布式事务解决方案对应什么样的应用场景么 ? 3.你...

    etcd-v3.3.10-linux-amd64.tar.zip

    在分布式系统中,如何管理节点间的状态一直是一个难题,etcd像是专门为集群环境的服务发现和注册而设计,它提供了数据TTL失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群...

    waterdrop::droplet:Waterdrop是一个高性能的微服务框架。 水滴来自(三体问题)

    English | 水滴 Waterdrop是基于gin和grpc的高性能微服务框架。 水滴来自(三体问题)。 产品特点 ...RPC服务器:基于官方的并使用ETCD进行服务...分布式锁:分布式锁基于Redis和ETCD实现。 前者适用于最终一致的业务锁

    osdc-2015:使用 CoreOS Talk 构建分布式系统

    接下来是中心键值存储,etcd,它提供共享配置、服务发现和基于 Raft 共识算法构建的集群范围的锁服务,以实现高可用性。 最后,fleet 是一个分布式的 init 系统,它将所有的东西联系在一起并提供低级别的工作负载...

    Go并发编程研讨课.pdf

    即使是顶级Go开发的项目,比如Docker、Kubernetes、gRPC、etcd, 都是有经验丰富的Go开发专家锁开发,也踩过不少的并发的坑,而且依然源源不断的继续踩着,即便是标准库也是这样。 分析和总结并发编程中的陷阱,避免...

    spring-cloud-cluster:现在该项目已被Spring Integration中的代码取代

    这里的所有接口在主要的Spring库中都有对应的接口,实现也大多是并行的(etcd是一个例外,因为缺乏对第三方库中v3的支持)。 1.0.1版本不赞成使用Spring Integration。 注意:对于Hazelcast,支持位于。 Spring ...

Global site tag (gtag.js) - Google Analytics