当前位置:首页 > 服务端 > 架构师入门笔记八 并发框架Disruptor场景应用

架构师入门笔记八 并发框架Disruptor场景应用

2022年11月09日 22:44:31服务端8
架构师入门笔记八 并发框架Disruptor场景应用
今天用一个停车场问题来加深对Disruptor的理解。一个有关汽车进入停车场的问题。 当汽车进入停车场时,系统首先会记录汽车信息。同时也会发送消息到其他系统处理相关业务,最后发送短信通知车主收费开始。看了很多文章,里面的代码都是大同小异的,可能代码真的是很经典。以下代码也是来源网络,只是自己手动敲的,加了一些注释。

代码包含以下内容:
1) 事件对象Event
2)三个消费者Handler
3)一个生产者Processer
4)执行Main方法
Event类:汽车信息
public class MyInParkingDataEvent {

	private String carLicense; // 车牌号

	public String getCarLicense() {
		return carLicense;
	}

	public void setCarLicense(String carLicense) {
		this.carLicense = carLicense;
	}

}
Handler类:一个负责存储汽车数据,一个负责发送kafka信息到其他系统中,最后一个负责给车主发短信通知
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * Handler 第一个消费者,负责保存进场汽车的信息
 *
 */
public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{

	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
		long threadId = Thread.currentThread().getId(); // 获取当前线程id
		String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
		System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense));
	}

	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
			throws Exception {
		this.onEvent(myInParkingDataEvent);
	}

}
import com.lmax.disruptor.EventHandler;

/**
 * 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统)
 */
public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{

	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
			throws Exception {
		long threadId = Thread.currentThread().getId(); // 获取当前线程id
		String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
		System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense));
	}

}
import com.lmax.disruptor.EventHandler;

/**
 * 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。
 */
public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{

	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
			throws Exception {
		long threadId = Thread.currentThread().getId(); // 获取当前线程id
		String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
		System.out.println(String.format("Thread Id %s 给  %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense));
	}

}
Producer类:负责上报停车数据
import java.util.concurrent.CountDownLatch;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

/**
 * 生产者,进入停车场的车辆
 */
public class MyInParkingDataEventPublisher implements Runnable{
	
	private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作
	private Disruptor<MyInParkingDataEvent> disruptor;
	private static final Integer NUM = 1; // 1,10,100,1000
	
	public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
			Disruptor<MyInParkingDataEvent> disruptor) {
		this.countDownLatch = countDownLatch;
		this.disruptor = disruptor;
	}
	
	@Override
	public void run() {
		MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
		try {
			for(int i = 0; i < NUM; i ++) {
				disruptor.publishEvent(eventTranslator);
				Thread.sleep(1000); // 假设一秒钟进一辆车
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			countDownLatch.countDown(); // 执行完毕后通知 await()方法
			System.out.println(NUM + "辆车已经全部进入进入停车场!");
		}
	}
	
}

class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {

	@Override
	public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
		this.generateData(myInParkingDataEvent);
	}
	
	private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
		myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int)(Math.random() * 100000)); // 随机生成一个车牌号
		System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event");
		return myInParkingDataEvent;
	}
	
}
执行的Main方法:
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * 执行的Main方法 ,
 * 一个生产者(汽车进入停车场);
 * 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机)
 * 前两个消费者同步执行,都有结果了再执行第三个消费者
 */
public class MyInParkingDataEventMain {
	
	public static void main(String[] args) {
		long beginTime=System.currentTimeMillis();
		int bufferSize = 2048; // 2的N次方
		try {
			// 创建线程池,负责处理Disruptor的四个消费者
			ExecutorService executor = Executors.newFixedThreadPool(4);
			
			// 初始化一个 Disruptor
			Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
				@Override
				public MyInParkingDataEvent newInstance() {
					return new MyInParkingDataEvent(); // Event 初始化工厂
				}
			}, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
			
			// 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
			EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
					new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
			
			// 当上面两个消费者处理结束后在消耗 smsHandler
			MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
			handlerGroup.then(myParkingDataSmsHandler);
			
			// 启动Disruptor
			disruptor.start();
			
			CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了
			// 生产者生成数据
			executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
			countDownLatch.await(); // 等待生产者结束
			
			disruptor.shutdown();
			executor.shutdown();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
	}
	
}




用了两篇博客简单的介绍了Disruptor并发框架,如果想深入学习,可以到并发网里面找文章。下一章介绍BIO,NIO,AIO知识,为Netty5的入门打个基础。

学习博客:

作者:ITDragon龙
来源链接:https://blog.csdn.net/qq_19558705/article/details/77247912

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/68970.html

标签: 架构师
分享给朋友:

“架构师入门笔记八 并发框架Disruptor场景应用” 的相关文章

漫画:Spring Boot、Java多线程、分布式、Java架构的高级架构师进阶之路

漫画:Spring Boot、Java多线程、分布式、Java架构的高级架构师进阶之路

漫画:Spring Boot、Java多线程、分布式、Java架构的高级架构师进阶之路 漫话编程 3天前 Java,编程语言界的老大哥,不同于其他老大哥和小鲜肉,作为静态面向对象编程语言的代表,在诞生那天起,成为了精致优雅的代名词 漫漫20多...

史上最强Java架构师的13大技术能力讲解! | 附架构师能力图谱

史上最强Java架构师的13大技术能力讲解! | 附架构师能力图谱

从程序员进阶成为架构师,并非一蹴而就,需要系统化、阶段性地学习,在实战项目中融会贯通,这如同打怪通关,我们得一关一关突破,每攻破一个关口,就能得到更精良的装备,技能值也随之不断增长,直至大获全胜。   凡事预则立,在开始行动之前,我们有必要先来了解下这个岗位...

架构师成长之路(1)--什么是架构师

架构师成长之路(1)--什么是架构师

转自:https://blog.csdn.net/hguisu/article/details/38385371 前言: 哲学家常思考的问题:" 我是谁?"" 我从哪里来?"" 要到哪里去?不只是哲学家,我想每个人都有自己对这三个问题的认知。 如果我们要成...

从小白到大神的java架构师成长之路~

  一、基础篇   1.1 JVM   1.1.1. Java内存模型,Java内存管理,Java堆和栈,垃圾回收   jcp/en/jsr/detail?id=133ifeve/jmm-faq/   1.1.2. 了解JVM各种参数及调优   1.1.3...

JAVA程序员进阶之路——JAVA架构师全套视频下载

      相信大多程序员在工作三、四年后,就会对自己的工作有所懈怠,因为java程序员在工作三四年后,项目开发所需要的知识基本已经从工作中学习到并掌握到,此时在工作中莫非就是重复,复制粘贴是工作中的常态,此时很多人都很想寻新的方向来突破自己,这样才...

从三方面分析,Java程序员如何晋升为高薪Java架构师?

从三方面分析,Java程序员如何晋升为高薪Java架构师?

对于工作多年的程序员而言,日后的职业发展无非是继续专精技术、转型管理和晋升架构师三种选择。下面,我从架构师在一家公司有多重要、优秀架构师需要具备怎样的素质以及架构师的发展现状三个方面来分析,Java程序员如何才能晋升为优秀的高薪Java架构师? 希...

阿里架构师和你聊聊【系统架构】

阿里架构师和你聊聊【系统架构】

黄勇,从事近十年的 JavaEE 应用开发工作,现任阿里巴巴公司系统架构师。对分布式服务架构与大数据技术有深入研究,具有丰富的 B/S 架构开发经验与项目实战经验,擅长敏捷开发模式。国内开源软件推动者之一,Smart Framework 开源框架创始人。热爱技术交流,乐于分享自...

Java架构师技术学习清单(2019修订版)

Java架构师技术学习清单(2019修订版)

想成为java架构师,首先你自身得是一个高级java工程师,会使用各种框架并且很熟练,且知晓框架实现的原理。架构师是一个既需要掌控整体又需要洞悉局部瓶颈并依据具体的业务场景给出解决方案的团队领导型人物。一个架构师得需要足够的想像力,能把各种目标...

阿里P8工程师整理的22本Java架构师核心书单,先收藏起来!

阿里P8工程师整理的22本Java架构师核心书单,先收藏起来!

随便打开一个招聘网站,看看对高级Java工程师的技能要求。 抛开其它的经验能力等等,单纯从技术,或者说知识上来讲,可以发现一些共通的地方。 Java基础 计算机基础 数据库,SQL/NoSQL 常用开源框架...

极客大学架构师训练营 毕业典礼 奉献优秀架构师升级攻略

极客大学架构师训练营 毕业典礼 奉献优秀架构师升级攻略

说明 极客大学架构师训练营 毕业典礼 首席架构师:李智慧老师 寄语 结业仪式 班班有话说 大家好,我是你们的班主任依依,从2020年5月30号,我们一起正式开启了这趟学习之旅,在这15周的时间里,我们相识相知,相互鼓...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。