我们要解决的问题
- 完成组织内外的各种异构系统、应用、数据源之间共享和交换信息。
- 优化现有结构,使整个系统易于拓展和维护。
- 保证多个系统各自独立互不干扰。
总结发现我们实际要解决的问题是:企业应用集成(Enterprise Application Integration,EAI) 是完成在组织内、外的各种异构系统,应用和数据源之间共享和交换信息和协作的途径,方法学,标准和技术。
EAI的常用解决方案
SOA(Service Oriented Architecture) 中文释义为 “面向服务的架构”它是一种设计理念,其中包含多个服务, 服务之间通过相互依赖最终提供一系列完整的功能。各个服务通常以独立的形式部署运行,服务之间通过网络进行调用。要求各个服务遵循统一的规范和契约。
ESB企业服务总线
ESB(Enterprise Service Bus,即企业服务总线) 就是一根管道,用来连接各个服务节点。ESB的存在是为了集成基于不同协议的不同服务,ESB 做了消息的转化、解释以及路由的工作,以此来让不同的服务互联互通
如何实现ESB
当前实现ESB比较成熟的模型为EIP(Enterprise Integration Patterns)1。他的包含如下规范:
- 集成方式(Integration Styles):EIP规定所有集成模式要基于消息传送模式。
- 通道模式(Channel Patterns):消息通过通道进行传递。
- 消息体模式(Message Construction Patterns):描述了在消息系统中交互的消息的规范。
- 路由模式(Routing Patterns):消息如何从发送者分发到正确的接收者,中间一般不进行修改。
- 转换模式(Transformation Patterns):将消息体的内容修改为接收者可以理解的结构,中间可能要对数据进行修改或者筛选。
- 终端模式(Endpoint Patterns):生成或者接收消息的客户。
- 系统管理模式(System Management Patterns):提供监控整个系统状态的工具,包括错误处理,压力测试或者监控系统变化。
为什么使用Camel
当前热门的EIP集成框架分别有:Spring Integration、Mule ESB、Apache Camel。接下来分别对三个框架进行分析(打分为博主的评估,仅供参考):
Spring Integration只提供了非常基础的支持,如文件,FTP,JMS,TCP,HTTP或Web服务。集成是通过编写大量的XML代码(没有一个真正的DSL)实现的。使用它,寓意着大量的XML编写工作。
Mule ESB不是仅仅一个集成框架,而是一个包括一些额外功能的完整ESB,比Spring集成它更像是一个DSL。
因为是一个完整的ESB,所以集成逻辑会比较复杂。
Apache Camel实现了你能想到的几乎每一个技术,提供很多组件,同时你可以很容易的自定义组件。而且Camel和Spring的集成很完善。Camel可以实现用到才依赖,不用不依赖。
Camel可以做什么
Apache Camel简介
apache camel 是轻量级ESB框架
它有几个比较重要的概念就是:
- endpoint,所谓的endpoint,就是一种可以接收或发送数据的组件。可以支持多种协议,如jms,http,file等。
- processor,它是用来处理具体业务逻辑的组件。
- route,用来路由,指示数据从哪里来到哪里去,中间用哪个processor处理。
而processor之间用exchange对象来传送数据,有点像jms,通俗一点就像上学时传的小纸条。
camel就是企业信息集成框架,它提供了很多简单好用而又强大的组件,用户可以根据场景来选择不同的EIP(企业集成模式)来实现自己的需求,以响应快速变化的业务。可以把它当成企业信息总线(ESB)的轻量级实现。
camel是一款基于规则快速实现消息流转的开发组件,集成该组件后,你的程序可以编写最少的代码实现复杂的消息在不同的协议规则间流转。 例如:程序实现从Ftp获得.xml文件,然后将收到的文件内容值转换后,发送到Jms Queue中,并且将Request写入到数据库log表。 Ftp组件->Jms组件->Db组件 只需要短短的几行代码就可以实现这样一个功能,但是如果用其他框架一个个功能的写,将会有非常多的代码量并且可能会出现一些纰漏,而camel已经将这些功能都封装在camel组件中了,节省开发成本。 from("ftp://xxxxxxxxxxxxx").bean("bean:JmsQueueCovertBean?method=convert").to("jms://xxxxxxxxxxx")..setBody(simple("insert into xxxxxxxxxxx")).to("jdbc:testdb");
Apache camel 是一个基于EIP的开源框架。实现了EIP定义的一些不同应用系统之间的消息传输模型,包括常见的Point2Point、Pub/Sub模型。
Camel的消息传递系统(Message System)2:
- 终端(Message Endpoint):可以是异构的业务系统,都需要提供Endpoint实现集成。
- 通道(Message Channel):两个应用之间进行信息通讯的通道。
- 消息(Message):Endpoint之间交互的标准化单位。
- 路由(Message Router):根据一定的条件,将消息传递给不同的过滤器以实现对单个处理步骤的解耦。
- 转换器(Message Translator):消息在传输过程中的转换和数据映射,包括报文格式转换和内容转换映射。
- 管道和过滤器(Pipes & Filters):在保持独立性和灵活性的基础上,对复杂的消息进行处理。
- 消息汇聚:比如将来自不同服务器的数据,有ActiveMQ、RabbitMQ、WebService等的数据合成报表。
- 消息分发:将消息从消息生产者转发给消息接收者,分发方式分为两种:顺序分发&并行分发。
from("amqp:queue:order") .to("uri:validateBean", "uri:handleBean", "uri:emailBean"); from("amqp:queue:order") .multicast() .to("uri:validateBean", "uri:handleBean", "uri:emailBean");
消息转换:将消息内容进行转换,比如xml转为json格式。
from("amqp:queue:order") .process(new XmlToJsonProcessor()) .to("bean:orderHandler");
规则引擎:可以使用Spring XML配置或DSL来定义route。同时camel提供了大量内置Processor,用于逻辑运算、过滤等,这样更容易灵活的管理route。
<route> <from uri="amqp:queue:order"/> <multicast> <to uri="uri:validateBean"/> <to uri="uri:handleBean"/> <to uri="uri:emailBean"/> </multicast> </route>
from("amqp:queue:order") .filter(header("foo") .isEqualTo("bar")) .choice() .when(xpath("/person/city = 'London'")) .to("file:target/messages/uk") .otherwise() .to("file:target/messages/others");
Camel的核心要素
Camel有以下五要素:
- Endpoint:用于收发消息。
- Exchange:消息本体。
- Processor:消息处理器。
- Routing:路由规则。
- Service:Camel基础概念。
Message
org.apache.camel.Message是Camel中一个基本的包含数据和路由的实体,Messages包含了
- 唯一的识别(Unique Identifier)–java.lang.String类型
- 头信息(Headers)–会提供一些内容的提示,头信息被组织成名值对的形式,string–>Object
- 内容(body)是一个Object类型的对象,这就意味着,你要确保接收器能够理解消息的内容。当消息发送器和接收器使用不同的内容格式的时候,你可以使用Camel的数据转换机制将其转换为一个特定的格式。在许多情况下预先定义类型可以被自动转换。
- 错误标记(fault flag)使用来标记正常或者错误的标记,通常由一些标准类定义,例如(WSDL)
Endpoint
是Camel中的一个基本概念,Endpoint作为Camel系统中一个通道的端点,可以发送或者接受消息。在Camel中Endpoint使用URI来配置。在运行时Camel通过URI来查找端点。端点的功能强大、全面而且又可维护。来看一些例子。
- Endpoint是Camel与其他系统进行通信的设定点。
- Camel自身提供了广泛的通信协议支持,例如:RPC协议、HTTP协议、FTP协议……
- Camel中的Endpoint使用URI描述对目标系统的通信。
- 对Endpoint实例的创建通过对Camel中org.apche.camel.Component接口的实现来实现的。
- Camel通过Plug方式提供对各种协议的Endpoint支持,如果需要使用某种Endpoint,需要引入响应的plug。例如要使用Camel对Netty4-Endpoint的支持,要引入camel-netty4的依赖包。
Component
Component是一些Endpoints URI的集合。他们通过连接码来链接(例如file:,jms:),而且作为一个endpoint的工厂。现在Camel中又超过80个Component。当然你一可以通过扩展org.apache.camel.impl.DefaultComponent来实现自己的Component
Exchange
org.apache.camel.Exchange 是一个消息之间通信的抽象的会话。下面列出的就是这样一个会话,使得组件更为全面
- Properties:Exchange对象贯穿整个路由执行过程中的控制端点、处理器甚至还有表达式、路由条件判断。为了让这些元素能够共享一些开发人员自定义的参数配置信息,Exchange以K-V结构提供了这样的参数配置信息存储方式。
- Patterns:Exchange中的pattern属性非常重要,它的全称是:ExchangePattern(交换器工作模式)。其实现是一个枚举类型:org.apache.camel.ExchangePattern。可以使用的值包括:InOnly, RobustInOnly, InOut, InOptionalOut, OutOnly, RobustOutOnly, OutIn, OutOptionalIn。从Camel官方已公布的文档来看,这个属性描述了Exchange中消息的传播方式。
- Message IN/OUT:当Endpoint和Processor、Processor和Processor间的Message在Exchange中传递时,Exchange会自动将上一个元素的输出作为这个元素的输入使用。
Processor
Processor用于接受从Endpoint、Routing或者另一个Processor的Exchange中传来的消息,并进行处理。
Camel核心包和各个Plugin组件都提供了很多Processor的实现,开发人员也可以通过实现org.apache.camel.Processor接口自定义Processor。
org.apache.camel.Processor 是一个消息接受者和消息通信的处理器。当然,Processor是Route的一个元素,可用来消息格式转换或者其他的一些变换。
// 一个自定义处理器的实现 public class OtherProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { Message message = exchange.getIn(); String body = message.getBody().toString(); //=============== // 您可以在这里进行数据格式转换 // 并且将结果存储到out message中 //=============== // 存入到exchange的out区域 if(exchange.getPattern() == ExchangePattern.InOut) { Message outMessage = exchange.getOut(); outMessage.setBody(body + " || other out"); } } }
Routing
Routing用于处理Endpoint和Processor之间、Processor和Processor之间的路由跳转。
Camel中支持的路由规则非常丰富,包括基于内容、接收者列表、循环动态路由等。
顾名思义,Route,就是路由,它定义了Message如何在一个系统中传输的真实路径或者通道。路由引擎自身并不暴露给开发者,但是开发者可以自己定义路由,并且需要信任引擎可以完成复杂的传输工作。每个路由都有一个唯一的标识符,用来记录日志、调试、监控,以及启动或者停止路由。
路由也有一个输入的Message,因此他们也有效的链接到一个输入端点。路由定义了一种领域特有的语言(DSL)。Camel提供了java、scala和基于XM的Route-DSL。
示例路由:
//simple route. from("file:data/inbox").to("jms:queue:order")
路由可以使用过滤器、多播、接收列表、并行处理来定义,从而变得非常灵活。由于这篇文章只是简单的介绍Camel,我这里只给出一个注释的例子。这个使用了“direct:”架构,他提供了当消息生产者发出消息后直接的、同步的调用。
//Every 10 seconds timer sends an Exchange to direct:prepare from("timer://foo?fixedRate=true&period=10000").to("direct:prepare"); // Onother Routes can begin from "direct:prepare" // This now depends on timer, logging and putting a message to the queue. from(direct:prepare).to("log:com.mycompany.order?level=DEBUG").to("jms:queue:order?jmsMessageType=Text");
Service
在Apache Camel中有一个比Endpoint、Component、CamelContext等元素更基础的概念元素:Service。
包括Endpoint、Component、CamelContext等元素在内的大多数工作在Camel中的元素,都是一个一个的Service。
Camel应用程序中的每一个Service都是独立运行的,各个Service的关联衔接通过CamelContext上下文对象完成。每一个Service通过调用start()方法被激活并参与到Camel应用程序的工作中,直到它的stop()方法被调用。也就是说,每个Service都有独立的生命周期。
CamelContext上下文
CamelContext横跨了Camel服务的整个生命周期,并且为Camel服务的工作环境提供支撑。
现在让我们来看看一张图,我们看到的是一些不同的相互链接的构件,而在他们中间起链接作用的粘合剂就是Camel Context了。他将实体链接一起,有的时候被称为Camel运行是容器。
代码实践
引入相关Jar包
<!-- apache camel --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.24.2</version> </dependency> <!-- apache camel 集成 activemq中间件 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-camel</artifactId> <version>5.15.4</version> </dependency>
传输文件到消息件
public static void main(String[] args) throws Exception { DefaultCamelContext context = new DefaultCamelContext(); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","http://172.16.2.221:8161"); context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("file:input_box?noop=true") .to("activemq:queue:my_queue"); } }); context.start(); }
传输对象到消息中间件
public static void main(String[] args) throws Exception { DefaultCamelContext context = new DefaultCamelContext(); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://172.16.2.221:61616"); context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .to("activemq:queue:my_queue"); } }); context.start(); ProducerTemplate producerTemplate = context.createProducerTemplate(); producerTemplate.sendBody("direct:start","测试消息"); }
生产者和消费者示例
process是一个处理器,可以处理消费者消费消息之前的消息。
public static void main(String[] args) throws Exception { DefaultCamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { System.out.println("进入消息处理器..."); //提供者发送的消息 String msg = exchange.getIn().getBody(String.class); msg = msg + "-By FanJiangFeng"; System.out.println("消息被我修改成:"+msg); //重新发送 exchange.getOut().setBody(msg); } }) .to("seda:end"); } }); context.start(); //提供者 ProducerTemplate producerTemplate = context.createProducerTemplate(); producerTemplate.sendBody("direct:start","Hello Everyone"); //消费者 ConsumerTemplate consumerTemplate = context.createConsumerTemplate(); String message = consumerTemplate.receiveBody("seda:end", String.class); System.out.println("消费者取出的消息:"+message); }
消息生产者生产的数据为数据库中sql查询到的数据
需要Jar包
<!-- apache camel 集成 数据库 --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jdbc</artifactId> <version>2.22.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency>
测试类
public static void main(String[] args) throws Exception { MysqlDataSource dataSource = new MysqlDataSource(); dataSource.setURL("jdbc:mysql://localhost:3306/test"); dataSource.setUser("root"); dataSource.setPassword("1234"); SimpleRegistry simpleRegistry = new SimpleRegistry(); simpleRegistry.put("myDataSource",dataSource); DefaultCamelContext context = new DefaultCamelContext(simpleRegistry); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .to("jdbc:myDataSource") .bean(new ResultHandlerTest(),"printResult"); } }); context.start(); //生产者 ProducerTemplate producerTemplate = context.createProducerTemplate(); producerTemplate.sendBody("direct:start","select * from user"); } public static class ResultHandlerTest{ private void printResult(List list){ for(int i=0;i<list.size();i++){ System.out.println(list.get(i)); } } }
消息生产者发送消息,由某个类的某个方法进行消费消息(第一种方式)
消息生产者发送消息,由某个类的某个方法进行消费消息(此方法必须为public方法,否则报方法找不到的异常)
public class CallMethodUsingClassComponent { public void consumerMethod(String message){ System.out.println("消费者方法接收消息:"+ message); } public static void main(String[] args) throws Exception{ DefaultCamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .to("class:com.ftx.camel.test.CallMethodUsingClassComponent?method=consumerMethod"); } }); context.start(); //创建消息生产者 ProducerTemplate producerTemplate = context.createProducerTemplate(); producerTemplate.sendBody("direct:start","测试消息"); } }
消息生产者发送消息,由某个类的某个方法进行消费消息(第二种方式)
此方法必须为public方法,否则报方法找不到的异常
public class CallMethodUsingClassComponent2 { public void consumerMethod(String message){ System.out.println("消费者方法接收消息:"+ message); } public static void main(String[] args) throws Exception{ CallMethodUsingClassComponent2 callMethodUsingClassComponent = new CallMethodUsingClassComponent2(); SimpleRegistry registry = new SimpleRegistry(); registry.put("myService",callMethodUsingClassComponent); DefaultCamelContext context = new DefaultCamelContext(registry); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .to("bean:myService?method=consumerMethod"); } }); context.start(); //创建消息生产者 ProducerTemplate producerTemplate = context.createProducerTemplate(); producerTemplate.sendBody("direct:start","测试消息"); } }
消费者从activeMQ中消费消息
public class ActiveMQConsumer { public static void main(String[] args) throws Exception { DefaultCamelContext context = new DefaultCamelContext(); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://172.16.2.221:61616"); context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("activemq:queue:my_queue").to("seda:end"); } }); context.start(); ConsumerTemplate consumerTemplate = context.createConsumerTemplate(); String message = consumerTemplate.receiveBody("seda:end", String.class); System.out.println(message); } }
补充:关于”direct:start”
from("direct:start") .to("http://myhost/mypath");
上面的”direct:start”只是说路由,以直接端点提供路由的同步调用.如果要将Exchange
发送到direct:start端点,则可以创建ProducerTemplate
并使用各种发送方法.
- Direct Component
基于内存的同步消息组件
使用Direct组件,生产者直接调用消费者。因此使用Direct组件的唯一开销是方法调用。
Direct的线程模型:由于生产者直接调用消费者
因此:调用者与camel的消费者共用一个线程
https://camel.apache.org/components/next/direct-component.html
- SEDA Component
基于内存的异步消息组件:生产者和消费者通过BlockingQueue交换消息,生产者与消费者是不同的线程
如果VM在消息尚未处理时终止,则seda不会实现消息的持久化或恢复,因此有丢失消息的风险
消费者视角
Consumer thread pool:SedaConsumer内部持有一个线程池,默认是1个线程,可以通过concurrentConsumers指定线程数
代码如下所示:
from("seda:start?concurrentConsumers=2") .to("log:A") .to("log:B");
Consumer thread pool中的每个线程,还可以开启新的线程池,代码如下所示
from("seda:start?concurrentConsumers=2") .to("log:A") // create a thread pool with a pool size of 5 and a maxi- mum size of 10. .threads(5, 10) .to("log:B");
生产者视角
异步发送消息:生产者发完消息,立刻返回,不需要等待消息消费成功
//InOnly消息模式 producerTemplate.sendBody("seda:start", body);
同步发送消息:生产者发完消息,会阻塞,直到消费成功
//InOut消息模式 producerTemplate.requestBody("seda:start", body);