本文主要是介绍【消息队列】if-mdp使用手册,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
转自:http://blog.csdn.net/wo240/article/details/47306565
1. 新版MDP总体介绍
- 方便多tibco ems环境下的接入;
- 引入mdp schema,配置更灵活,无需依赖其它xml bean配置文件;
- 既支持queue,也支持topic;
- 支持对多queue的广播;
- 支持selector进行消息路由;
- 底层传输采用比java序列化更高效的json方式;
- 应用易于灰度发布。
2. Mdp schema
新版MDP引入了MDP schema,使得MDP的配置更灵活,简单的配置,即可使普通的Java接口和类变身为远程服务,无需借助任何annotation。
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mdp="http://www.99bill.com/schema/mdp"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.99bill.com/schema/mdp http://www.99bill.com/schema/mdp/bill99-mdp-1.0.xsd">
2.1 service
service为mdp服务端配置,示例如下
<bean id="mdpTarget" class="com.bill99.mdp.DemoImpl"/> <mdp:service id="mdpDemo" interface="com.bill99.mdp.Demo" destination="mdp.demo" ref="mdpTarget" concurrency="2" server="mdpDemo" connection-factory="connectionFactory"> </mdp:service> |
---|
或
<mdp:service id="annoDemo" interface="com.bill99.mdp.annotation.AnnoDemo" destination="mdp.anno" concurrency="2" connection-factory="connectionFactory"> <bean class="com.bill99.mdp.annotation.AnnoDemoImpl"></bean> </mdp:service> |
---|
2.1.1 service属性
Attribute | 说明 | 必选值 |
id | spring bean id | No |
interface | 服务所暴露的接口 | Yes |
ref | 服务的实现类bean id,该bean实现了interface所声明的接口 | 与内嵌的bean二选一 |
server | 服务名称定义,便于对该mdp服务进行状态与性能的监控,建议名称为<组名>.<应用名称>,默认与id相同 | No |
destination | Jms destination名称,可以是queue名称,也可以是topic名称 | Yes |
Container-type | Jms listener container名称,分为simple和default两种,默认为default | No |
destination-type | queue或topic | No |
concurrency | 并发session数,默认值为1;当container-type为default或默认值时,可以设置范围值,如5-10,意指session的 最小值为5,最大值为10,可以根据负载自动调整并发数;注意,当destination-type为topic时,concurrency必 须是1,否则会收到重复的消息。 | No |
transacted | 消费消息时是否需要事务,默认false | No |
ttl | Mdp反馈结果消息的存活时间,单位毫秒,包括同步返回与异步回调返回的消息,默认为10分钟 | No |
connection-factory | Jms 连接工程ConnectionFactory | Yes |
selector | Jms selector | No |
2.1.2 service子元素
service可以嵌套一个bean定义,作为mdp服务的实现类,与ref属性功能相同,二者二选一,根据各自喜好而定。
2.2 reference
reference为mdp客户端配置,调用service暴露的服务,示例如下
<mdp:reference id="mdpDemo" interface="com.bill99.mdp.Demo" destination="mdp.demo" concurrency="10" connection-factory="connectionFactory" timeout="10000"> </mdp:reference> |
---|
2.2.1 reference属性
Attribute | 说明 | 必选值 |
id | Spring bean id | No |
interface | 客户端访问的接口,与服务端接口一致 | Yes |
destination | Jms destination名称,可以是queue名称,也可以是topic名称 | 与内嵌的destinations元素二选一 |
destination-type | queue或topic | No |
timeout | 同步调用的超时时间,单位毫秒 | No |
source | 标示客户端来源,用于监控客户端的调用情况,如频次,成功率等,建议名称为<组名>.<应用名称>默认 为客户端主机名 | No |
concurrency | 等待反馈结果的session数,默认值为5;注意客户端的请求session数是不受控制的,完成根据请求创建 | No |
connection-factory | Jms 连接工程ConnectionFactory | Yes |
transacted | 发送消息时是否需要事务,默认false。 如果需要JTA支持,只能是在异步调用时生效(即方法无返回值),并且需要XAconnectionFactory支持 | No |
2.2.2 reference子元素
示例如下
<mdp:reference id="mdpDemo" interface="com.bill99.mdp.Demo" concurrency="10" connection-factory="connectionFactory" timeout="10000"> <mdp:destinations> <value>mdp.demo1</value> <value>mdp.demo2</value> </mdp:destinations> </mdp:reference> |
---|
destinations可以实现多播的功能,向多个destination发送请求,但有个限制前提条件,多播只能用于异步无callback的调用场景。
2.2.3 动态生成reference
package com.bill99.mdp.reference;
import javax.jms.ConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.bill99.mdp.Demo;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:/context/context-mdp-client.xml" })
public class MdpReferenceFactoryBeanTest {
@Autowired
private ConnectionFactory connectionFactory;
private Demo demo;
private MdpReferenceFactoryBean factoryBean;
@Before
public void setUp() throws Exception {
factoryBean = new MdpReferenceFactoryBean();
factoryBean.setBeanClassLoader(Thread.currentThread().getContextClassLoader());
factoryBean.setInterfaze(Demo.class);
factoryBean.setDestinations(new String[] { "mdp.demo.test" });
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setSource("inf.mdpDemo");
factoryBean.afterPropertiesSet();
demo = (Demo) factoryBean.getObject();
}
@After
public void tearDown() throws Exception {
factoryBean.destroy();
}
@Test
public void testGetObject() {
demo.echo("hello");
}
@Test
public void testEcho() {
int echo = demo.echo(1);
Assert.assertTrue(echo > 0);
}
}
3. Mdp Annotation
对于大多数应用,上面介绍的service和reference基本上就能满足其需求,借助于service和reference配置,就可以将最普通java类和接口变身为远程服务。
下面介绍部分高级特性。
3.1 MdpMethod Annotation
MdpMethod用于定制接口方法调用时的jms相关属性,示例如下
@MdpMethod(timeout = 5000, cache = true, cacheTtl = 60000)public Date sync(@MessageProperty(name = "param") String now);
Property | 说明 |
timeout | 调用超时时间,单位毫秒,默认值为2分钟,如果在reference中配置的timeout值,将覆盖此处的值 |
priority | 消息优先级,由于tibco未实现优先级功能,此处不用设置 |
ttl | 消息存活时间,单位毫秒,默认值为0,永不过期;注意,对于返回的消息,默认为10分钟后过期 |
transacted | 发送请求时是否需要事务 |
cache | 客户端是否需要开启缓存,默认false,开启缓存后,调用结果将优先直接从本地缓存中获取 |
cacheTtl | 缓存的过期时间,单位毫秒 |
3.2 MessageProperties Annotation
MessageProperties和MessageProperty的作用是与service中的selector相配合,实现jms消息自动路由功能。
MessageProperties的作用是可以在参数上定义多个MessageProperty Annotation,如果不需要多个MessageProperty,则可以直接使用MessageProperty,示例如下
public void syncDynamicAnno(@MessageProperties({ @MessageProperty(name = "dynamic", jxpath = "prop1"),
@MessageProperty(name = "value", jxpath = "value") }) Dto dto);
Dto定义如下
public class Dto {
private String prop1;
private int value;
public String getProp1() {
return prop1;
}
public void setProp1(String prop1) {
this.prop1 = prop1;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
}
MessageProperty将参数dto的值自动设置在jms消息的property中,配合service中的selector属性,就可以做到消息的自动路由了,如下所示:
<bean id="mdpTarget" class="com.bill99.mdp.DemoImpl"/> <mdp:service id="mdpDemo" interface="com.bill99.mdp.Demo" destination="mdp.demo" ref="mdpTarget" concurrency="2" server="mdpDemo" connection-factory="connectionFactory" selector="dynamic='test'"> </mdp:service> |
---|
3.3 MessageProperty Annotation
MessageProperty的作用是在方法调用时,见参数中的值设置到Jms消息的Properties字段中。
示例如下
public Date sync(@MessageProperty(name = "param") String now);
MessageProperty Annotation
属性 | 说明 |
name | 设置在jms properties中的字段名称 |
jxpath | 字段名称所对应的值表达式,将Annotation所标注的参数根据jxpath表达式获取对应的值,如果不设,则直接取标注参数为值 |
4. Wiki
4.1 MdpConnectionFactory工具类
MdpConnectionFactory简化了tibco连接信息配置项,方便在多tibco环境中使用
示例如下
<bean id="connectionFactory" class="com.bill99.mdp.connection.MdpConnectionFactory"> <property name="properties" value="classpath:properties/jms.properties"/> <property name="prefix" value="notify"/> <property name="sessionCacheSize" value="10"/> </bean> |
---|
其中两个配置属性说明如下:
prefix:配置项的前缀(可选),比如prefix值为notify时,则properties文件中的配置格式定义如下:
notify.java.naming.factory.initial=com.tibco.tibjms.naming.TibjmsInitialContextFactory
notify.java.naming.provider.url=tcp://192.168.6.81:7222
notify.java.naming.security.principal=
notify.java.naming.security.credentials=
notify.connection.factory.jndiname=seashellConnectionFactory
如果prefix为空,则properties内容如下:
java.naming.factory.initial=com.tibco.tibjms.naming.TibjmsInitialContextFactory
java.naming.provider.url=tcp://192.168.6.81:7222
java.naming.security.principal=
java.naming.security.credentials=
connection.factory.jndiname=seashellConnectionFactory
sessionCacheSize属性为缓存session的数量,默认为1,如果负载较大,可以适当加大此值,比如5,提高系统吞吐量。
4.2 MdpXaConnectionFactory工具类
功能同上,但支持JTA分布式事务。
MdpConnectionFactory与MdpXaConnectionFactory对比如下。
MdpConnectionFactory
Ø 不支持atomikos JTA事务
Ø 如果要在oc4j、weblogic等提供JTA实现的容器中使用JTA事务,请直接使用setTargetConnectionFactory(ConnectionFactory connectionFactory)注入来至于ejb容器JNDI的ConnectionFactory对象,例如:
<bean id="seashellSonicXAConnectionFactory"class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="seashellSonicXAConnectionFactory" /> </bean> <bean id="connectionFactory" class="com.bill99.mdp.connection.MdpConnectionFactory"> <property name="sessionCacheSize" value="10"/> <property name="targetConnectionFactory" ref=" seashellSonicXAConnectionFactory"/> </bean> |
---|
其中红色标志部分为weblogic或OC4J中配置的连接工厂JNDI名称。
MdpXaConnectionFactory
Ø 支持atomikos JTA事务,适应于tomcat等不提供JTA支持的容器
4.3 MDP对象传输方式
Mdp内部采用的时jackson的json传输方式,据网上对比测试,与java serializable相比,在序列化时快2倍多,反序列化时快了近10倍。
另外,采用json的传输方式,避免了不同jvm版本调用时反序列化异常的问题。
缺点是依赖了jackson较高的版本,可能存在与已用应用冲突的可能(目前为止还未发现冲突),依赖的jackson包版本是1.9.2,这是一个二合一的版本,如果项目里以前有依赖于jackson-core-asl和jackson-mapper-asl,请删除。
4.4 MDP同步异步调用的自动识别
老版本的MDP调用方式需要在MdpMethod中指定,新版本的MDP可以根据方法签名自动判断调用方式,规则如下:
方法返回值类型 | 参数类型 | 调用方式 |
void | 不包含MdpCallback类型参数 | 异步 |
void | 包含MdpCallback类型参数 | 异步回调 |
非void | 不包含MdpCallback类型参数 | 同步 |
非void | 包含MdpCallback类型参数 | 非法类型,启动报错 |
举例如下
public void echo(String name); | 异步 |
public void echo(String name, MdpCallback callback); | 异步回调 |
public byte[] echoBinary(byte[] binary); | 同步 |
public String echo(String name, MdpCallback callback); | 非法类型 |
方面这些方法类型中,只有同步的方法支持异常处理,也就是方法签名如public byte[] echoBinary(byte[] binary) throws Exception这样的形式,当在服务端抛出异常时,此异常可以返回到客户端,但这又有一个前提,该方法不能放在JTA事务中,因为事务会直接拦截到方法抛出的异常,导致异常不能返回到客户端。
4.5 性能如何
在单机上的实验表明,对于同步调用,单次平均耗时约14毫秒,异步调用,平均耗时7毫秒。
与用原生的JMS API发送消息性能对比发现,mdp的性能会更高些,对比发送10000条相同大小的消息,原生JMS API平均耗时93秒,MDP为69秒。
为何MDP将JMS包装了,反而性能会更高呢?原因就是MDP采用了缓存的,使用原生的JMS API,通常每次发送消息时,都会需要创建Session和Producer对象,发送完成后立即将这些对象销毁了,而MDP确实将这些对象缓存起来,重复利用,因此性能会更高一些。
4.6 是否与以前的版本兼容
很遗憾,不能向前兼容(估计会挨很多板砖)。因为传输方式由java serializable变为json方式了,所以不能兼容,即使保持java serializable的方式不变,由于配置方式的大修改,也很困难做到向前兼容。
好一点的消息是,可以跟以前版本并存,并且旧版本迁到新版本只需配置上的修改,不涉及java代码修改。
4.7 Topic
注意,当server的destination-type为topic时,concurrency必须是1,否则会收到重复的消息。
4.8 事务
分发送请求和接收请求两端:
对于发送,又分同步和异步两种情况,异步请求支持JTA事务,同步请求不支持JTA事务;
对于接收,支持JTA事务。
但是,为了提高系统的性能和吞吐量,我们推荐尽量不要使用JTA事务,如果要保证数据的一致性,可以采用事后补偿的方式实现。
对于接收端的JTA事务,如果一定要启动,必须注意,如果queue里进来无法处理的消息,将导致这个消息在queue反复回滚,严重影响后面消息的消费。建议对消息的回滚次数和回滚间隔加以限制,如下:
addprop queue <queue-name> maxRedelivery=3,redeliveryDelay=10
其中maxRedelivery=3标示消息最多回滚3次,如3次不成功,则tibco抛弃该消息,redeliveryDelay=10表示每次回滚间隔10秒钟。
4.9 幂等性概念应用
简单的说,幂等性概念就是对于同一个接口,同样的输入参数,一次调用与N次调用对系统产生的作用是相同的。这样就要求我们在实现服务时,比如订单请求时,能够判断重复提交。
比如,我们在进行mdp同步调用时,多种原因可能导致调用结果返回超时,但实际上,服务端可能已经执行了,也有可能没有执行,我们在客户端进行补偿操作,需要重复发起请求,这时,服务端的实现就需要考虑重复提提交的订单了。
可以参看:http://coolshell.cn/articles/4787.html
4.10 Mdp Schema在Eclipse中自动提示
1. 将schema从svn中下载到本地,地址如下:http://svn.99bill.net/opt/99billsrc/PMD/SRC/IF/build/if-mdp/if-mdp/src/main/resources/com/bill99/mdp/config/mdp-1.0.xsd
2. 在eclipse中, Window->Preferences->XML->XML Catalog->User Specified Entries窗口中,选择Add 按纽,编辑如下内容:
3. 在bean定义xml中,引入xmlns:mdp="http://www.99bill.com/schema/mdp"、xsi:schemaLocation引入http://www.99bill.com/schema/mdp http://www.99bill.com/schema/mdp/bill99-mdp-1.0.xsd,在编辑xml中即可自动提示mdp的相关属性配置。如下图:
4.11 完整的demo示例
服务端
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mdp="http://www.99bill.com/schema/mdp" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.99bill.com/schema/mdp http://www.99bill.com/schema/mdp/bill99-mdp-1.0.xsd"> <bean id="connectionFactory" class="com.bill99.mdp.connection.MdpConnectionFactory"> <property name="properties" value="classpath:properties/jms.properties"/> <property name="prefix" value="notify"/> </bean> <bean id="mdpTarget" class="com.bill99.mdp.DemoImpl"/> <mdp:service id="mdpDemo" interface="com.bill99.mdp.Demo" destination="mdp.demo" ref="mdpTarget" concurrency="2" server="inf.mdpDemoServer" connection-factory="connectionFactory" > </mdp:service> </beans> |
---|
客户端
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mdp="http://www.99bill.com/schema/mdp" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.99bill.com/schema/mdp http://www.99bill.com/schema/mdp/bill99-mdp-1.0.xsd"> <bean id="connectionFactory" class="com.bill99.mdp.connection.MdpConnectionFactory"> <property name="properties" value="classpath:properties/jms.properties"/> <property name="prefix" value="notify"/> </bean> <mdp:reference id="mdpDemo" interface="com.bill99.mdp.Demo" destination="mdp.demo" concurrency="10" connection-factory="connectionFactory" timeout="10000" source="inf.mdpDemoClient"> </mdp:reference> </beans> |
---|
4.12 源码
http://svn.99bill.net/opt/99billsrc/PMD/SRC/IF/build/if-mdp/if-mdp
4.13 依赖
<dependency org="j2ee" name="jms" rev="6.0" conf="compile->default" /> |
---|
4.14 最新版本
<dependency org="com.99bill" name="if-mdp" rev="4.3.39" conf="compile->default" /> 在实际使用中,使用上述项,打包时不包含该包,会导致报错;应该用以下两项: <dependency org="org.codehaus" name="jackson-all" rev="1.9.2" conf="zip->default"/> |
---|
4.15 Release Notes
4.3.39
- dto序列化错误提示更加友好;
- 支持南京架构组if-acms配置管理系统
4.3.33
new feature
- support java generic type
4.3.31
bug fix:
- 远程调用结果为null,无法返回null结果
new feature:
- gracefully shutdown,关闭或卸载应用时,释放daemon线程。
4.3.28
初始版本
这篇关于【消息队列】if-mdp使用手册的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!