Spring rabbitMq 中 correlationId或CorrelationIdString 消费者获取为null的问题-程序员宅基地

技术标签: java  大数据  

问题

在用Spring boot 的 spring-boot-starter-amqp   快速启动 rabbitMq 是遇到了个坑

消费者端获取不到:correlationId或CorrelationIdString

 

问题产生的原因

 

 correlationId 的在 spring rabbitmq 2.0 以后 byte方式会被放弃,所以 目前 代码中有些地方没有改过来,应该算一个BUG

 

@SuppressWarnings("deprecation")
public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter {

    @Deprecated
    public enum CorrelationIdPolicy {
        STRING, BYTES, BOTH
    }

    private static final int DEFAULT_LONG_STRING_LIMIT = 1024;

    private final int longStringLimit;

    private final boolean convertLongLongStrings;

    private volatile CorrelationIdPolicy correlationIdPolicy = CorrelationIdPolicy.BYTES;

}


/**
 * For inbound, determine whether correlationId, correlationIdString or
 * both are populated. For outbound, determine whether correlationIdString
 * or correlationId is used when mapping; if {
     @code CorrelationIdPolicy.BOTH}
 * is set for outbound, String takes priority and we fallback to bytes.
 * Default {
     @code CorrelationIdPolicy.BYTES}.
 * @param correlationIPolicy true to use.
 * @deprecated - the byte[] version of correlation id will be removed in 2.0
 */
@Deprecated
public void setCorrelationIdPolicy(CorrelationIdPolicy correlationIPolicy) {
   setCorrelationIdAsString(correlationIPolicy);
}

@SuppressWarnings("deprecation")
public MessageProperties toMessageProperties(final BasicProperties source, final Envelope envelope,
      final String charset) {
   MessageProperties target = new MessageProperties();
   Map<String, Object> headers = source.getHeaders();
   if (!CollectionUtils.isEmpty(headers)) {
      for (Map.Entry<String, Object> entry : headers.entrySet()) {
         String key = entry.getKey();
         if (MessageProperties.X_DELAY.equals(key)) {
            Object value = entry.getValue();
            if (value instanceof Integer) {
               target.setReceivedDelay((Integer) value);
            }
         }
         else {
            target.setHeader(key, convertLongStringIfNecessary(entry.getValue(), charset));
         }
      }
   }
   target.setTimestamp(source.getTimestamp());
   target.setMessageId(source.getMessageId());
   target.setReceivedUserId(source.getUserId());
   target.setAppId(source.getAppId());
   target.setClusterId(source.getClusterId());
   target.setType(source.getType());
   Integer deliveryMode = source.getDeliveryMode();
   if (deliveryMode != null) {
      target.setReceivedDeliveryMode(MessageDeliveryMode.fromInt(deliveryMode));
   }
   target.setDeliveryMode(null);
   target.setExpiration(source.getExpiration());
   target.setPriority(source.getPriority());
   target.setContentType(source.getContentType());
   target.setContentEncoding(source.getContentEncoding());
   String correlationId = source.getCorrelationId();
   if (!CorrelationIdPolicy.BYTES.equals(this.correlationIdPolicy) && correlationId != null) {
      target.setCorrelationIdString(correlationId);
   }
   if (!CorrelationIdPolicy.STRING.equals(this.correlationIdPolicy)) {
      if (correlationId != null) {
         try {
            target.setCorrelationId(source.getCorrelationId().getBytes(charset));
         }
         catch (UnsupportedEncodingException ex) {
            throw new AmqpUnsupportedEncodingException(ex);
         }
      }
   }
   String replyTo = source.getReplyTo();
   if (replyTo != null) {
      target.setReplyTo(replyTo);
   }
   if (envelope != null) {
      target.setReceivedExchange(envelope.getExchange());
      target.setReceivedRoutingKey(envelope.getRoutingKey());
      target.setRedelivered(envelope.isRedeliver());
      target.setDeliveryTag(envelope.getDeliveryTag());
   }
   return target;
}

public BasicProperties fromMessageProperties(final MessageProperties source, final String charset) {
   BasicProperties.Builder target = new BasicProperties.Builder();
   target.headers(this.convertHeadersIfNecessary(source.getHeaders()))
      .timestamp(source.getTimestamp())
      .messageId(source.getMessageId())
      .userId(source.getUserId())
      .appId(source.getAppId())
      .clusterId(source.getClusterId())
      .type(source.getType());
   MessageDeliveryMode deliveryMode = source.getDeliveryMode();
   if (deliveryMode != null) {
      target.deliveryMode(MessageDeliveryMode.toInt(deliveryMode));
   }
   target.expiration(source.getExpiration())
      .priority(source.getPriority())
      .contentType(source.getContentType())
      .contentEncoding(source.getContentEncoding());
   @SuppressWarnings("deprecation")
   byte[] correlationId = source.getCorrelationId();
   String correlationIdString = source.getCorrelationIdString();
   if (!CorrelationIdPolicy.BYTES.equals(this.correlationIdPolicy)
         && StringUtils.hasText(correlationIdString)) {
      target.correlationId(correlationIdString);
      correlationId = null;
   }
   if (!CorrelationIdPolicy.STRING.equals(this.correlationIdPolicy)
         && correlationId != null && correlationId.length > 0) {
      try {
         target.correlationId(new String(correlationId, charset));
      }
      catch (UnsupportedEncodingException ex) {
         throw new AmqpUnsupportedEncodingException(ex);
      }
   }
   String replyTo = source.getReplyTo();
   if (replyTo != null) {
      target.replyTo(replyTo);
   }
   return target.build();
}

 

解决方法

 

生产者:

 


public void topicPublish(String content) {
    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend(AmqpDirectExchangeConfig.FANOUT_EXCHANGE,"",
            this.buildMessage(content,correlationId.getId()), correlationId);
    this.log.info("消息id-{},消息内容为-{},已发送",correlationId,content);

}

/**
 * 返回rabbitTemplate
 * @return
 */
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
public RabbitTemplate rabbitRtryTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionAckFactory());
    template.setMessagePropertiesConverter(defaultMessagePropertiesConverter());
    template.setRetryTemplate(rabbitRetry());
    template.setReplyTimeout(2000);//2s秒超时
    return template;
}

@Bean
public MessagePropertiesConverter defaultMessagePropertiesConverter(){
   DefaultMessagePropertiesConverter messagePropertiesConverter=new DefaultMessagePropertiesConverter();
   messagePropertiesConverter.setCorrelationIdPolicy(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
    return messagePropertiesConverter;
}

消费者:

/**
 * 消息消费者
 * @return
 */
@Bean
public SimpleMessageListenerContainer messageContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionAckFactory());
    container.setQueues(queue1());
    container.setExposeListenerChannel(true);
    container.setMessagePropertiesConverter(defaultMessagePropertiesConverter());
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
    container.setMessageListener(new ChannelAwareMessageListener  ()  {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            byte[] body = message.getBody();
            MessageProperties messageProperties=message.getMessageProperties();
            log.info("消费者A,从队列{},订阅到CorrelationId=[{}],消息body=[{}]",
                    messageProperties.getConsumerQueue(),
                    messageProperties.getCorrelationIdString(),
                    new String(body,"utf-8"));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
        }
    });
    return container;
}

 

 




 

转载于:https://www.cnblogs.com/cn-coder/p/7561644.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/angji13123/article/details/102278724

智能推荐

pytorch学习(一)利用pytorch训练一个最简单的分类器-------(基于CIFAR10数据集)的句句讲解_classes[labels[i]] for i in range(4)]-程序员宅基地

文章浏览阅读1.3w次,点赞75次,收藏273次。刚学pytorch两周,利用这个分类器学习pytorch的如何运用训练一个分类网络分为以下几个步骤:1数据的加载及预处理2网络模型的设置3.定义损失函数及优化器4.用训练集训练网络5.用测试集测试网络1.数据的加载及预处理1.加载数据集(训练集和测试集)2.扩充数据集防止过拟合2.网络模型的设置网络模型的设置主要是网络结构的设置:本次我们使用的是简单的一个类网络的简单设置..._classes[labels[i]] for i in range(4)]

java判断是否空值_JAVA 判断对象内容是否含有空值-程序员宅基地

文章浏览阅读1.5k次。简单判断对象是否含有NULL值,以及信息描述。packagecom.sicdt.sicsign.bill.api.util;importjava.lang.reflect.InvocationTargetException;importjava.lang.reflect.Method;importjava.util.ArrayList;importjava.util.Arrays;importja..._java如何判断一个twodiseasescreening对象是否有值

实例:GridView实现CheckBox的多选或单选,并根据所选择的行进行数据操作_asp.net gridview checkbox 多选-程序员宅基地

文章浏览阅读1.2w次。实例说明: 有一个用户列表,其中每一个用户对应一个状态。状态有两个值,分别是已激活,未激活。 现要求实现这样的功能, 1. 列表显示出用户名及对应的激活状态。 2. 列表中的每一行行首有一个CheckBox,列表底部有一个Button(激活)。使用者先勾选CheckBox(可多选),在点击激活按钮,即可激活相应勾选的用户。 3. 对于当前状态是激活状态的用户,其行首不应该显示CheckBox.功能截图: 涉及到的知识: Gr_asp.net gridview checkbox 多选

ssm整合spring-security遇到的404错误、一直重定向于登入界面的错误_springsecurity 404-程序员宅基地

文章浏览阅读4.9k次,点赞3次,收藏6次。文章目录1. 404错误1.1 第一种可能1.2 第二种可能2. 无论登入成功还是失败一直重定向在登入界面2.1 第一个可能2.2 第二个可能3. 最后附上我的spring-security的配置文件1. 404错误1.1 第一种可能如果你设置的登入页面是.html页面,则会出现404的问题。因为spring-security要操作页面,都是请求springmvc得到的。spring-sec..._springsecurity 404

android 手机通过usb数据线与OTG设备通信_android otg 发指令-程序员宅基地

文章浏览阅读1.8k次。1.首先在AndroidManifest.xml文件中添加所需要的权限&lt;uses-feature android:name="android.hardware.usb.host" /&gt;&lt;uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /&gt;&lt;uses-permis..._android otg 发指令

MySql取得日期(前一天、某一天) 当前时间等_mysql 查询日期等于今天-程序员宅基地

文章浏览阅读9.2k次。https://www.cnblogs.com/aprils/p/4519796.html取得当天:SELECT curdate();mysql> SELECT curdate();+------------+| curdate()|+------------+| 2013-07-29 |+------------+取得当前日期:mysql> s..._mysql 查询日期等于今天

随便推点

@Component的作用_@component 作用-程序员宅基地

文章浏览阅读1.1k次。@TOC版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/m0_37626813/article/details/78558010 今天在写程序的时候看见一个以前没有见过的注解(@Component),在网上查找过后,经过实践,决定把它记录下来。 1、@controller 控制器(注入服务)用于标注控制层,相当于st..._@component 作用

javascript 自调用匿名函数-程序员宅基地

文章浏览阅读695次。通常定义函数并调用 function a(){ } a(); 或是 var a = function(){ } a(); 但是查看jquery源码会发现它最外层的架构是这样的 (function( window, undefined ) { // jquery code })(window); 解释:首先jquery是定义了一个匿名函数(

如何实现文件的上传功能?_专机上传怎么实现-程序员宅基地

文章浏览阅读7.1k次,点赞2次,收藏16次。如何用servlet如何实现文件上传:一、用servlet如何实现文件上传: 1.需要先获取你把上传的文件放到哪里(也就是你的存储路径)2.如果你需要上传的不只是一个文件的话,需要先定一个Part集合来得到你要上传的集合,通过用户的请求3.先通过request去拿到你要上传的文件用Part对象接受4.然后就是通过part获取请求头part.getHeader(“content-dis..._专机上传怎么实现

iOS UITableView中异步加载图片 - 解决方案_tableview reloadrows 异步-程序员宅基地

文章浏览阅读6.4k次。问题背景:需要在UITableView中的每一行下载图片,之前使用placeholder,下载好后存在cache中。解决方案:方案一:使用SDWebImage:https://github.com/rs/SDWebImage如何安装及使用在git页面有详细解释,具体使用的代码:#import ...- (UITableViewCell *)tableView_tableview reloadrows 异步

Linux——Squid代理服务器_squid 代理linux-程序员宅基地

文章浏览阅读244次。二、安装及运行控制1.编译安装Aquid—prefix=/usr/local/squid:安装目录—sysconfdir=/etc:单独将配置文件修改到其他目录。—enable-linux-netfilter:使用内核过滤。—enable-async-io=值:异步I/O,提升存储性能—enable-default-err-Ianguage=Simplify_Chinese:错误信..._squid 代理linux

2021-04-16 mp4==>wav+txt==>音频段00:00:00-99:99:99_mp4电台频段-程序员宅基地

文章浏览阅读86次。标注自用 适用性不行 顶多作为tools 拆开用~1)json转txt 并提取需要的类temp数据import jsonimport osimport numpy as npdef readjson(): # // 打开json文件,文件路径要自己改 with open("./draft.json", 'r', encoding='utf-8') as f: temp = json.loads(f.read()) # // 获得 中..._mp4电台频段

推荐文章

热门文章

相关标签