博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Javaweb——Spring Boot 系列(19)Integration
阅读量:3934 次
发布时间:2019-05-23

本文共 11502 字,大约阅读时间需要 38 分钟。

Spring Integration

一、何为 Spring Integration

  • Spring Integration 提供了基于 Spring 的 EIP(Enterprise Integration Patterns,企业集成模式)的实现,主要解决不同系统之间的交互问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。
  • Spring Integration主要由 Message、ChannelMessage EndPoint 构成。

1、Message

  • Message 即数据,需要在不同部分之间传递的数据。Message 通常由消息体和消息头组成。消息的具体内容可以是任何能够被当前的技术解析可视化的数据类型。通常,消息头的元数据就是解释消息体的内容。

2、Channel

  • Channel 即通道,消息要在不同部分之间传递,就需要一种介质,这种介质就是Channel
  • 一个消息在收发两方的具体细节就是:消息发送者将消息发送到 Channel 中,接收者从 Channel中接收消息。
  • Spring Integration提供了三个 Channel 接口:MessageChannel、PollableChannelSubscribableChannel
  • 其中 MessageChannel 是顶级接口,是另外两个接口的父类;PollableChannle 是一个轮询接口,另外一个则是订阅接口。
  • Spring Integration 在提供接口的同时,提供了如下表的常用消息通道:
    通道名 方式
    PublishSubscribeChannel 以广播的形式将消息发送给所有订阅者
    QueueChannel 以轮询的方式从通道中接收消息,用一个 queue 接收消息,队列大小可配置
    PriorityChannel 按照优先级将数据存储到对,依据于消息的消息头 priority 属性
    RendezvousChannel 确保每一个接收者都接收到消息后再发送消息
    DirectChannel Spring Integration 默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收
    ExecuteorChannel 绑定一个多线程的 task executor 来发送消息
  • 与此同时,Spring Integration 海提供了一个 ChannelInterceptor,也就是通道拦截器,用于拦截发送和接收消息的操作;通常我们只需实现这个接口即可,而可用 channel.addInterceptor(someInterceptor) 给所有的通道增加拦截器。

3、Message EndPoint

  • Message EndPoint,译为消息端点,是真正处理消息的组件,可以控制通道的路由,Spring Integration 提供的可用消息端点如下:
    端点 用途
    Channel Adapter 通道适配器,一种连接外部系统或传输协议的端点,分为 inbound 和 outbound,即入站适配器和出站适配器,入站负责接收消息,出站负责发送消息,Spring Integration 对大多数通信协议都有适配器支持。
    Gateway 网关,提供双向的请求/返回集成模式,同样分为入站和出站。
    Router 路由,可以消息体类型、消息头值以及定义好的接收表为条件,将消息分发到不同的通道
    Service Activator 调用 Bean 处理消息并将结果输出到指定的消息通道
    Filter 过滤器,决定消息是否可以传递给消息通道
    Splitter 拆分器,将消息拆分为几个部件单独处理,拆分器的返回值是一个集合或数组
    Aggregator 聚合器,以 java.util.List 为接收参数,将多个消息合并为一个
    Enricher 增强器,用于给收到的消息增加额外信息,分为消息头增强器和消息体增强器
    Transformer 转换器,对收到的消息进行逻辑转换,如格式转换
    Bridge 桥接器,用于将两个消息通道连接起来

二、Spring Integration Java DSL

  • Spring Integration 提供 IntegrationFlow 定义系统继承流程,通过 IntegrationFlows 和 IntegrationFlowBuilder 实现使用 Fluent API 定义流程。
  • Fluent API 中提供如下方法来映射 Spring Integration 的端点
    transform()	-> Transformer	filter()	-> Filter	handle()	-> ServiceActivator、Adapter、Gateway	split()		-> Spliter	aggregate()	-> Aggreator	route()		-> Router	bridge()	-> Bridge
  • 简单流程示例:
    @Beanpublic IntegrationFlow demo(){
    return IntegrationFlows.from("news") .
    transform(Integer::parseInt) .get();}

三、项目示例

  • 从 https://spring.io/blog.atom 获取资源,主要是一些 URL 链接,然后根据不同关键字将消息发送到不同消息通道。

1、新建项目

  • 新建一个 Spring Boot 项目,初始依赖选择 Integration 和 mail。
  • 手动添加如下三个依赖,
    org.springframework.integration
    spring-integration-feed
    org.springframework.integration
    spring-integration-mail
    org.springframework.integration
    spring-integration-java-dsl
    1.1.0.M1
  • 整个 POM 文件内容如下:
    4.0.0
    org.springframework.boot
    spring-boot-starter-parent
    1.3.0.M4
    com.pyc
    mymessage
    0.0.1-SNAPSHOT
    mymessage
    Learning Integration and mail
    1.8
    org.springframework.boot
    spring-boot-starter-integration
    org.springframework.boot
    spring-boot-starter-mail
    org.springframework.integration
    spring-integration-feed
    org.springframework.integration
    spring-integration-mail
    org.springframework.integration
    spring-integration-java-dsl
    1.1.0.M1
    org.springframework.boot
    spring-boot-starter-test
    test
    org.junit.vintage
    junit-vintage-engine
    org.springframework.integration
    spring-integration-test
    test
    org.springframework.boot
    spring-boot-maven-plugin
  • application.properties 文件可以什么都不编辑,也可以编辑日志文件的位置。

2、编辑流程

  • 流程分为读取流程、releases 流程、engineering 流程和 news 流程,所有流程的代码都在入口类中完成编辑,如下:
    package com.pyc.mymessage;import static java.lang.System.getProperty;import com.rometools.rome.feed.synd.SyndEntry;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.core.io.Resource;import org.springframework.integration.dsl.IntegrationFlow;import org.springframework.integration.dsl.IntegrationFlows;import org.springframework.integration.dsl.channel.MessageChannels;import org.springframework.integration.dsl.core.Pollers;import org.springframework.integration.dsl.mail.Mail;import org.springframework.integration.feed.inbound.FeedEntryMessageSource;import org.springframework.integration.file.support.FileExistsMode;import org.springframework.integration.scheduling.PollerMetadata;import org.springframework.integration.dsl.file.Files;import java.io.File;import java.io.IOException;@SpringBootApplicationpublic class MymessageApplication {
    public static void main(String[] args) {
    SpringApplication.run(MymessageApplication.class, args); } //--------------------------------------- // Flow path of read // using @value annotation to gain resources from https://spring.io/blog.atom by automatic @Value("https://spring.io/blog.atom") Resource resource; // using Fluent API and pollers to configure acquiescent way of poll @Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerMetadata poller(){
    return Pollers.fixedRate(500).get(); } // here is a position where construct adapter of inbox channel of feed and use the adapter as data input @Bean public FeedEntryMessageSource feedMessageSource() throws IOException{
    return new FeedEntryMessageSource(resource.getURL(), "news"); } @Bean public IntegrationFlow myFlow() throws IOException{
    // flow path begin with the method called from return IntegrationFlows.from(feedMessageSource()) // select route by route method,type of payload is SyndEntry, //the criteria type is string and the criteria value from Categroy what is classify by payload .
    route(payload->payload.getCategories().get(0).getName(), // send the different value to different message channel mapping->mapping.channelMapping("releases", "releasesChannel") .channelMapping("engineering", "engineeringChannel") .channelMapping("news", "newsChannel") ).get(); // Get the IntegrationFlow entity by get method and configure as a bean of spring } //-------------------------------------- //------------------------------------------- // Releases flow path @Bean public IntegrationFlow releasesFlow(){
    // start read the data came from message channel releasesChannel return IntegrationFlows.from(MessageChannels.queue("releasesChannel",10)) // Data conversion by using transform method. type of payload is SyndEntry, // convert it to string type and custom data format .
    transform(payload->"《"+ payload.getTitle()+"》"+payload.getLink()+ getProperty("line.separator")) // handling the outbound adapter of file by using handle method. //Files class is a Fluent API provided by Spring Integration Java DSL to construct adapter of output files .handle( Files.outboundAdapter(new File("springblog")) .fileExistsMode(FileExistsMode.APPEND) .charset("UTF-8") .fileNameGenerator(message -> "releases.txt") .get() ).get(); } //-------------------------------------------- //-------------------------------------------- // Engineering flow path @Bean public IntegrationFlow engineeringFlow(){
    return IntegrationFlows.from(MessageChannels.queue("engineeringChannel",10)) .
    transform( e->"《"+e.getTitle()+"》"+e.getLink() +getProperty("line.separator") ).handle( Files.outboundAdapter(new File("springblog")) .fileExistsMode(FileExistsMode.APPEND) .charset("UTF-8") .fileNameGenerator(message -> "engineering.txt") .get() ).get(); } //------------------------------------------- @Bean public IntegrationFlow newsFlow(){
    return IntegrationFlows.from(MessageChannels.queue("newsChannel", 10)) .
    transform( payload->"《"+payload.getTitle()+"》"+payload.getLink()+getProperty("line.separator") ) // add the information of message head by using enricherHeader .enrichHeaders( Mail.headers() .subject("A news come from Spring") .to("553481864@qq.com") .from("15014366986@163.com")) // the information which send by mail is constructed by the method of Mail.headers provide by Spring Integration Java DSL .handle( Mail.outboundAdapter("smtp.163.com") .port(25) .protocol("smtp") .credentials("15014366986@163.com", "*********") .javaMailProperties(p->p.put("mail.debug", "false")), e->e.id("smtpOut") // define the outbound adapter of send mail by using a method called handle // Constructed by Mail.outboundAdapter provided from Spring Integration Java DSL ).get(); }}
  • news 流程特别说一下,因为在这个流程用到了 Mail,我这里用的是网易的163邮箱服务器发送邮件,通过网上查询 163 邮箱用于发送邮件的服务器地址是 smtp.163.com,服务器名称为 smtp,通常用不用 SSL 协议的端口 25,在 credentials() 里填写的是自己的邮箱地址和邮箱密码,值得注意的是为了远程授权成功,密码通常用客户端授权码,不清楚的可以百度怎么获取163邮箱的客户端授权码。

3、测试运行

  • 在本例中不用浏览器访问任何地址,只需等待程序处理完任务后,可以看到在项目的根路径中多了一个 springblog 文件夹,里面有两个 txt 文件:
    在这里插入图片描述
  • txt 文件的具体内容分别如下:
    在这里插入图片描述
    在这里插入图片描述
  • 然后确认 news 流程是否成功,我第一次时是用我自己的163邮箱给自己的 QQ 邮箱发送邮件,然后又给同学和朋友的邮箱发送邮件,查看自己的 QQ 邮箱:
    在这里插入图片描述
  • 问了同学和朋友,它们也收到了邮件,所以可以总结 news 流程成功无误。

转载地址:http://clqgn.baihongyu.com/

你可能感兴趣的文章
线性代数 | (3) 行列式
查看>>
学术英语 | (1) wordList1
查看>>
机器学习 | 台大林轩田机器学习技法课程笔记3 --- Kernel Support Vector Machine
查看>>
机器学习 | 台大林轩田机器学习技法课程笔记7 --- Blending and Bagging
查看>>
学术英语 | (6) WordList6
查看>>
线性代数 | (5) 线性方程组
查看>>
学术英文 | (7) Unit3Words
查看>>
线性代数 | (6) 相似对角形
查看>>
学术英语 | (8) WordList7
查看>>
概率论与数理统计 | (1) 概率论初步Part One
查看>>
概率论与数理统计 | (2) 概率论初步Part Two
查看>>
概率论与数理统计 | (3) 随机变量
查看>>
学术英语 | (9) WordList8
查看>>
概率论与数理统计 | (4) 二元随机变量Part One
查看>>
学术英语 | (10) WordList9
查看>>
李航机器学习 | (2) 统计学习方法(第2版)笔记 --- 感知机
查看>>
动手学PyTorch | (33) 通过时间反向传播
查看>>
动手学PyTorch | (37) 优化与深度学习
查看>>
动手学PyTorch | (39) 小批量随机梯度下降
查看>>
动手学PyTorch | (59) 微调(fine-tuning)
查看>>