本文最后更新于:13 天前
缓存是一种介于数据永久存储介质与应用程序之间的数据临时存储介质,使用缓存可以有效的减少低速数据读取过程的次数(例如磁盘IO),提高系统性能。此外缓存不仅可以用于提高永久性存储介质的数据读取效率,还可以提供临时的数据存储空间。springboot提供了对市面上几乎所有的缓存技术进行整合的方案。
缓存 企业级应用主要作用是信息处理,当需要读取数据时,由于受限于数据库的访问效率,导致整体系统性能偏低。
为了改善上述现象,开发者通常会在应用程序与数据库之间建立一种临时的数据存储机制,该区域中的数据在内存中保存,读写速度较快,可以有效解决数据库访问效率低下的问题。这一块临时存储数据的区域就是缓存。
缓存是一种介于数据永久存储介质与应用程序之间的数据临时存储介质,使用缓存可以有效的减少低速数据读取过程的次数(例如磁盘IO),提高系统性能。此外缓存不仅可以用于提高永久性存储介质的数据读取效率,还可以提供临时的数据存储空间。springboot提供了对市面上几乎所有的缓存技术进行整合的方案。
SpringBoot内置缓存解决方案 springboot技术提供有内置的缓存解决方案,可以帮助开发者快速开启缓存技术,并使用缓存技术进行数据的快速操作,例如读取缓存数据和写入数据到缓存。
步骤① :导入springboot提供的缓存技术对应的starter
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-cache</artifactId > </dependency >
步骤② :启用缓存,在引导类上方标注注解@EnableCaching配置springboot程序中可以使用缓存
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableCaching public class Springboot19CacheApplication { public static void main (String[] args) { SpringApplication.run(Springboot19CacheApplication.class, args); } }
步骤③ :设置操作的数据是否使用缓存
1 2 3 4 5 6 7 8 9 10 @Service public class BookServiceImpl implements BookService { @Autowired private BookDao bookDao; @Cacheable(value="cacheSpace",key="#id") public Book getById (Integer id) { return bookDao.selectById(id); } }
在业务方法上面使用注解@Cacheable声明当前方法的返回值放入缓存中,其中要指定缓存的存储位置,以及缓存中保存当前方法返回值对应的名称。上例中value属性描述缓存的存储位置,可以理解为是一个存储空间名,key属性描述了缓存中保存数据的名称,使用#id
读取形参中的id值作为缓存名称。
使用@Cacheable注解后,执行当前操作,如果发现对应名称在缓存中没有数据,就正常读取数据,然后放入缓存;如果对应名称在缓存中有数据,就终止当前业务方法执行,直接返回缓存中的数据。
手机验证码案例 为了便于下面演示各种各样的缓存技术,我们创建一个手机验证码的案例环境,模拟使用缓存保存手机验证码的过程。手机验证码案例需求如下:
输入手机号获取验证码,组织文档以短信形式发送给用户(页面模拟)
输入手机号和验证码验证结果
为了描述上述操作,我们制作两个表现层接口,一个用来模拟发送短信的过程,其实就是根据用户提供的手机号生成一个验证码,然后放入缓存,另一个用来模拟验证码校验的过程,其实就是使用传入的手机号和验证码进行匹配,并返回最终匹配结果。下面直接制作本案例的模拟代码,先以上例中springboot提供的内置缓存技术来完成当前案例的制作。
步骤① :导入springboot提供的缓存技术对应的starter
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-cache</artifactId > </dependency >
步骤② :启用缓存,在引导类上方标注注解@EnableCaching配置springboot程序中可以使用缓存
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableCaching public class Springboot19CacheApplication { public static void main (String[] args) { SpringApplication.run(Springboot19CacheApplication.class, args); } }
步骤③ :定义验证码对应的实体类,封装手机号与验证码两个属性
1 2 3 4 5 @Data public class SMSCode { private String tele; private String code; }
步骤④ :定义验证码功能的业务层接口与实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface SMSCodeService { public String sendCodeToSMS (String tele) ; public boolean checkCode (SMSCode smsCode) ; }@Service public class SMSCodeServiceImpl implements SMSCodeService { @Autowired private CodeUtils codeUtils; @CachePut(value = "smsCode", key = "#tele") public String sendCodeToSMS (String tele) { String code = codeUtils.generator(tele); return code; } public boolean checkCode (SMSCode smsCode) { String code = smsCode.getCode(); String cacheCode = codeUtils.get(smsCode.getTele()); return code.equals(cacheCode); } }
获取验证码后,当验证码失效时必须重新获取验证码,因此在获取验证码的功能上不能使用@Cacheable注解,@Cacheable注解是缓存中没有值则放入值,缓存中有值则取值。此处的功能仅仅是生成验证码并放入缓存,并不具有从缓存中取值的功能,因此不能使用@Cacheable注解,应该使用仅具有向缓存中保存数据的功能,使用@CachePut注解即可。
对于校验验证码的功能建议放入工具类中进行,因为方法上的@Cacheable注解只有被Spring容器管理,通过bean调用才会被加载到。
步骤⑤ :定义验证码的生成策略与根据手机号读取验证码的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Component public class CodeUtils { private String [] patch = {"000000" ,"00000" ,"0000" ,"000" ,"00" ,"0" ,"" }; public String generator (String tele) { int hash = tele.hashCode(); int encryption = 20206666 ; long result = hash ^ encryption; long nowTime = System.currentTimeMillis(); result = result ^ nowTime; long code = result % 1000000 ; code = code < 0 ? -code : code; String codeStr = code + "" ; int len = codeStr.length(); return patch[len] + codeStr; } @Cacheable(value = "smsCode",key="#tele") public String get (String tele) { return null ; } }
步骤⑥ :定义验证码功能的web层接口,一个方法用于提供手机号获取验证码,一个方法用于提供手机号和验证码进行校验
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RestController @RequestMapping("/sms") public class SMSCodeController { @Autowired private SMSCodeService smsCodeService; @GetMapping public String getCode (String tele) { String code = smsCodeService.sendCodeToSMS(tele); return code; } @PostMapping public boolean checkCode (SMSCode smsCode) { return smsCodeService.checkCode(smsCode); } }
SpringBoot整合Ehcache缓存 下面开始springboot整合各种各样的缓存技术。第一个整合Ehcache技术。Ehcache是一种缓存技术,使用springboot整合Ehcache其实就是变更一下缓存技术的实现方式。
步骤① :导入Ehcache的坐标
1 2 3 4 <dependency > <groupId > net.sf.ehcache</groupId > <artifactId > ehcache</artifactId > </dependency >
此处为什么不是导入Ehcache的starter,而是导入技术坐标呢?其实springboot整合缓存技术做的是通用格式,不管你整合哪种缓存技术,只是实现变化了,操作方式一样。这也体现出springboot技术的优点,统一同类技术的整合方式。
步骤② :配置缓存技术实现使用Ehcache
1 2 3 4 5 spring: cache: type: ehcache ehcache: config: ehcache.xml
配置缓存的类型type为ehcache,此处需要说明一下,当前springboot可以整合的缓存技术中包含有ehcach,所以可以这样书写,不是随便写一个名称就可以整合的。
由于ehcache的配置有独立的配置文件格式,因此还需要指定ehcache的配置文件,以便于读取相应配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <?xml version="1.0" encoding="UTF-8"?> <ehcache xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation ="http://ehcache.org/ehcache.xsd" updateCheck ="false" > <diskStore path ="D:\ehcache" /> <defaultCache eternal ="false" diskPersistent ="false" maxElementsInMemory ="1000" overflowToDisk ="false" timeToIdleSeconds ="60" timeToLiveSeconds ="60" memoryStoreEvictionPolicy ="LRU" /> <cache name ="smsCode" eternal ="false" diskPersistent ="false" maxElementsInMemory ="1000" overflowToDisk ="false" timeToIdleSeconds ="10" timeToLiveSeconds ="10" memoryStoreEvictionPolicy ="LRU" /> </ehcache >
注意前面的案例中,设置了数据保存的位置是smsCode:
这个设定需要保障ehcache中有一个缓存空间名称叫做smsCode的配置,前后要统一。在企业开发过程中,通过设置不同名称的cache来设定不同的缓存策略,应用于不同的缓存数据。
到这里springboot整合Ehcache就做完了,可以发现一点,原始代码没有任何修改,仅仅是加了一组配置就可以变更缓存供应商了,这也是springboot提供了统一的缓存操作接口的优势,变更实现并不影响原始代码的书写。
总结
springboot使用Ehcache作为缓存实现需要导入Ehcache的坐标;
修改设置,配置缓存供应商为ehcache,并提供对应的缓存配置文件。
SpringBoot整合Redis缓存 上节使用Ehcache替换了springboot内置的缓存技术,其实springboot支持的缓存技术还很多,下面使用redis技术作为缓存解决方案来实现手机验证码案例。
比对使用Ehcache的过程,加坐标,改缓存实现类型为ehcache,做Ehcache的配置。如果还成redis做缓存呢?一模一样,加坐标,改缓存实现类型为redis,做redis的配置。差别之处只有一点,redis的配置可以在yml文件中直接进行配置,无需制作独立的配置文件。
步骤① :导入redis的坐标
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency >
步骤② :配置缓存技术实现使用redis
1 2 3 4 5 6 spring: redis: host: localhost port: 6379 cache: type: redis
如果需要对redis作为缓存进行配置,注意不是对原始的redis进行配置,而是配置redis作为缓存使用相关的配置,隶属于spring.cache.redis节点下,注意不要写错位置了。
1 2 3 4 5 6 7 8 9 10 11 spring: redis: host: localhost port: 6379 cache: type: redis redis: use-key-prefix: false key-prefix: sms_ cache-null-values: false time-to-live: 10s
总结
springboot使用redis作为缓存实现需要导入redis的坐标
修改设置,配置缓存供应商为redis,并提供对应的缓存配置
SpringBoot整合Memcached缓存 目前我们已经掌握了3种缓存解决方案的配置形式,分别是springboot内置缓存,ehcache和redisx,下面学习一下国内比较流行的一款缓存memcached。
其实变更缓存并不繁琐,但是springboot并没有支持使用memcached作为其缓存解决方案,也就是说在type属性中没有memcached的配置选项,这里就需要更变一下处理方式了。在整合之前先安装memcached。
安装 windows版安装包下载地址:https://www.runoob.com/memcached/window-install-memcached.html
下载的安装包是解压缩就能使用的zip文件,解压缩完毕后会得到如下文件:
可执行文件只有一个memcached.exe,使用该文件可以将memcached作为系统服务启动,执行此文件时会出现报错信息,如下:
此处出现问题的原因是注册系统服务时需要使用管理员权限,当前账号权限不足导致安装服务失败,切换管理员账号权限启动命令行
然后再次执行安装服务的命令即可,如下:
1 memcached.exe -d install
服务安装完毕后可以使用命令启动和停止服务,如下:
1 2 memcached.exe -d start # 启动服务 memcached.exe -d stop # 停止服务
也可以在任务管理器中进行服务状态的切换
变更缓存为Memcached 由于memcached未被springboot收录为缓存解决方案,因此使用memcached需要通过手工硬编码的方式来使用,需要自己写了。
memcached目前提供有三种客户端技术,分别是Memcached Client for Java、SpyMemcached和Xmemcached,其中性能指标各方面最好的客户端是Xmemcached,本次整合就使用这个作为客户端实现技术了。下面开始使用Xmemcached
步骤① :导入xmemcached的坐标
1 2 3 4 5 <dependency > <groupId > com.googlecode.xmemcached</groupId > <artifactId > xmemcached</artifactId > <version > 2.4.7</version > </dependency >
步骤② :配置memcached,制作memcached的配置类
1 2 3 4 5 6 7 8 9 @Configuration public class XMemcachedConfig { @Bean public MemcachedClient getMemcachedClient () throws IOException { MemcachedClientBuilder memcachedClientBuilder = new XMemcachedClientBuilder("localhost:11211" ); MemcachedClient memcachedClient = memcachedClientBuilder.build(); return memcachedClient; } }
memcached默认对外服务端口11211。
步骤③ :使用xmemcached客户端操作缓存,注入MemcachedClient对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Service public class SMSCodeServiceImpl implements SMSCodeService { @Autowired private CodeUtils codeUtils; @Autowired private MemcachedClient memcachedClient; public String sendCodeToSMS (String tele) { String code = codeUtils.generator(tele); try { memcachedClient.set(tele,10 ,code); } catch (Exception e) { e.printStackTrace(); } return code; } public boolean checkCode (SMSCode smsCode) { String code = null ; try { code = memcachedClient.get(smsCode.getTele()).toString(); } catch (Exception e) { e.printStackTrace(); } return smsCode.getCode().equals(code); } }
设置值到缓存中使用set操作,取值使用get操作,其实更符合我们开发者的习惯。上述代码中对于服务器的配置使用硬编码写死到了代码中,最好将此数据提取出来,做成独立的配置属性。
定义配置属性 以下过程采用前期学习的属性配置方式进行,当前操作有助于理解原理篇中的很多知识。
定义memcached节点信息
1 2 3 4 memcached: servers: localhost:11211 poolSize: 10 opTimeout: 3000
定义配置类,加载必要的配置属性,读取配置文件中memcached节点信息
1 2 3 4 5 6 7 8 @Component @ConfigurationProperties(prefix = "memcached") @Data public class XMemcachedProperties { private String servers; private int poolSize; private long opTimeout; }
在memcached配置类中加载信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Configuration public class XMemcachedConfig { @Autowired private XMemcachedProperties props; @Bean public MemcachedClient getMemcachedClient () throws IOException { MemcachedClientBuilder memcachedClientBuilder = new XMemcachedClientBuilder(props.getServers()); memcachedClientBuilder.setConnectionPoolSize(props.getPoolSize()); memcachedClientBuilder.setOpTimeout(props.getOpTimeout()); MemcachedClient memcachedClient = memcachedClientBuilder.build(); return memcachedClient; } }
总结
memcached安装后需要启动对应服务才可以对外提供缓存功能,安装memcached服务需要基于windows系统管理员权限
由于springboot没有提供对memcached的缓存整合方案,需要采用手工编码的形式创建xmemcached客户端操作缓存
导入xmemcached坐标后,创建memcached配置类,注册MemcachedClient对应的bean,用于操作缓存
初始化MemcachedClient对象所需要使用的属性可以通过自定义配置属性类的形式加载
SpringBoot整合jetcache缓存 目前我们使用的缓存都是要么A要么B,能不能AB一起用呢?springboot针对缓存的整合仅仅停留在用缓存上面,如果缓存自身不支持同时支持AB一起用,springboot也没办法,所以要想解决AB缓存一起用的问题,就必须找一款缓存能够支持AB两种缓存一起用,有这种缓存吗?阿里出品的 jetcache。
jetcache严格意义上来说,并不是一个缓存解决方案,只能说他算是一个缓存框架,然后把别的缓存放到jetcache中管理,这样就可以支持AB缓存一起用了。并且jetcache参考了springboot整合缓存的思想,整体技术使用方式和springboot的缓存解决方案思想非常类似。
jetcache并不是随便拿两个缓存都能拼到一起去的。目前jetcache支持的缓存方案本地缓存支持两种,远程缓存支持两种,分别如下:
为什么jetcache只支持2+2这么4款缓存呢?阿里研发这个技术其实主要是为了满足自身的使用需要。最初肯定只有1+1种,逐步变化成2+2种。下面就以LinkedHashMap+Redis的方案实现本地与远程缓存方案同时使用。
纯远程方案 步骤① :导入springboot整合jetcache对应的坐标starter,当前坐标默认使用的远程方案是redis
1 2 3 4 5 <dependency > <groupId > com.alicp.jetcache</groupId > <artifactId > jetcache-starter-redis</artifactId > <version > 2.6.2</version > </dependency >
步骤② :远程方案基本配置
1 2 3 4 5 6 7 8 jetcache: remote: default: type: redis host: localhost port: 6379 poolConfig: maxTotal: 50
其中poolConfig是必配项,否则会报错。
步骤③ :启用缓存,在引导类上方标注注解@EnableCreateCacheAnnotation配置springboot程序中可以使用注解的形式创建缓存
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableCreateCacheAnnotation public class Springboot20JetCacheApplication { public static void main (String[] args) { SpringApplication.run(Springboot20JetCacheApplication.class, args); } }
步骤④ :创建缓存对象Cache,并使用注解@CreateCache标记当前缓存的信息,然后使用Cache对象的API操作缓存,put写缓存,get读缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Service public class SMSCodeServiceImpl implements SMSCodeService { @Autowired private CodeUtils codeUtils; @CreateCache(name="jetCache_",expire = 10,timeUnit = TimeUnit.SECONDS) private Cache<String ,String> jetCache; public String sendCodeToSMS (String tele) { String code = codeUtils.generator(tele); jetCache.put(tele,code); return code; } public boolean checkCode (SMSCode smsCode) { String code = jetCache.get(smsCode.getTele()); return smsCode.getCode().equals(code); } }
通过上述jetcache使用远程方案连接redis可以看出,jetcache操作缓存时的接口操作更符合开发者习惯,使用缓存就先获取缓存对象Cache,放数据进去就是put,取数据出来就是get,更加简单易懂。并且jetcache操作缓存时,可以为某个缓存对象设置过期时间,将同类型的数据放入缓存中,方便有效周期的管理。
上述方案中使用的是配置中定义的default缓存,其实这个default是个名字,可以随便写,也可以随便加。例如再添加一种缓存解决方案,参照如下配置进行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 jetcache: remote: default: type: redis host: localhost port: 6379 poolConfig: maxTotal: 50 sms: type: redis host: localhost port: 6379 poolConfig: maxTotal: 50
如果想使用名称是sms的缓存,需要再创建缓存时指定参数area,声明使用对应缓存即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Service public class SMSCodeServiceImpl implements SMSCodeService { @Autowired private CodeUtils codeUtils; @CreateCache(area="sms", name="jetCache_", expire = 10,timeUnit = TimeUnit.SECONDS) private Cache<String ,String> jetCache; public String sendCodeToSMS (String tele) { String code = codeUtils.generator(tele); jetCache.put(tele,code); return code; } public boolean checkCode (SMSCode smsCode) { String code = jetCache.get(smsCode.getTele()); return smsCode.getCode().equals(code); } }
纯本地方案 远程方案中,配置中使用remote表示远程,换成local就是本地,只不过类型不一样而已。
步骤① :导入springboot整合jetcache对应的坐标starter
1 2 3 4 5 <dependency > <groupId > com.alicp.jetcache</groupId > <artifactId > jetcache-starter-redis</artifactId > <version > 2.6.2</version > </dependency >
步骤② :本地缓存基本配置
1 2 3 4 5 jetcache: local: default: type: linkedhashmap keyConvertor: fastjson
为了加速数据获取时key的匹配速度,jetcache要求指定key的类型转换器。简单说就是,如果你给了一个Object作为key的话,我先用key的类型转换器给转换成字符串,然后再保存。等到获取数据时,仍然是先使用给定的Object转换成字符串,然后根据字符串匹配。由于jetcache是阿里的技术,这里推荐key的类型转换器使用阿里的fastjson。
步骤③ :启用缓存
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableCreateCacheAnnotation public class Springboot20JetCacheApplication { public static void main (String[] args) { SpringApplication.run(Springboot20JetCacheApplication.class, args); } }
步骤④ :创建缓存对象Cache时,标注当前使用本地缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Service public class SMSCodeServiceImpl implements SMSCodeService { @CreateCache(name="jetCache_",expire = 1000,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.LOCAL) private Cache<String ,String> jetCache; public String sendCodeToSMS (String tele) { String code = codeUtils.generator(tele); jetCache.put(tele,code); return code; } public boolean checkCode (SMSCode smsCode) { String code = jetCache.get(smsCode.getTele()); return smsCode.getCode().equals(code); } }
cacheType控制当前缓存使用本地缓存还是远程缓存,配置cacheType=CacheType.LOCAL即使用本地缓存。
本地+远程方案 本地和远程方法都有了,两种方案一起使用如何配置呢?其实就是将两种配置合并到一起就可以了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 jetcache: local: default: type: linkedhashmap keyConvertor: fastjson remote: default: type: redis host: localhost port: 6379 poolConfig: maxTotal: 50 sms: type: redis host: localhost port: 6379 poolConfig: maxTotal: 50
在创建缓存的时候,配置cacheType为BOTH即则本地缓存与远程缓存同时使用:
1 2 3 4 5 6 7 @Service public class SMSCodeServiceImpl implements SMSCodeService { @CreateCache(name="jetCache_",expire = 1000,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.BOTH) private Cache<String ,String> jetCache; }
cacheType如果不进行配置,默认值是REMOTE,即仅使用远程缓存方案。
关于jetcache的配置,参考以下信息:
属性
默认值
说明
jetcache.statIntervalMinutes
0
统计间隔,0表示不统计
jetcache.hiddenPackages
无
自动生成name时,隐藏指定的包名前缀
jetcache.[local|remote].${area}.type
无
缓存类型,本地支持linkedhashmap、caffeine,远程支持redis、tair
jetcache.[local|remote].${area}.keyConvertor
无
key转换器,当前仅支持fastjson
jetcache.[local|remote].${area}.valueEncoder
java
仅remote类型的缓存需要指定,可选java和kryo
jetcache.[local|remote].${area}.valueDecoder
java
仅remote类型的缓存需要指定,可选java和kryo
jetcache.[local|remote].${area}.limit
100
仅local类型的缓存需要指定,缓存实例最大元素数
jetcache.[local|remote].${area}.expireAfterWriteInMillis
无穷大
默认过期时间,毫秒单位
jetcache.local.${area}.expireAfterAccessInMillis
0
仅local类型的缓存有效,毫秒单位,最大不活动间隔
以上方案仅支持手工控制缓存,但是springcache方案中的方法缓存特别好用,给一个方法添加一个注解,方法就会自动使用缓存。jetcache也提供了对应的功能,即方法缓存。
方法缓存 jetcache提供了方法缓存方案,只不过名称变更了而已。在对应的操作接口上方使用注解@Cached即可。
步骤① :导入springboot整合jetcache对应的坐标starter
1 2 3 4 5 <dependency > <groupId > com.alicp.jetcache</groupId > <artifactId > jetcache-starter-redis</artifactId > <version > 2.6.2</version > </dependency >
步骤② :配置缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 jetcache: local: default: type: linkedhashmap keyConvertor: fastjson remote: default: type: redis host: localhost port: 6379 keyConvertor: fastjson valueEncode: java valueDecode: java poolConfig: maxTotal: 50 sms: type: redis host: localhost port: 6379 poolConfig: maxTotal: 50
由于redis缓存中不支持保存对象,因此需要对redis设置当Object类型数据进入到redis中时如何进行类型转换。需要配置keyConvertor表示key的类型转换方式,同时标注value的转换类型方式,值进入redis时是java类型,标注valueEncode为java,值从redis中读取时转换成java,标注valueDecode为java。
注意,为了实现Object类型的值进出redis,需要保障进出redis的Object类型的数据必须实现序列化接口。
1 2 3 4 5 6 7 @Data public class Book implements Serializable { private Integer id; private String type; private String name; private String description; }
步骤③ :启用缓存时开启方法缓存功能,并配置basePackages,说明在哪些包中开启方法缓存
1 2 3 4 5 6 7 8 9 10 @SpringBootApplication @EnableCreateCacheAnnotation @EnableMethodCache(basePackages = "com.gaojie") public class Springboot20JetCacheApplication { public static void main (String[] args) { SpringApplication.run(Springboot20JetCacheApplication.class, args); } }
步骤④ :使用注解@Cached标注当前方法使用缓存
1 2 3 4 5 6 7 8 9 10 11 @Service public class BookServiceImpl implements BookService { @Autowired private BookDao bookDao; @Override @Cached(name="book_",key="#id",expire = 3600,cacheType = CacheType.REMOTE) public Book getById (Integer id) { return bookDao.selectById(id); } }
远程方案的数据同步 由于远程方案中redis保存的数据可以被多个客户端共享,这就存在了数据同步问题。jetcache提供了3个注解解决此问题,分别在更新、删除操作时同步缓存数据,和读取缓存时定时刷新数据:
更新缓存
1 2 3 4 @CacheUpdate(name="book_",key="#book.id",value="#book") public boolean update (Book book) { return bookDao.updateById(book) > 0 ; }
删除缓存
1 2 3 4 @CacheInvalidate(name="book_",key = "#id") public boolean delete (Integer id) { return bookDao.deleteById(id) > 0 ; }
定时刷新缓存
1 2 3 4 5 @Cached(name="book_",key="#id",expire = 3600,cacheType = CacheType.REMOTE) @CacheRefresh(refresh = 5) public Book getById (Integer id) { return bookDao.selectById(id); }
数据报表 jetcache还提供有简单的数据报表功能,帮助开发者快速查看缓存命中信息,只需要添加一个配置即可:
1 2 jetcache: statIntervalMinutes: 1
设置后,每1分钟在控制台输出缓存数据命中信息:
总结
jetcache是一个类似于springcache的缓存解决方案,自身不具有缓存功能,它提供有本地缓存与远程缓存多级共同使用的缓存解决方案;
jetcache提供的缓存解决方案受限于目前支持的方案,本地缓存支持两种,远程缓存支持两种;
注意数据进入远程缓存时的类型转换问题;
jetcache提供方法缓存,并提供了对应的缓存更新与刷新功能;
jetcache提供有简单的缓存信息命中报表方便开发者即时监控缓存数据命中情况、
SpringBoot整合j2cache缓存 jetcache可以在限定范围内构建多级缓存,但是灵活性不足,不能随意搭配缓存,下面介绍一种可以随意搭配缓存解决方案的缓存整合框架,j2cache。以Ehcache与redis整合为例,来演示如何使用这种缓存框架:
步骤① :导入j2cache、redis、ehcache坐标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <dependency > <groupId > net.oschina.j2cache</groupId > <artifactId > j2cache-core</artifactId > <version > 2.8.4-release</version > </dependency > <dependency > <groupId > net.oschina.j2cache</groupId > <artifactId > j2cache-spring-boot2-starter</artifactId > <version > 2.8.0-release</version > </dependency > <dependency > <groupId > net.sf.ehcache</groupId > <artifactId > ehcache</artifactId > </dependency >
j2cache的starter中默认包含了redis坐标,官方推荐使用redis作为二级缓存,因此此处无需导入redis坐标。
步骤② :配置一级与二级缓存,并配置一二级缓存间数据传递方式,配置书写在名称为 j2cache.properties 的文件中。如果使用ehcache还需要单独添加ehcache的配置文件:
1 2 3 4 5 6 7 8 9 10 11 j2cache.L1.provider_class = ehcache ehcache.configXml = ehcache.xml j2cache.L2.provider_class = net.oschina.j2cache.cache.support.redis.SpringRedisProvider j2cache.L2.config_section = redis redis.hosts = localhost:6379 j2cache.broadcast = net.oschina.j2cache.cache.support.redis.SpringRedisPubSubPolicy
此处配置需要参照官方给出的配置说明进行。例如1级供应商选择ehcache,供应商名称仅仅是一个ehcache,但是2级供应商选择redis时要写专用的Spring整合Redis的供应商类名SpringRedisProvider,而且这个名称并不是所有的redis包中能提供的,也不是spring包中提供的。因此配置j2cache必须参照官方文档配置,而且还要去找专用的整合包,导入对应坐标才可以使用。
一级与二级缓存最重要的一个配置就是两者之间的数据沟通方式,此类配置也不是随意配置的,并且不同的缓存解决方案提供的数据沟通方式差异化很大,需要查询官方文档进行设置。
步骤③ :使用缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Service public class SMSCodeServiceImpl implements SMSCodeService { @Autowired private CodeUtils codeUtils; @Autowired private CacheChannel cacheChannel; public String sendCodeToSMS (String tele) { String code = codeUtils.generator(tele); cacheChannel.set("sms" ,tele,code); return code; } public boolean checkCode (SMSCode smsCode) { String code = cacheChannel.get("sms" ,smsCode.getTele()).asString(); return smsCode.getCode().equals(code); } }
j2cache的使用和jetcache比较类似,但是无需开启使用的开关,直接定义缓存对象即可使用,缓存对象名CacheChannel。
j2cache的使用不复杂,配置是j2cache的核心,毕竟是一个整合型的缓存框架。缓存相关的配置过多,可以查阅j2cache-core核心包中的j2cache.properties文件中的说明。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 j2cache.broadcast = redis jgroups.channel.name = j2cache jgroups.configXml = /network.xml rabbitmq.exchange = j2cache rabbitmq.host = localhost rabbitmq.port = 5672 rabbitmq.username = guest rabbitmq.password = guest rocketmq.name = j2cache rocketmq.topic = j2cache rocketmq.hosts = 127.0.0.1:9876 j2cache.L1.provider_class = caffeine j2cache.L2.provider_class = redis j2cache.sync_ttl_to_redis = true j2cache.default_cache_null_object = true j2cache.serialization = json caffeine.properties = /caffeine.properties redis.mode = single redis.storage = generic redis.channel = j2cache redis.channel.host =redis.cluster_name = j2cache redis.namespace =redis.hosts = 127.0.0.1:6379 redis.timeout = 2000 redis.password =redis.database = 0 redis.ssl = false redis.maxTotal = 100 redis.maxIdle = 10 redis.maxWaitMillis = 5000 redis.minEvictableIdleTimeMillis = 60000 redis.minIdle = 1 redis.numTestsPerEvictionRun = 10 redis.lifo = false redis.softMinEvictableIdleTimeMillis = 10 redis.testOnBorrow = true redis.testOnReturn = false redis.testWhileIdle = true redis.timeBetweenEvictionRunsMillis = 300000 redis.blockWhenExhausted = false redis.jmxEnabled = false lettuce.mode = single lettuce.namespace =lettuce.storage = hash lettuce.channel = j2cache lettuce.scheme = redis lettuce.hosts = 127.0.0.1:6379 lettuce.password =lettuce.database = 0 lettuce.sentinelMasterId =lettuce.maxTotal = 100 lettuce.maxIdle = 10 lettuce.minIdle = 10 lettuce.timeout = 10000 lettuce.clusterTopologyRefresh = 3000 memcached.servers = 127.0.0.1:11211 memcached.username =memcached.password =memcached.connectionPoolSize = 10 memcached.connectTimeout = 1000 memcached.failureMode = false memcached.healSessionInterval = 1000 memcached.maxQueuedNoReplyOperations = 100 memcached.opTimeout = 100 memcached.sanitizeKeys = false
总结
j2cache是一个缓存框架,自身不具有缓存功能,它提供多种缓存整合在一起使用的方案
j2cache需要通过复杂的配置设置各级缓存,以及缓存之间数据交换的方式
j2cache操作接口通过CacheChannel实现
任务 springboot整合第三方技术第二部分我们来说说任务系统,其实这里说的任务系统指的是定时任务。定时任务是企业级开发中必不可少的组成部分,诸如长周期业务数据的计算,例如年度报表,诸如系统脏数据的处理,再比如系统性能监控报告,还有抢购类活动的商品上架,这些都离不开定时任务。本节将介绍两种不同的定时任务技术。
Quartz Quartz技术是一个比较成熟的定时任务框架,不过比较繁琐,配置略微复杂。springboot对其进行整合后,简化了一系列的配置,将很多配置采用默认设置,这样开发阶段就简化了很多。在学习springboot整合Quartz前先普及几个Quartz的概念。
工作(Job):用于定义具体执行的工作。
工作明细(JobDetail):用于描述定时工作相关的信息。
触发器(Trigger):描述了工作明细与调度器的对应关系。
调度器(Scheduler):用于描述触发工作的执行规则,通常使用cron表达式定义规则。
简单说就是你定时干什么事情,这就是工作,工作不可能就是一个简单的方法,还要设置一些明细信息。工作啥时候执行,设置一个调度器,可以简单理解成设置一个工作执行的时间。工作和调度都是独立定义的,它们两个怎么配合到一起呢?使用触发器。
步骤① :导入springboot整合Quartz的starter:
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-quartz</artifactId > </dependency >
步骤② :定义任务Bean,按照Quartz的开发规范制作,继承QuartzJobBean:
1 2 3 4 5 6 7 8 public class MyQuartz extends QuartzJobBean { @Override protected void executeInternal (JobExecutionContext context) throws JobExecutionException { System.out.println("quartz task run..." ); } }
步骤③ :创建Quartz配置类,定义工作明细(JobDetail)与触发器的(Trigger)bean:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class QuartzConfig { @Bean public JobDetail printJobDetail () { return JobBuilder.newJob(MyQuartz.class).storeDurably().build(); } @Bean public Trigger printJobTrigger () { ScheduleBuilder schedBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?" ); return TriggerBuilder.newTrigger().forJob(printJobDetail()).withSchedule(schedBuilder).build(); } }
工作明细中要设置对应的具体工作,使用newJob()操作传入对应的工作任务类型即可。
触发器需要绑定任务,使用forJob()操作传入绑定的工作明细对象。此处可以为工作明细设置名称然后使用名称绑定,也可以直接调用对应方法绑定。触发器中最核心的规则是执行时间,此处使用调度器定义执行时间,执行时间描述方式使用的是cron表达式。有关cron表达式的规则,各位小伙伴可以去参看相关课程学习,略微复杂,而且格式不能乱设置,不是写个格式就能用的,写不好就会出现冲突问题。
总结
springboot整合Quartz就是将Quartz对应的核心对象交给spring容器管理,包含两个对象,JobDetail和Trigger对象
JobDetail对象描述的是工作的执行信息,需要绑定一个QuartzJobBean类型的对象
Trigger对象定义了一个触发器,需要为其指定绑定的JobDetail是哪个,同时要设置执行周期调度器
Task Spring根据定时任务的特征,将定时任务的开发简化到了极致。要做定时任务总要告诉容器有这功能吧,然后定时执行什么任务直接告诉对应的bean什么时间执行就行了,就这么简单,一起来看怎么做
步骤① :开启定时任务功能,在引导类上开启定时任务功能的开关,使用注解@EnableScheduling
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableScheduling public class Springboot22TaskApplication { public static void main (String[] args) { SpringApplication.run(Springboot22TaskApplication.class, args); } }
步骤② :定义Bean,在对应要定时执行的操作上方,使用注解@Scheduled定义执行的时间,执行时间的描述方式还是cron表达式
1 2 3 4 5 6 7 8 9 @Component public class MyBean { @Scheduled(cron = "0/1 * * * * ?") public void print () { System.out.println(Thread.currentThread().getName()+" :spring task run..." ); } }
这样就完成了定时任务的配置。总体感觉其实什么东西都没少,只不过没有将所有的信息都抽取成bean,而是直接使用注解绑定定时执行任务的事情而已。
如何想对定时任务进行相关配置,可以通过配置文件进行:
1 2 3 4 5 6 7 8 9 spring: task: scheduling: pool: size: 1 thread-name-prefix: ssm_ shutdown: await-termination: false await-termination-period: 10s
总结
spring task需要使用注解@EnableScheduling开启定时任务功能
为定时执行的的任务设置执行周期,描述方式cron表达式
邮件 发邮件是java程序的基本操作,springboot整合javamail其实就是简化开发。简化了发送邮件的客户端对象JavaMailSender的初始化过程,通过配置的形式加载信息简化开发过程。
学习邮件发送之前先了解3个概念,这些概念规范了邮件操作过程中的标准:
SMTP(Simple Mail Transfer Protocol):简单邮件传输协议,用于发送 电子邮件的传输协议
POP3(Post Office Protocol - Version 3):用于接收 电子邮件的标准协议
IMAP(Internet Mail Access Protocol):互联网消息协议,是POP3的替代协议
简单说就是SMPT是发邮件的标准,POP3是收邮件的标准,IMAP是对POP3的升级。我们制作程序中操作邮件,通常是发邮件,所以SMTP是使用的重点,收邮件大部分都是通过邮件客户端完成,所以开发收邮件的代码极少。除非你要读取邮件内容,然后解析,做邮件功能的统一处理。例如HR的邮箱收到求职者的简历,可以读取后统一处理。但是为什么不制作独立的投递简历的系统呢?所以说,好奇怪的需求,因为要想收邮件就要规范发邮件的人的书写格式,这个未免有点强人所难,并且极易收到外部攻击,你不可能使用白名单来收邮件。如果能使用白名单来收邮件然后解析邮件,还不如开发个系统给白名单中的人专用呢,更安全,总之就是鸡肋了。
发送简单邮件 步骤① :导入springboot整合javamail的starter
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-mail</artifactId > </dependency >
步骤② :配置邮箱的登录信息
1 2 3 4 5 spring: mail: host: smtp.126.com username: test@126.com password: test
java程序仅用于发送邮件,邮件的功能还是邮件供应商提供的,所以这里是用别人的邮件服务,要配置对应信息。
host配置的是提供邮件服务的主机协议,当前程序仅用于发送邮件,因此配置的是smtp的协议。
password并不是邮箱账号的登录密码,是邮件供应商提供的一个加密后的密码,也是为了保障系统安全性。不然外部人员通过地址访问下载了配置文件,直接获取到了邮件密码就会有极大的安全隐患。有关该密码的获取每个邮件供应商提供的方式都不一样,此处略过。可以到邮件供应商的设置页面找POP3或IMAP这些关键词找到对应的获取位置。下例仅供参考:
步骤③ :使用JavaMailSender接口发送邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Service public class SendMailServiceImpl implements SendMailService { @Autowired private JavaMailSender javaMailSender; private String from = "test@qq.com" ; private String to = "test@126.com" ; private String subject = "测试邮件" ; private String context = "测试邮件正文内容" ; @Override public void sendMail () { SimpleMailMessage message = new SimpleMailMessage(); message.setFrom(from+"(小甜甜)" ); message.setTo(to); message.setSubject(subject); message.setText(context); javaMailSender.send(message); } }
将发送邮件的必要信息(发件人、收件人、标题、正文)封装到SimpleMailMessage对象中,可以根据规则设置发送人昵称等。
发送多组件邮件(附件、复杂正文) 发送简单邮件仅需要提供对应的4个基本信息就可以了,如果想发送复杂的邮件,需要更换邮件对象。使用MimeMessage可以发送特殊的邮件。
发送网页正文邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Service public class SendMailServiceImpl2 implements SendMailService { @Autowired private JavaMailSender javaMailSender; private String from = "test@qq.com" ; private String to = "test@126.com" ; private String subject = "测试邮件" ; private String context = "<img src='ABC.JPG'/><a href='https://www.gaojie.cc'>点开有惊喜</a>" ; public void sendMail () { try { MimeMessage message = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(message); helper.setFrom(to+"(小甜甜)" ); helper.setTo(from); helper.setSubject(subject); helper.setText(context,true ); javaMailSender.send(message); } catch (Exception e) { e.printStackTrace(); } } }
发送带有附件的邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Service public class SendMailServiceImpl2 implements SendMailService { @Autowired private JavaMailSender javaMailSender; private String from = "test@qq.com" ; private String to = "test@126.com" ; private String subject = "测试邮件" ; private String context = "测试邮件正文" ; public void sendMail () { try { MimeMessage message = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(message,true ); helper.setFrom(to+"(小甜甜)" ); helper.setTo(from); helper.setSubject(subject); helper.setText(context); File f1 = new File("springboot_23_mail-0.0.1-SNAPSHOT.jar" ); File f2 = new File("resources\\logo.png" ); helper.addAttachment(f1.getName(),f1); helper.addAttachment("girls.png" ,f2); javaMailSender.send(message); } catch (Exception e) { e.printStackTrace(); } } }
消息 消息的概念 从广义角度来说,消息其实就是信息,但是和信息又有所不同。信息通常被定义为一组数据,而消息除了具有数据的特征之外,还有消息的来源与接收的概念。通常发送消息的一方称为消息的生产者,接收消息的一方称为消息的消费者。 这样比较后,发现其实消息和信息差别还是很大的。
为什么要设置生产者和消费者呢?这就是要说到消息的意义了。信息通常就是一组数据,但是消息由于有了生产者和消费者,就出现了消息中所包含的信息可以被二次解读,生产者发送消息,可以理解为生产者发送了一个信息,也可以理解为生产者发送了一个命令;消费者接收消息,可以理解为消费者得到了一个信息,也可以理解为消费者得到了一个命令。对比一下我们会发现信息是一个基本数据,而命令则可以关联下一个行为动作,这样就可以理解为基于接收的消息相当于得到了一个行为动作,使用这些行为动作就可以组织成一个业务逻辑,进行进一步的操作。总的来说,消息其实也是一组信息,只是为其赋予了全新的含义,因为有了消息的流动,并且是有方向性的流动,带来了基于流动的行为产生的全新解读。开发者就可以基于消息的这种特殊解,将其换成代码中的指令。
对于消息的理解,初学者总认为消息内部的数据非常复杂,这是一个误区。比如我发送了一个消息,要求接受者翻译发送过去的内容。初学者会认为消息中会包含被翻译的文字,已经本次操作要执行翻译操作而不是打印操作。其实这种现象有点过度解读了,发送的消息中仅仅包含被翻译的文字,但是可以通过控制不同的人接收此消息来确认要做的事情。例如发送被翻译的文字仅到A程序,而A程序只能进行翻译操作,这样就可以发送简单的信息完成复杂的业务了,是通过接收消息的主体不同,进而执行不同的操作,而不会在消息内部定义数据的操作行为,当然如果开发者希望消息中包含操作种类信息也是可以的,只是提出消息的内容可以更简单,更单一。
对于消息的生产者与消费者的工作模式,还可以将消息划分成两种模式,同步消费与异步消息。
所谓同步消息就是生产者发送完消息,等待消费者处理,消费者处理完将结果告知生产者,然后生产者继续向下执行业务。这种模式过于卡生产者的业务执行连续性,在现在的企业级开发中,上述这种业务场景通常不会采用消息的形式进行处理。
所谓异步消息就是生产者发送完消息,无需等待消费者处理完毕,生产者继续向下执行其他动作。比如生产者发送了一个日志信息给日志系统,发送过去以后生产者就向下做其他事情了,无需关注日志系统的执行结果。日志系统根据接收到的日志信息继续进行业务执行,是单纯的记录日志,还是记录日志并报警,这些和生产者无关,这样生产者的业务执行效率就会大幅度提升。并且可以通过添加多个消费者来处理同一个生产者发送的消息来提高系统的高并发性,改善系统工作效率,提高用户体验。一旦某一个消费者由于各种问题宕机了,也不会对业务产生影响,提高了系统的高可用性。
Java处理消息的标准规范 目前企业级开发中广泛使用的消息处理技术共三大类,具体如下:
为什么是三大类,而不是三个技术呢?因为这些都是规范,就像JDBC技术,是个规范,开发针对规范开发,运行还要靠实现类,例如MySQL提供了JDBC的实现,最终运行靠的还是实现。并且这三类规范都是针对异步消息进行处理的,也符合消息的设计本质,处理异步的业务。对以上三种消息规范做一下普及。
JMS JMS(Java Message Service),这是一个规范,作用等同于JDBC规范,提供了与消息服务相关的API接口。
JMS消息模型 JMS规范中规范了消息有两种模型。分别是点对点模型 和发布订阅模型 。
点对点模型 :peer-2-peer,生产者会将消息发送到一个保存消息的容器中,通常使用队列模型,使用队列保存消息。一个队列的消息只能被一个消费者消费,或未被及时消费导致超时。这种模型下,生产者和消费者是一对一绑定的。
发布订阅模型 :publish-subscribe,生产者将消息发送到一个保存消息的容器中,也是使用队列模型来保存。但是消息可以被多个消费者消费,生产者和消费者完全独立,相互不需要感知对方的存在。
以上这种分类是从消息的生产和消费过程来进行区分,针对消息所包含的信息不同,还可以进行不同类别的划分。
JMS消息种类 根据消息中包含的数据种类划分,可以将消息划分成6种消息。
TextMessage
MapMessage
BytesMessage
StreamMessage
ObjectMessage
Message (只有消息头和属性)
JMS主张不同种类的消息,消费方式不同,可以根据使用需要选择不同种类的消息。但是这一点也成为其诟病之处,后面再说。整体上来说,JMS就是典型的保守派,什么都按照J2EE的规范来,做一套规范,定义若干个标准,每个标准下又提供一大批API。目前对JMS规范实现的消息中间件技术还是挺多的,毕竟是皇家御用,肯定有人舔,例如ActiveMQ、Redis、HornetMQ。但是也有一些不太规范的实现,参考JMS的标准设计,但是又不完全满足其规范,例如:RabbitMQ、RocketMQ。
AMQP JMS的问世为消息中间件提供了很强大的规范性支撑,但是使用的过程中就开始被人诟病,比如JMS设置的极其复杂的多种类消息处理机制。本来分门别类处理挺好的,为什么会被诟病呢?原因就在于JMS的设计是J2EE规范,站在Java开发的角度思考问题。但是现实往往是复杂度很高的。比如我有一个.NET开发的系统A,有一个Java开发的系统B,现在要从A系统给B系统发业务消息,结果两边数据格式不统一,没法操作。JMS不是可以统一数据格式吗?提供了6种数据种类,总有一款适合你啊。NO,一个都不能用。因为A系统的底层语言不是Java语言开发的,根本不支持那些对象。这就意味着如果想使用现有的业务系统A继续开发已经不可能了,必须推翻重新做使用Java语言开发的A系统。
这时候有人就提出说,你搞那么复杂,整那么多种类干什么?找一种大家都支持的消息数据类型不就解决这个跨平台的问题了吗?大家一想,对啊,于是AMQP孕育而生。
单从上面的说明中其实可以明确感知到,AMQP的出现解决的是消息传递时使用的消息种类的问题,化繁为简,但是其并没有完全推翻JMS的操作API,所以说AMQP仅仅是一种协议,规范了数据传输的格式而已。
AMQP(advanced message queuing protocol):一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式,兼容JMS操作。
优点 具有跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现
JMS消息种类 AMQP消息种类:byte[]
AMQP在JMS的消息模型基础上又进行了进一步的扩展,除了点对点和发布订阅的模型,开发了几种全新的消息模型,适应各种各样的消息发送。
AMQP消息模型
direct exchange
fanout exchange
topic exchange
headers exchange
system exchange
目前实现了AMQP协议的消息中间件技术也很多,而且都是较为流行的技术,例如:RabbitMQ、StormMQ、RocketMQ
MQTT MQTT(Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一。由于与JavaEE企业级开发没有交集,此处不作过多的说明。
除了上述3种J2EE企业级应用中广泛使用的三种异步消息传递技术,还有一种技术也不能忽略,Kafka。
KafKa Kafka,一种高吞吐量的分布式发布订阅消息系统,提供实时消息功能。Kafka技术并不是作为消息中间件为主要功能的产品,但是其拥有发布订阅的工作模式,也可以充当消息中间件来使用,而且目前企业级开发中其身影也不少见。
本节内容讲围绕着上述内容中的几种实现方案讲解springboot整合各种各样的消息中间件。由于各种消息中间件必须先安装再使用,下面的内容采用Windows系统安装,降低各位学习者的学习难度,基本套路和之前学习NoSQL解决方案一样,先安装再整合。
购物订单发送手机短信案例 为了便于下面演示各种各样的消息中间件技术,我们创建一个购物过程生成订单时为用户发送短信的案例环境,模拟使用消息中间件实现发送手机短信的过程。
手机验证码案例需求如下:
执行下单业务时(模拟此过程),调用消息服务,将要发送短信的订单id传递给消息中间件
消息处理服务接收到要发送的订单id后输出订单id(模拟发短信)
由于不涉及数据读写,仅开发业务层与表现层,其中短信处理的业务代码独立开发,代码如下:
订单业务
业务层接口
1 2 3 public interface OrderService { void order (String id) ; }
模拟传入订单id,执行下订单业务,参数为虚拟设定,实际应为订单对应的实体类
业务层实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Service public class OrderServiceImpl implements OrderService { @Autowired private MessageService messageService; @Override public void order (String id) { System.out.println("订单处理开始" ); messageService.sendMessage(id); System.out.println("订单处理结束" ); System.out.println(); } }
业务层转调短信处理的服务MessageService
表现层服务
1 2 3 4 5 6 7 8 9 10 11 12 @RestController @RequestMapping("/orders") public class OrderController { @Autowired private OrderService orderService; @PostMapping("{id}") public void order (@PathVariable String id) { orderService.order(id); } }
表现层对外开发接口,传入订单id即可(模拟)
短信处理业务
业务层接口
1 2 3 4 public interface MessageService { void sendMessage (String id) ; String doMessage () ; }
短信处理业务层接口提供两个操作,发送要处理的订单id到消息中间件,另一个操作目前暂且设计成处理消息,实际消息的处理过程不应该是手动执行,应该是自动执行,到具体实现时再进行设计
业务层实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Service public class MessageServiceImpl implements MessageService { private ArrayList<String> msgList = new ArrayList<String>(); @Override public void sendMessage (String id) { System.out.println("待发送短信的订单已纳入处理队列,id:" +id); msgList.add(id); } @Override public String doMessage () { String id = msgList.remove(0 ); System.out.println("已完成短信发送业务,id:" +id); return id; } }
短信处理业务层实现中使用集合先模拟消息队列,观察效果
表现层服务
1 2 3 4 5 6 7 8 9 10 11 12 13 @RestController @RequestMapping("/msgs") public class MessageController { @Autowired private MessageService messageService; @GetMapping public String doMessage () { String id = messageService.doMessage(); return id; } }
短信处理表现层接口暂且开发出一个处理消息的入口,但是此业务是对应业务层中设计的模拟接口,实际业务不需要设计此接口。
下面开启springboot整合各种各样的消息中间件,从严格满足JMS规范的ActiveMQ开始
SpringBoot整合ActiveMQ ActiveMQ是MQ产品中的元老级产品,早期标准MQ产品之一,在AMQP协议没有出现之前,占据了消息中间件市场的绝大部分份额,后期因为AMQP系列产品的出现,迅速走弱,目前仅在一些线上运行的产品中出现,新产品开发较少采用。
安装 windows版安装包下载地址:https://activemq.apache.org/components/classic/download /
下载的安装包是解压缩就能使用的zip文件,解压缩完毕后会得到如下文件
启动服务器
运行bin目录下的win32或win64目录下的activemq.bat命令即可,根据自己的操作系统选择即可,默认对外服务端口61616。
访问web管理服务
ActiveMQ启动后会启动一个Web控制台服务,可以通过该服务管理ActiveMQ。
web管理服务默认端口8161,访问后可以打开ActiveMQ的管理界面,如下:
首先输入访问用户名和密码,初始化用户名和密码相同,均为:admin,成功登录后进入管理后台界面,如下:
看到上述界面视为启动ActiveMQ服务成功。
启动失败
在ActiveMQ启动时要占用多个端口,以下为正常启动信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 wrapper | --> Wrapper Started as Console wrapper | Launching a JVM... jvm 1 | Wrapper (Version 3 .2 .3 ) http://wrapper.tanukisoftware.org jvm 1 | Copyright 1999 -2006 Tanuki Software, Inc. All Rights Reserved. jvm 1 | jvm 1 | Java Runtime: Oracle Corporation 1 .8 .0 _172 D:\soft\jdk1.8 .0 _172\jre jvm 1 | Heap sizes: current=249344 k free=235037 k max=932352 k jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net .ssl.keyStorePassword=password -Djavax.net .ssl.trustStorePassword=password -Djavax.net .ssl.keyStore=../../conf/broker.ks -Djavax.net .ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path =../../bin/win64 -Dwrapper.key=7 ySrCD75XhLCpLjd -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=9364 -Dwrapper.version=3 .2 .3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1 jvm 1 | Extensions classpath: jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra] jvm 1 | ACTIVEMQ_HOME: ..\.. jvm 1 | ACTIVEMQ_BASE: ..\.. jvm 1 | ACTIVEMQ_CONF: ..\..\conf jvm 1 | ACTIVEMQ_DATA: ..\..\data jvm 1 | Loading message broker from: xbean:activemq.xml jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1 @5 f3ebfe0: startup date [Mon Feb 28 16 :07 :48 CST 2022 ]; root of context hierarchy jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\soft\activemq\bin\win64\..\..\data\kahadb] jvm 1 | INFO | KahaDB is version 7 jvm 1 | INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] started jvm 1 | INFO | Apache ActiveMQ 5 .16 .3 (localhost, ID:CZBK-20210302 VL-10434 -1646035669595 -0 :1 ) is starting jvm 1 | INFO | Listening for connections at : tcp://CZBK-20210302 VL:61616 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector openwire started jvm 1 | INFO | Listening for connections at : amqp://CZBK-20210302 VL:5672 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector amqp started jvm 1 | INFO | Listening for connections at : stomp://CZBK-20210302 VL:61613 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector stomp started jvm 1 | INFO | Listening for connections at : mqtt://CZBK-20210302 VL:1883 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector mqtt started jvm 1 | INFO | Starting Jetty server jvm 1 | INFO | Creating Jetty connector jvm 1 | WARN | ServletContext@o.e.j.s.ServletContextHandler@7350746 f{/,null,STARTING} has uncovered http methods for path : / jvm 1 | INFO | Listening for connections at ws://CZBK-20210302 VL:61614 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector ws started jvm 1 | INFO | Apache ActiveMQ 5 .16 .3 (localhost, ID:CZBK-20210302 VL-10434 -1646035669595 -0 :1 ) started jvm 1 | INFO | For help or more information please see: http://activemq.apache.org jvm 1 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\soft\activemq\bin\win64\..\..\data\kahadb only has 68936 mb of usable space. - resetting to maximum available disk space: 68936 mb jvm 1 | INFO | ActiveMQ WebConsole available at http://127 .0 .0 .1 :8161 / jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://127 .0 .0 .1 :8161 /api/jolokia/
其中占用的端口有:61616、5672、61613、1883、61614,如果启动失败,请先管理对应端口即可。以下就是某个端口占用的报错信息,可以从抛出异常的位置看出,启动5672端口时端口被占用,显示java.net.BindException: Address already in use: JVM_Bind。Windows系统中终止端口运行的操作参看【命令行启动常见问题及解决方案】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 wrapper | --> Wrapper Started as Console wrapper | Launching a JVM... jvm 1 | Wrapper (Version 3 .2 .3 ) http://wrapper.tanukisoftware.org jvm 1 | Copyright 1999 -2006 Tanuki Software, Inc. All Rights Reserved. jvm 1 | jvm 1 | Java Runtime: Oracle Corporation 1 .8 .0 _172 D:\soft\jdk1.8 .0 _172\jre jvm 1 | Heap sizes: current=249344 k free=235038 k max=932352 k jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net .ssl.keyStorePassword=password -Djavax.net .ssl.trustStorePassword=password -Djavax.net .ssl.keyStore=../../conf/broker.ks -Djavax.net .ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path =../../bin/win64 -Dwrapper.key=QPJoy9ZoXeWmmwTS -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=14836 -Dwrapper.version=3 .2 .3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1 jvm 1 | Extensions classpath: jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra] jvm 1 | ACTIVEMQ_HOME: ..\.. jvm 1 | ACTIVEMQ_BASE: ..\.. jvm 1 | ACTIVEMQ_CONF: ..\..\conf jvm 1 | ACTIVEMQ_DATA: ..\..\data jvm 1 | Loading message broker from: xbean:activemq.xml jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1 @2 c9392f5: startup date [Mon Feb 28 16 :06 :16 CST 2022 ]; root of context hierarchy jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\soft\activemq\bin\win64\..\..\data\kahadb] jvm 1 | INFO | KahaDB is version 7 jvm 1 | INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] started jvm 1 | INFO | Apache ActiveMQ 5 .16 .3 (localhost, ID:CZBK-20210302 VL-10257 -1646035577620 -0 :1 ) is starting jvm 1 | INFO | Listening for connections at : tcp://CZBK-20210302 VL:61616 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector openwire started jvm 1 | ERROR | Failed to start Apache ActiveMQ (localhost, ID:CZBK-20210302 VL-10257 -1646035577620 -0 :1 ) jvm 1 | java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: amqp://0 .0 .0 .0 :5672 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 due to: java.net .BindException: Address already in use: JVM_Bind jvm 1 | at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:28 ) jvm 1 | at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2288 ) jvm 1 | at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2769 ) jvm 1 | at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2665 ) jvm 1 | at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:780 ) jvm 1 | at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:742 ) jvm 1 | at org.apache.activemq.broker.BrokerService.start (BrokerService.java:645 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:73 ) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62 ) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498 ) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1748 ) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1685 ) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1615 ) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553 ) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481 ) jvm 1 | at org.springframework.beans.factory.support.AbstractBeanFactory$1 .getObject(AbstractBeanFactory.java:312 ) jvm 1 | at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230 ) jvm 1 | at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308 ) jvm 1 | at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197 ) jvm 1 | at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756 ) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867 ) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542 ) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64 ) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory$1 .<init>(XBeanBrokerFactory.java:104 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67 ) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71 ) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54 ) jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87 ) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63 ) jvm 1 | at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154 ) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63 ) jvm 1 | at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104 ) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62 ) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498 ) jvm 1 | at org.apache.activemq.console.Main.runTaskClass(Main.java:262 ) jvm 1 | at org.apache.activemq.console.Main.main(Main.java:115 ) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62 ) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498 ) jvm 1 | at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240 ) jvm 1 | at java.lang.Thread.run(Thread.java:748 ) jvm 1 | Caused by: java.io.IOException: Failed to bind to server socket: amqp://0 .0 .0 .0 :5672 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 due to: java.net .BindException: Address already in use: JVM_Bind jvm 1 | at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:34 ) jvm 1 | at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:146 ) jvm 1 | at org.apache.activemq.transport.tcp.TcpTransportFactory.doBind(TcpTransportFactory.java:62 ) jvm 1 | at org.apache.activemq.transport.TransportFactorySupport.bind(TransportFactorySupport.java:40 ) jvm 1 | at org.apache.activemq.broker.TransportConnector.createTransportServer(TransportConnector.java:335 ) jvm 1 | at org.apache.activemq.broker.TransportConnector.getServer(TransportConnector.java:145 ) jvm 1 | at org.apache.activemq.broker.TransportConnector.asManagedConnector(TransportConnector.java:110 ) jvm 1 | at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2283 ) jvm 1 | ... 46 more jvm 1 | Caused by: java.net .BindException: Address already in use: JVM_Bind jvm 1 | at java.net .DualStackPlainSocketImpl.bind0(Native Method) jvm 1 | at java.net .DualStackPlainSocketImpl.socketBind(DualStackPlainSocketImpl.java:106 ) jvm 1 | at java.net .AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387 ) jvm 1 | at java.net .PlainSocketImpl.bind(PlainSocketImpl.java:190 ) jvm 1 | at java.net .ServerSocket.bind(ServerSocket.java:375 ) jvm 1 | at java.net .ServerSocket.<init>(ServerSocket.java:237 ) jvm 1 | at javax.net .DefaultServerSocketFactory.createServerSocket(ServerSocketFactory.java:231 ) jvm 1 | at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:143 ) jvm 1 | ... 52 more jvm 1 | INFO | Apache ActiveMQ 5 .16 .3 (localhost, ID:CZBK-20210302 VL-10257 -1646035577620 -0 :1 ) is shutting down jvm 1 | INFO | socketQueue interrupted - stopping jvm 1 | INFO | Connector openwire stopped jvm 1 | INFO | Could not accept connection during shutdown : null (null) jvm 1 | INFO | Connector amqp stopped jvm 1 | INFO | Connector stomp stopped jvm 1 | INFO | Connector mqtt stopped jvm 1 | INFO | Connector ws stopped jvm 1 | INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] stopped jvm 1 | INFO | Stopping async queue tasks jvm 1 | INFO | Stopping async topic tasks jvm 1 | INFO | Stopped KahaDB jvm 1 | INFO | Apache ActiveMQ 5 .16 .3 (localhost, ID:CZBK-20210302 VL-10257 -1646035577620 -0 :1 ) uptime 0 .426 seconds jvm 1 | INFO | Apache ActiveMQ 5 .16 .3 (localhost, ID:CZBK-20210302 VL-10257 -1646035577620 -0 :1 ) is shutdown jvm 1 | INFO | Closing org.apache.activemq.xbean.XBeanBrokerFactory$1 @2 c9392f5: startup date [Mon Feb 28 16 :06 :16 CST 2022 ]; root of context hierarchy jvm 1 | WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0 ' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: amqp://0 .0 .0 .0 :5672 ?maximumConnections=1000 &wireFormat.maxFrameSize=104857600 due to: java.net .BindException: Address already in use: JVM_Bind jvm 1 | ERROR: java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:91 ) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63 ) jvm 1 | at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154 ) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63 ) jvm 1 | at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104 ) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62 ) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498 ) jvm 1 | at org.apache.activemq.console.Main.runTaskClass(Main.java:262 ) jvm 1 | at org.apache.activemq.console.Main.main(Main.java:115 ) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62 ) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498 ) jvm 1 | at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240 ) jvm 1 | at java.lang.Thread.run(Thread.java:748 ) jvm 1 | Caused by: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | at org.springframework.context.support.AbstractRefreshableApplicationContext.getBeanFactory(AbstractRefreshableApplicationContext.java:164 ) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1034 ) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:555 ) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64 ) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory$1 .<init>(XBeanBrokerFactory.java:104 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67 ) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71 ) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54 ) jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87 ) jvm 1 | ... 16 more jvm 1 | ERROR: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | at org.springframework.context.support.AbstractRefreshableApplicationContext.getBeanFactory(AbstractRefreshableApplicationContext.java:164 ) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1034 ) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:555 ) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64 ) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory$1 .<init>(XBeanBrokerFactory.java:104 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104 ) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67 ) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71 ) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54 ) jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87 ) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63 ) jvm 1 | at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154 ) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63 ) jvm 1 | at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104 ) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62 ) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498 ) jvm 1 | at org.apache.activemq.console.Main.runTaskClass(Main.java:262 ) jvm 1 | at org.apache.activemq.console.Main.main(Main.java:115 ) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62 ) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498 ) jvm 1 | at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240 ) jvm 1 | at java.lang.Thread.run(Thread.java:748 ) wrapper | <-- Wrapper Stopped 请按任意键继续. . .
整合 做了这么多springboot整合第三方技术,已经摸到门路了,加坐标,做配置,调接口,直接开工
步骤① :导入springboot整合ActiveMQ的starter
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-activemq</artifactId > </dependency >
步骤② :配置ActiveMQ的服务器地址
1 2 3 spring: activemq: broker-url: tcp://localhost:61616
步骤③ :使用JmsMessagingTemplate操作ActiveMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Service public class MessageServiceActivemqImpl implements MessageService { @Autowired private JmsMessagingTemplate messagingTemplate; @Override public void sendMessage (String id) { System.out.println("待发送短信的订单已纳入处理队列,id:" +id); messagingTemplate.convertAndSend("order.queue.id" ,id); } @Override public String doMessage () { String id = messagingTemplate.receiveAndConvert("order.queue.id" ,String.class); System.out.println("已完成短信发送业务,id:" +id); return id; } }
发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend,定义消息发送的位置,和具体的消息内容,此处使用id作为消息内容。
接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert,接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。
步骤④ :使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
1 2 3 4 5 6 7 8 9 @Component public class MessageListener { @JmsListener(destination = "order.queue.id") @SendTo("order.other.queue.id") public String receive (String id) { System.out.println("已完成短信发送业务,id:" +id); return "new:" +id; } }
使用注解@JmsListener定义当前方法监听ActiveMQ中指定名称的消息队列。
如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo即可,这样即可构造连续执行的顺序消息队列。
步骤⑤ :切换消息模型由点对点模型到发布订阅模型,修改jms配置即可
1 2 3 4 5 spring: activemq: broker-url: tcp://localhost:61616 jms: pub-sub-domain: true
pub-sub-domain默认值为false,即点对点模型,修改为true后就是发布订阅模型。
总结
springboot整合ActiveMQ提供了JmsMessagingTemplate对象作为客户端操作消息队列
操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口61616
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@JmsListener
配置jms的pub-sub-domain属性可以在点对点模型和发布订阅模型间切换消息模型
SpringBoot整合RabbitMQ RabbitMQ是MQ产品中的目前较为流行的产品之一,它遵从AMQP协议。RabbitMQ的底层实现语言使用的是Erlang,所以安装RabbitMQ需要先安装Erlang。
Erlang安装
windows版安装包下载地址:https ://www.erlang.org/downloads
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕需要重启,需要重启,需要重启。
安装的过程中可能会出现依赖Windows组件的提示,根据提示下载安装即可,都是自动执行的,如下:
Erlang安装后需要配置环境变量,否则RabbitMQ将无法找到安装的Erlang。需要配置项如下,作用等同JDK配置环境变量的作用。
安装 windows版安装包下载地址:https:// rabbitmq.com/install-windows.html
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕后会得到如下文件
启动服务器
1 2 3 rabbitmq-service.bat start # 启动服务 rabbitmq-service.bat stop # 停止服务 rabbitmqctl status # 查看服务状态
运行sbin目录下的rabbitmq-service.bat命令即可,start参数表示启动,stop参数表示退出,默认对外服务端口5672。
注意:启动rabbitmq的过程实际上是开启rabbitmq对应的系统服务,需要管理员权限方可执行。
说明:有没有感觉5672的服务端口很熟悉?activemq与rabbitmq有一个端口冲突问题,学习阶段无论操作哪一个?请确保另一个处于关闭状态。
说明:不喜欢命令行的小伙伴可以使用任务管理器中的服务页,找到RabbitMQ服务,使用鼠标右键菜单控制服务的启停。
访问web管理服务
RabbitMQ也提供有web控制台服务,但是此功能是一个插件,需要先启用才可以使用。
1 2 rabbitmq-plugins.bat list # 查看当前所有插件的运行状态 rabbitmq-plugins.bat enable rabbitmq_management # 启动rabbitmq_management插件
启动插件后可以在插件运行状态中查看是否运行,运行后通过浏览器即可打开服务后台管理界面
web管理服务默认端口15672,访问后可以打开RabbitMQ的管理界面,如下:
首先输入访问用户名和密码,初始化用户名和密码相同,均为:guest,成功登录后进入管理后台界面,如下:
整合(direct模型) RabbitMQ满足AMQP协议,因此不同的消息模型对应的制作不同,先使用最简单的direct模型开发。
步骤① :导入springboot整合amqp的starter,amqp协议默认实现为rabbitmq方案
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
步骤② :配置RabbitMQ的服务器地址
1 2 3 4 spring: rabbitmq: host: localhost port: 5672
步骤③ :初始化直连模式系统设置
由于RabbitMQ不同模型要使用不同的交换机,因此需要先初始化RabbitMQ相关的对象,例如队列,交换机等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration public class RabbitConfigDirect { @Bean public Queue directQueue () { return new Queue("direct_queue" ); } @Bean public Queue directQueue2 () { return new Queue("direct_queue2" ); } @Bean public DirectExchange directExchange () { return new DirectExchange("directExchange" ); } @Bean public Binding bindingDirect () { return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct" ); } @Bean public Binding bindingDirect2 () { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2" ); } }
队列Queue与直连交换机DirectExchange创建后,还需要绑定他们之间的关系Binding,这样就可以通过交换机操作对应队列。
步骤④ :使用AmqpTemplate操作RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 @Service public class MessageServiceRabbitmqDirectImpl implements MessageService { @Autowired private AmqpTemplate amqpTemplate; @Override public void sendMessage (String id) { System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct),id:" +id); amqpTemplate.convertAndSend("directExchange" ,"direct" ,id); } }
amqp协议中的操作API接口名称看上去和jms规范的操作API接口很相似,但是传递参数差异很大。
步骤⑤ :使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
1 2 3 4 5 6 7 @Component public class MessageListener { @RabbitListener(queues = "direct_queue") public void receive (String id) { System.out.println("已完成短信发送业务(rabbitmq direct),id:" +id); } }
使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
整合(topic模型) 步骤① :同上
步骤② :同上
步骤③ :初始化主题模式系统设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration public class RabbitConfigTopic { @Bean public Queue topicQueue () { return new Queue("topic_queue" ); } @Bean public Queue topicQueue2 () { return new Queue("topic_queue2" ); } @Bean public TopicExchange topicExchange () { return new TopicExchange("topicExchange" ); } @Bean public Binding bindingTopic () { return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id" ); } @Bean public Binding bindingTopic2 () { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.orders.*" ); } }
主题模式支持routingKey匹配模式,*表示匹配一个单词,#表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中,详细内容请参看RabbitMQ系列课程。
匹配键
topic.*.*
topic.#
topic.order.id
true
true
order.topic.id
false
false
topic.sm.order.id
false
true
topic.sm.id
false
true
topic.id.order
true
true
topic.id
false
true
topic.order
false
true
步骤④ :使用AmqpTemplate操作RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 @Service public class MessageServiceRabbitmqTopicImpl implements MessageService { @Autowired private AmqpTemplate amqpTemplate; @Override public void sendMessage (String id) { System.out.println("待发送短信的订单已纳入处理队列(rabbitmq topic),id:" +id); amqpTemplate.convertAndSend("topicExchange" ,"topic.orders.id" ,id); } }
发送消息后,根据当前提供的routingKey与绑定交换机时设定的routingKey进行匹配,规则匹配成功消息才会进入到对应的队列中。
步骤⑤ :使用消息监听器在服务器启动后,监听指定队列
1 2 3 4 5 6 7 8 9 10 11 @Component public class MessageListener { @RabbitListener(queues = "topic_queue") public void receive (String id) { System.out.println("已完成短信发送业务(rabbitmq topic 1),id:" +id); } @RabbitListener(queues = "topic_queue2") public void receive2 (String id) { System.out.println("已完成短信发送业务(rabbitmq topic 22222222),id:" +id); } }
使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
总结
springboot整合RabbitMQ提供了AmqpTemplate对象作为客户端操作消息队列
操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口5672
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RabbitListener
RabbitMQ有5种消息模型,使用的队列相同,但是交换机不同。交换机不同,对应的消息进入的策略也不同
SpringBoot整合RocketMQ RocketMQ由阿里研发,后捐赠给apache基金会,目前是apache基金会顶级项目之一,也是目前市面上的MQ产品中较为流行的产品之一,它遵从AMQP协议。
安装 windows版安装包下载地址:https://rocketmq.apache.org /
下载完毕后得到zip压缩文件,解压缩即可使用,解压后得到如下文件
RocketMQ安装后需要配置环境变量,具体如下:
ROCKETMQ_HOME
PATH
NAMESRV_ADDR (建议): 127.0.0.1:9876
关于NAMESRV_ADDR对于初学者来说建议配置此项,也可以通过命令设置对应值,操作略显繁琐,建议配置。系统学习RocketMQ知识后即可灵活控制该项。
RocketMQ工作模式
在RocketMQ中,处理业务的服务器称为broker,生产者与消费者不是直接与broker联系的,而是通过命名服务器进行通信。broker启动后会通知命名服务器自己已经上线,这样命名服务器中就保存有所有的broker信息。当生产者与消费者需要连接broker时,通过命名服务器找到对应的处理业务的broker,因此命名服务器在整套结构中起到一个信息中心的作用。并且broker启动前必须保障命名服务器先启动。
启动服务器
1 2 mqnamesrv # 启动命名服务器 mqbroker # 启动broker
运行bin目录下的mqnamesrv命令即可启动命名服务器,默认对外服务端口9876。
运行bin目录下的mqbroker命令即可启动broker服务器,如果环境变量中没有设置NAMESRV_ADDR则需要在运行mqbroker指令前通过set指令设置NAMESRV_ADDR的值,并且每次开启均需要设置此项。
测试服务器启动状态
RocketMQ提供有一套测试服务器功能的测试程序,运行bin目录下的tools命令即可使用。
1 2 tools org.apache.rocketmq.example.quickstart.Producer # 生产消息 tools org.apache.rocketmq.example.quickstart.Consumer # 消费消息
整合(异步消息) 步骤① :导入springboot整合RocketMQ的starter,此坐标不由springboot维护版本
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.2.1</version > </dependency >
步骤② :配置RocketMQ的服务器地址
1 2 3 4 rocketmq: name-server: localhost:9876 producer: group: group_rocketmq
设置默认的生产者消费者所属组group。
步骤③ :使用RocketMQTemplate操作RocketMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Service public class MessageServiceRocketmqImpl implements MessageService { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public void sendMessage (String id) { System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id:" +id); SendCallback callback = new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.println("消息发送成功" ); } @Override public void onException (Throwable e) { System.out.println("消息发送失败!!!!!" ); } }; rocketMQTemplate.asyncSend("order_id" ,id,callback); } }
使用asyncSend方法发送异步消息。
步骤④ :使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
1 2 3 4 5 6 7 8 @Component @RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq") public class MessageListener implements RocketMQListener <String > { @Override public void onMessage (String id) { System.out.println("已完成短信发送业务(rocketmq),id:" +id); } }
RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。
使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。
总结
springboot整合RocketMQ使用RocketMQTemplate对象作为客户端操作消息队列
操作RocketMQ需要配置RocketMQ服务器地址,默认端口9876
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RocketMQMessageListener
SpringBoot整合Kafka 安装 windows版安装包下载地址:https:// kafka.apache.org/downloads
下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下文件
建议使用windows版2.8.1版本。
启动服务器
kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心,相关知识请到对应课程中学习。
1 2 zookeeper-server-start .bat ..\..\config\zookeeper.properties # 启动zookeeper kafka-server-start .bat ..\..\config\server.properties # 启动kafka
运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181。
运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092。
创建主题
和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic。
1 2 3 4 5 6 # 创建topic kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itheima # 查询topic kafka-topics.bat --zookeeper 127 .0 .0 .1 :2181 --list # 删除topic kafka-topics.bat --delete --zookeeper localhost:2181 --topic itheima
测试服务器启动状态
Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。
1 2 kafka-console-producer.bat --broker-list localhost:9092 --topic itheima # 测试生产消息 kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic itheima --from-beginning # 测试消息消费
整合 步骤① :导入springboot整合Kafka的starter,此坐标由springboot维护版本
1 2 3 4 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > </dependency >
步骤② :配置Kafka的服务器地址
1 2 3 4 5 spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: order
设置默认的生产者消费者所属组id。
步骤③ :使用KafkaTemplate操作Kafka
1 2 3 4 5 6 7 8 9 10 11 @Service public class MessageServiceKafkaImpl implements MessageService { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @Override public void sendMessage (String id) { System.out.println("待发送短信的订单已纳入处理队列(kafka),id:" +id); kafkaTemplate.send("itheima2022" ,id); } }
使用send方法发送消息,需要传入topic名称。
步骤④ :使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
1 2 3 4 5 6 7 @Component public class MessageListener { @KafkaListener(topics = "itheima2022") public void onMessage (ConsumerRecord<String,String> record) { System.out.println("已完成短信发送业务(kafka),id:" +record.value()); } }
使用注解@KafkaListener定义当前方法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。
总结
springboot整合Kafka使用KafkaTemplate对象作为客户端操作消息队列
操作Kafka需要配置Kafka服务器地址,默认端口9092
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@KafkaListener。接收消息保存在形参ConsumerRecord对象中