4. Spring Boot使用Apache Curator实现分布式计数器「第四章 ZooKeeper Curator应用场景实战」「架构之路ZooKeeper理论和实战」

分布式计数器在curator中怎么玩呢,这一节我们来看看如何使用。

一、基本概念

1.1 什么是计数器

         计数器就是在原有值的基础上再增加一个值得到新的值。

1.2 什么是分布式计数器

         在分布式环境下的计数器,就是分布式计数器。

二、分布式计数器实战

2.1 环境说明

(1)基于前面章节《Spring Boot 使用 Curator 操作 ZooKeeper》进行往下编码。

(2)ZooKeeper版本:3.6.2

(3)Curator版本:5.1.0

(4)对于在普通的java项目中,代码是一样的,只要能获取到CuratorFramework就能搞定。

2.2 使用DistributedAtomicInteger

         使用DistributedAtomicInteger实现分布式计数器:

  1. package com.kfit.springbootcuratordemo.distributedatomic;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.recipes.atomic.AtomicValue;
  4. import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
  5. import org.apache.curator.retry.RetryForever;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. *
  10. * 分布式计数器
  11. *
  12. * @author 悟纤「公众号SpringBoot」
  13. * @date 2021-04-22
  14. * @slogan 大道至简 悟在天成
  15. */
  16. @Service
  17. public class DistributedAtomicService {
  18. @Autowired
  19. private CuratorFramework curatorFramework;
  20. public void atomic(){
  21. //DistributedAtomicLong,DistributedAtomicInteger
  22. //RetryNTimes(retryCount,sleepMsBetweenRetries):new RetryNTimes(3,100)
  23. //new RetryForever(100)
  24. DistributedAtomicInteger counter = new DistributedAtomicInteger(curatorFramework, "/atomic",new RetryForever(100));
  25. try {
  26. AtomicValue<Integer> value = counter.increment();
  27. String str = "";
  28. str += "===="+Thread.currentThread().getName()+"====";
  29. str += "原值为:"+value.preValue();//就是上一次修改成功之后的值
  30. str += ",更改后的值为:"+value.postValue();//如果成功就是最新的值,失败的话就是0
  31. str += ",状态:"+value.succeeded();//成功为true,失败为false.
  32. System.out.println(str);
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }

2.3 编写测试代码






         在conroller使用多线程的方式进行调用:

  1. @Autowired
  2. private DistributedAtomicService distributedAtomicService;
  3. @RequestMapping("/atomic")
  4. public String atomic(){
  5. for(int i=0;i<20;i++){
  6. new Thread(){
  7. @Override
  8. public void run() {
  9. distributedAtomicService.atomic();
  10. }
  11. }.start();
  12. }
  13. return "ok";
  14. }

         访问测试下:

http://127.0.0.1:8080/shop/atomic

         控制台打印信息:

三、分布式计数器源码

         我们跟进代码counter.increment(),可以找到这么一段代码:

(1)如果节点不存在的话,那么就会走创建节点的逻辑,那么这个通过ZK只能创建一次来判断当前请求是否创建成功,节点被创建的话,那么执行这个创建执行就会失败。

(2)如果节点存在的话,那么就会走修改节点的逻辑,这里通过指定版本version,也就是使用了CAS来保证并发修改问题。

购买完整视频,请前往:http://www.mark-to-win.com/TeacherV2.html?id=287