Spring AMQP 1.6完整参考指南-第二部分_rabbitconnectionfactorybean setsslpropertieslocati-程序员宅基地

技术标签: RabbitMQ  

非常感谢 http://www.blogjava.net/qbna350816/archive/2016/08/13/431561.html

RabbitMQ技术学习 https://www.itkc8.com 

 

3. 参考

 

这部分参考文档详细描述了组成Sring AMQP的各种组件. main chapter 涵盖了开发AMQP应用程序的核心类. 这部分也包含了有关示例程序的章节.

3.1 使用 Spring AMQP

在本章中,我们将探索接口和类,它们是使用Spring AMQP来开发应用程序的必要组件 .

3.1.1 AMQP 抽象

介绍

 

Spring AMQP 由少数几个模块组成, 每个都以JAR的形式来表现.这些模块是: spring-amqp和spring-rabbit. spring-amqp模块包含org.springframework.amqp.core 包. 
在那个包中,你会找到表示AMQP核心模块的类. 我们的目的是提供通用的抽象,不依赖于任何特定中间件的实现或AMQP客户端库。
最终用户代码将更具有移植性,以便跨供应商实现,因为它们可以对抽象层开发。这些抽象是使用broker的特定模块实现的,如spring-rabbit。
目前只有一个RabbitMQ实现;而对于抽象的验证,除了RabbitMQ外,也已经在.Net平台上使用Apache Qpid得到了验证。
由于AMQP原则上工作于协议层次,RabbitMQ客户端可以在任何支持相同的协议版本的broker中使用,但目前我们没有测试其它任何broker。

这里的概述假设你已经熟悉了AMQP规范.如果你还没有,那么你可以查看第5章,其它资源中列举的资源.
 

Message

 

AMQP 0-8 和0-9-1 规范没有定义一个消息类或接口.相反,当执行basicPublish()这样的操作的时候, 内容是以字节数组参数进行传递的,其它额外属性也是以单独参数进行传递的. Spring AMQP定义了一个 Message 类来作为AMQP领域模型表示的一部分.Message类的目的是在单个实例中简化封装消息体(body)和属性(header),这样API就可以变得更简单. Message类的定义相当简单.
 

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        returnthis.body;
    }

    public MessageProperties getMessageProperties() {
        returnthis.messageProperties;
    }
}

MessageProperties 接口定义了多个共同属性,如messageId, timestamp, contentType 等等. 那些属性可以通过调用setHeader(String key, Object value) 方法使用用户定义头(user-defined headers)来扩展.

 

Exchange

Exchange 接口代表的是AMQP Exchange,它是生产者发送消息的地方.broker虚拟主机中的交换器名称都是唯一的,同时还有少量的其它属性:

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

正如你所看到的, Exchange还有一个type (它是在ExchangeTypes中定义的常量). 基本类型是: DirectTopicFanout,和Headers.
在核心包中,你可以找到每种类型的Exchange 接口实现.这些交换器类型会在处理队列绑定时,行为有所不同.
例如,Direct交换器允许队列以固定路由键进行绑定(通常是队列的名称).
Topic交换器支持使用路由正则表达式(*通配符明确匹配一个,而#通配符可匹配0个或多个). 

Fanout交换器会把消息发布到所有绑定到它上面的队列而不考虑任何路由键.

关于交换器类型的更多信息,查看Chapter 5, Other Resources.

AMQP规范还要求任何broker必须提供一个默认的无名字的(空字符串)Direct交换器.所有声明的队列都可以用它们的名字作为路由键绑定到默认交换器中. 在Section 3.1.4, “AmqpTemplate”你会了解到更多关于在Sring AMQP中使用默认交换器的使用情况.
 

Queue

Queue 代表的是消费者接收消息的组件. 像各种各样的 Exchange 类,我们的实现目标是作为核心AMQP类型的抽象表示.

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
 public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

注意,构造器需要接受队列名称作为参数.根据实现, admin template可能会提供生成独特队列名称的方法.这些队列作为回复地址或用于临时情景是非常有用的.
基于这种原因,自动生成队列的exclusive和 autoDelete 属性都应该设置为true.

参考 Section 3.1.10, “Configuring the broker” 来了解关于使用命名空间来声明队列,包括队列参数的详细情况.

Binding

生产者发送消息到Exchange,而消费者将从Queue中获取消息,连接Queues与Exchanges之间的绑定对于通过消息来连接生产者和消费者是非常关键的.
在Spring AMQP中,我们定义了一个 Binding 类来表示这些连接. 让我们重新回顾一下绑定队列和交换器的操作.

你可以使用固定的路由键来绑定 Queue 到 DirectExchange上.

new Binding(someQueue, someDirectExchange, "foo.bar")

你可以使用路由正则表达式来绑定Queue到TopicExchange上.

new Binding(someQueue, someTopicExchange, "foo.*")

你可以不使用路由键来绑定Queue到FanoutExchange上.

new Binding(someQueue, someFanoutExchange)

我们还提供了BindingBuilder来方便操作.

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

上面展示的BindingBuilder 类很清晰,但如果为bind()方法使用静态导入,这种形式将工作得更好.

本身来说,Binding类的实例只能一个connection中持有数据.换句话说,它不是一个活力(active)组件.
但正如在后面Section 3.1.10, “Configuring the broker”看到的, Binding实例可由AmqpAdmin 类来触发broker上的绑定操作.
同样,在同一个章节中,你还会看到Binding实例可在@Configuration类中使用Spring @Bean风格来定义
还有方便的基类来简化生成AMQP相关bean定义和识别队列,交换器,绑定的方法,这样当AMQP broker运行程序启动时,就可以得到声明.

AmqpTemplate 也在核心包中定义.作为涉及AMQP消息的主要组件, 会在它自己的章节中进行详细介绍(参考Section 3.1.4, “AmqpTemplate”).

3.1.2 连接和资源管理

介绍

虽然我们在前面章节中描述的AMQP模型是通用的,适用于所有实现,但当我们说到资源管理时,其细节是针对特定broker实现的.因此,在这个章节中,我们只关注我们的"spring-rabbit"模块,因为到现在为止,RabbitMQ是唯一支持的实现.

RabbitMQ broker中用于管理连接的中心组件是ConnectionFactory 接口. ConnectionFactory实现的责任是提供一个org.springframework.amqp.rabbit.connection.Connection 的实例,它包装了com.rabbitmq.client.Connection
我们提供的唯一具体实现提CachingConnectionFactory,默认情况下,会建立应用程序可共享的单个连接代理.连接共享是可行的,因为在AMQP处理消息的工作单位实际是 "channel" (在某些方面,这类似于JMS中Connection 和 Sessionin的关系).
你可以想象,连接实例提供了一个createChannel方法。CachingConnectionFactory 实现支持这些channels的缓存,它会基于它们是否是事务的来单独维护其缓存. 
当创建CachingConnectionFactory的实例时, hostname 可通过构造器来提供,username 和password 属性也可以提供.如果你想配置channel缓存的大小(默认是25),你可以调用setChannelCacheSize()方法.

从1.3版本开始,CachingConnectionFactory 也可以同channel一样,配置缓存连接.在这种情况下每次调用createConnection() 都会创建一个新连接(或者从缓存中获取空闲的连接).
关闭连接会将其返回到缓存中(如果还没有达到缓存大小的话).在这些连接上创建的Channels同样也会被缓存. 单独连接的使用在某些环境中是有用的,如从HA 集群中消费, 连接负载均衡器,连接不同的集群成员.设置cacheMode 为 CacheMode.CONNECTION.

这不会限制连接的数目,它用于指定允许空闲打开连接的数目.

从1.5.5版本开始,提供了一个新属性connectionLimit.当设置了此属性时,它会限制连接的总数目,当达到限制值时,将channelCheckoutTimeLimit 来等待空闲连接.如果时间超时了,将抛出AmqpTimeoutException.

重要

当缓存模式是CONNECTION时, 队列的自动声明等等 (参考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”) 将不再支持.

 

此外,在写作的时候,rabbitmq-client 包默认为每个连接(5个线程)创建了一个固定的线程池. 当需要使用大量连接时,你应该考虑在CachingConnectionFactory定制一个executor. 然后,同一个executor会用于所有连接,其线程也是共享的.  
executor的线程池是没有界限的或按预期使用率来设置(通常, 一个连接至少应该有一个线程).如果在每个连接上创建了多个channels,那么池的大小会影响并发性,因此一个可变的线程池executor应该是最合适的.

 

理解缓存大小不是限制是很重要的, 它仅仅是可以缓存的通道数量.当说缓存大小是10时,在实际使用中,其实可以是任何数目的通道. 如果超过10个通道被使用,他们都返回到高速缓存,10个将在高速缓存中,其余的将物理关闭。

从1.6版本开始,默认通道缓存大小从1增加到了25. 在高容量,多线程环境中,较小的缓存意味着通道的创建和关闭将以很高的速率运行.加大默认缓存大小可避免这种开销.
你可以监控通道的使用情况(通过RabbitMQ Admin UI) ,如果看到有很多通道在创建和关闭,你可以增大缓存大小.缓存只会增长按需(以适应应用程序的并发性要求),所以这个更改不会影响现有的低容量应用程序。

从1.4.2版本开始,CachingConnectionFactory 有一个channelCheckoutTimeout属性. 当此属性的值大于0时,channelCacheSize 会变成连接上创建通道数目的限制.
如果达到了限制,调用线程将会阻塞,直到某个通道可用或者超时, 在后者的情况中,将抛出AmqpTimeoutException 异常.

在框架(如.RabbitTemplate)中使用的通道将会可靠地返回到缓存中.如果在框架外创建了通道 (如.直接访问connection(s)并调用createChannel()),你必须可靠地返回它们(通过关闭),也许需要在 finally 块中以防止耗尽通道.

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

当使用XML时,配置看起来像下面这样:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username"value="guest"/>
<property name="password" value="guest"/>
</bean>

这里也有一个 SingleConnectionFactory 实现,它只能用于框架的单元测试代码中.它比CachingConnectionFactory 简单,因为它不会缓存通道,由于其缺乏性能和韧性,它不打算用于简单的测试以外的实际使用
如果基于某些原
因,你需要自己来实现ConnectionFactory ,AbstractConnectionFactory 基类提供了一个非常好的起点.

ConnectionFactory 可使用rabbit命名空间来快速方便的建立:

<rabbit:connection-factory id="connectionFactory"/>

在多数情况下,这是很好的,因为框架会为你选择最好的默认值.创建的实例会是CachingConnectionFactory.要记住,默认的缓存大小是25.如果你想缓存更多的通道,你可以设置channelCacheSize 属性值.在XML中,它看起来像下面这样:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>

在命名空间中,你也可以添加channel-cache-size 属性:

<rabbit:connection-factory id="connectionFactory" channel-cache-size="50"/>

默认的缓存模式是CHANNEL, 但你可以使用缓存连接来替换;在这种情况下,我们会使用connection-cache-size:

<rabbit:connection-factory id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

Host 和 port 属性也可以在命名空间中提供:

<rabbit:connection-factory id="connectionFactory" host="somehost" port="5672"/>

此外,如果运行集群环境中,使用addresses属性.

<rabbit:connection-factory id="connectionFactory" addresses="host1:5672,host2:5672"/>

下面是一个自定义的线程工厂,其前辍线程名称为rabbitmq-.

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567" thread-factory="tf" channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>

配置底层客户端连接工厂

CachingConnectionFactory 使用的是 Rabbit client ConnectionFactory的实例; 当在CachingConnectionFactory设置等价属性时,许多属性(host, port, userName, password, requestedHeartBeat, connectionTimeout) 来传递. 
要设置其它属性(例如clientProperties),可定义一个rabbit factory 的实例,并使用CachingConnectionFactory的适当构造器来提供引用
当使用上面提到的命名空间时,要在connection-factory属性中提供一个工厂的引用来配置. 为方便起见,提供了一个工厂,以协助在一个Spring应用程序上下文中配置连接工厂,在下一节讨论。

<rabbit:connection-factory id="connectionFactory" connection-factory="rabbitConnectionFactory"/>

RabbitConnectionFactoryBean 和配置SSL

从1.4版本开始, 提供了一个便利的RabbitConnectionFactoryBean 类通过依赖注入来配置底层客户端连接工厂的SSL属性.其它设置简单地委派给底层工厂.以前你必须以编程方式配置SSL选项。

<rabbit:connection-factory id="rabbitConnectionFactory"connection-factory="clientConnectionFactory" host="${host}" port="${port}" virtual-host="${vhost}" username="${username}" password="${password}" />
<bean id="clientConnectionFactory" class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>

参考 RabbitMQ Documentation 来了解关于配置SSL的更多信息. 省略的keyStore 和 trustStore 配置将在无证书验证的情况下,通过SSL来连接. Key和trust store 配置可以按如下提供:

sslPropertiesLocation 属性是一个Spring Resource ,它指向一个包含下面key的属性文件:

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStore 的 truststore 是指向store的 Spring Resources .通常情况下,这个属性文件在操作系统之下安全的,应用程序只能读取访问.

从Spring AMQP 1.5版本开始,这些属性可直接在工厂bean上设置.如果同时提供了discrete和 sslPropertiesLocation 属性, 后者属性值会覆盖discrete值.

路由连接工厂

从1.3版本开始,引入了AbstractRoutingConnectionFactory.这提供了一种机制来配置多个ConnectionFactories的映射,并通过在运行时使用lookupKey来决定目标ConnectionFactory

通常,实现会检查线程绑定上下文. 为了方便, Spring AMQP提供了SimpleRoutingConnectionFactory, 它会从SimpleResourceHolder中获取当前线程绑定的lookupKey:

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<propertyname="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/> 
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

	@Autowired
  private RabbitTemplate rabbitTemplate;

	public  void service(String vHost, String payload) {
		SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
		rabbitTemplate.convertAndSend(payload);
		SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
	}

}

在使用资源后,对其进行解绑是很重要的.更多信息参考AbstractRoutingConnectionFactory的JavaDocs.

从1.4版本开始, RabbitTemplate 支持SpEL sendConnectionFactorySelectorExpression 和receiveConnectionFactorySelectorExpression 属性, 
它会在每个AMQP 协议交互操作(sendsendAndReceivereceiveor receiveAndReply)进行评估, 为提供的AbstractRoutingConnectionFactory类解析lookupKey值
Bean 引用,如"@vHostResolver.getVHost(#root)" 可用于表达式中.对于send 操作,  要发送的消息是根评估对象;对于receive操作, queueName 是根评估对象.

路由算法为:如果selector 表达式为null,或等价于null,或提供的ConnectionFactory 不是AbstractRoutingConnectionFactory的实例,根据提供的ConnectionFactory 实现,
所有的工作都按之前的进行.同样的结果也会发生:如果评估结果不为null,但对于lookupKey 无目标ConnectionFactory,且 the AbstractRoutingConnectionFactory 使用lenientFallback = true进行了配置
当然,在AbstractRoutingConnectionFactory 的情况下,它会基于determineCurrentLookupKey()的路由实现来进行回退. 但,如果lenientFallback = false, 将会抛出 IllegalStateException 异常.

Namespace 在<rabbit:template>组件中也支持send-connection-factory-selector-expression 和receive-connection-factory-selector-expression属性.

也是从1.4版本开始, 你可以在SimpleMessageListenerContainer配置路由连接工厂. 在那种情况下,队列名称的列表将作为lookup key.例如,如果你在容器中配置setQueueNames("foo", "bar"),lookup key将是"[foo,bar]" (无空格).

RabbitMQ技术学习 https://www.itkc8.com 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/HUXU981598436/article/details/78469461

智能推荐

c# 调用c++ lib静态库_c#调用lib-程序员宅基地

文章浏览阅读2w次,点赞7次,收藏51次。四个步骤1.创建C++ Win32项目动态库dll 2.在Win32项目动态库中添加 外部依赖项 lib头文件和lib库3.导出C接口4.c#调用c++动态库开始你的表演...①创建一个空白的解决方案,在解决方案中添加 Visual C++ , Win32 项目空白解决方案的创建:添加Visual C++ , Win32 项目这......_c#调用lib

deepin/ubuntu安装苹方字体-程序员宅基地

文章浏览阅读4.6k次。苹方字体是苹果系统上的黑体,挺好看的。注重颜值的网站都会使用,例如知乎:font-family: -apple-system, BlinkMacSystemFont, Helvetica Neue, PingFang SC, Microsoft YaHei, Source Han Sans SC, Noto Sans CJK SC, W..._ubuntu pingfang

html表单常见操作汇总_html表单的处理程序有那些-程序员宅基地

文章浏览阅读159次。表单表单概述表单标签表单域按钮控件demo表单标签表单标签基本语法结构<form action="处理数据程序的url地址“ method=”get|post“ name="表单名称”></form><!--action,当提交表单时,向何处发送表单中的数据,地址可以是相对地址也可以是绝对地址--><!--method将表单中的数据传送给服务器处理,get方式直接显示在url地址中,数据可以被缓存,且长度有限制;而post方式数据隐藏传输,_html表单的处理程序有那些

PHP设置谷歌验证器(Google Authenticator)实现操作二步验证_php otp 验证器-程序员宅基地

文章浏览阅读1.2k次。使用说明:开启Google的登陆二步验证(即Google Authenticator服务)后用户登陆时需要输入额外由手机客户端生成的一次性密码。实现Google Authenticator功能需要服务器端和客户端的支持。服务器端负责密钥的生成、验证一次性密码是否正确。客户端记录密钥后生成一次性密码。下载谷歌验证类库文件放到项目合适位置(我这边放在项目Vender下面)https://github.com/PHPGangsta/GoogleAuthenticatorPHP代码示例://引入谷_php otp 验证器

【Python】matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距-程序员宅基地

文章浏览阅读4.3k次,点赞5次,收藏11次。matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距

docker — 容器存储_docker 保存容器-程序员宅基地

文章浏览阅读2.2k次。①Storage driver 处理各镜像层及容器层的处理细节,实现了多层数据的堆叠,为用户 提供了多层数据合并后的统一视图②所有 Storage driver 都使用可堆叠图像层和写时复制(CoW)策略③docker info 命令可查看当系统上的 storage driver主要用于测试目的,不建议用于生成环境。_docker 保存容器

随便推点

网络拓扑结构_网络拓扑csdn-程序员宅基地

文章浏览阅读834次,点赞27次,收藏13次。网络拓扑结构是指计算机网络中各组件(如计算机、服务器、打印机、路由器、交换机等设备)及其连接线路在物理布局或逻辑构型上的排列形式。这种布局不仅描述了设备间的实际物理连接方式,也决定了数据在网络中流动的路径和方式。不同的网络拓扑结构影响着网络的性能、可靠性、可扩展性及管理维护的难易程度。_网络拓扑csdn

JS重写Date函数,兼容IOS系统_date.prototype 将所有 ios-程序员宅基地

文章浏览阅读1.8k次,点赞5次,收藏8次。IOS系统Date的坑要创建一个指定时间的new Date对象时,通常的做法是:new Date("2020-09-21 11:11:00")这行代码在 PC 端和安卓端都是正常的,而在 iOS 端则会提示 Invalid Date 无效日期。在IOS年月日中间的横岗许换成斜杠,也就是new Date("2020/09/21 11:11:00")通常为了兼容IOS的这个坑,需要做一些额外的特殊处理,笔者在开发的时候经常会忘了兼容IOS系统。所以就想试着重写Date函数,一劳永逸,避免每次ne_date.prototype 将所有 ios

如何将EXCEL表导入plsql数据库中-程序员宅基地

文章浏览阅读5.3k次。方法一:用PLSQL Developer工具。 1 在PLSQL Developer的sql window里输入select * from test for update; 2 按F8执行 3 打开锁, 再按一下加号. 鼠标点到第一列的列头,使全列成选中状态,然后粘贴,最后commit提交即可。(前提..._excel导入pl/sql

Git常用命令速查手册-程序员宅基地

文章浏览阅读83次。Git常用命令速查手册1、初始化仓库git init2、将文件添加到仓库git add 文件名 # 将工作区的某个文件添加到暂存区 git add -u # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,不处理untracked的文件git add -A # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,包括untracked的文件...

分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120-程序员宅基地

文章浏览阅读202次。分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120

【C++缺省函数】 空类默认产生的6个类成员函数_空类默认产生哪些类成员函数-程序员宅基地

文章浏览阅读1.8k次。版权声明:转载请注明出处 http://blog.csdn.net/irean_lau。目录(?)[+]1、缺省构造函数。2、缺省拷贝构造函数。3、 缺省析构函数。4、缺省赋值运算符。5、缺省取址运算符。6、 缺省取址运算符 const。[cpp] view plain copy_空类默认产生哪些类成员函数

推荐文章

热门文章

相关标签