Spring boot使用Rabbitmq注解

Spring boot使用Rabbitmq注解

三个注解:

@EnableRabbit
@RabbitListener
@RabbitHandler

@EnableRabbit

@EnableRabbit和@Configuration一起使用,可以加在类或者方法上,这个注解开启了容器对注册的bean的@RabbitListener检查。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

从源代码上可以看出这个注解提供了很简单的功能就是引入了另一个配置类:RabbitBootstrapConfiguration。该类注册了两个bean:一个BeanPostProcessor一个ListenerEndpointRegistry。

注册的BeanPostProcessor则会在bean初始化之后扫描@RabbitListener和@RabbitHandler注解。

@Configuration
public class RabbitBootstrapConfiguration {

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
		return new RabbitListenerAnnotationBeanPostProcessor();
	}

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
	public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
		return new RabbitListenerEndpointRegistry();
	}

}

@RabbitListener

@RabbitListener用于注册Listener时使用的信息:如queue,exchange,key、ListenerContainerFactory和RabbitAdmin的bean name。

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue}", durable = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "${mq.config.key}"), admin = "rabbitAdmin")

扫描到bean带有该注解后,首先会将注解的内容封装到Endpoint对象中并和ListenerContainerFactory的实例一起添加到上面的RabbitListenerEndpointRegistry实例中。添加的时候会创建相应的ListenerContainer实例并添加Listener对象。

RabbitListenerAnnotationBeanPostProcessor通过RabbitListenerEndpointRegistrar间接持有RabbitListenerEndpointRegistry实例。

@RabbitHandler

@RabbitListener 和 @RabbitHandler结合使用,不同类型的消息使用不同的方法来处理。

public class CommandListener{

    @RabbitHandler
    public void handler1(ApiMessage msg){
        System.out.println(msg);
    }

    @RabbitHandler
    public void handler2(Map msg){
        System.out.println(msg);
    }
}

可能遇到的问题

AMQP错误:ACCESS_REFUSED

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - operation not permitted on the default exchange, class-id=50, method-id=20)

出现这个问题的原因是因为ListenContainer的auto-declare默认为true,container会使用RabbitAdmin去重新声明Queue、Exchange、Binding等对象。如果没有配置RabbitAdmin则会报ACCESS_REFUSED的错误,看官方解释:

Starting with version 1.4, SimpleMessageListenerContainer has this new property.
When set to true (default), the container will use a RabbitAdmin to redeclare all AMQP objects (Queues, Exchanges, Bindings), if it detects that at least one of its queues is missing during startup, perhaps because it’s an auto-delete or an expired queue, but the redeclaration will proceed if the queue is missing for any reason. To disable this behavior, set this property to false. Note that the container will fail to start if all of its queues are missing.

在spring-boot中,使用@EnableRabbit,需要在context添加ListenerContainerFactory。但是ListenerContainerFactory没有提供设置container的autoDeclare属性的接口,也没办法为ListenerContainer实例注入RabbitAdmin实例。

解决方案是为@RabbitListener指定RabbitAdmin实例的bean name

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue}", durable = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "${mq.config.key}"), admin = "rabbitAdmin")

Rabbitmq延迟队列实现

工作中很多场景需要用到定时任务、延迟任务,常用的方法用crontab job、Spring的Quartz,然后扫描整张数据库表,判断哪些数据需要处理。控制的粒度没办法做到特定数据上。
后来就想到了Rabbitmq,Rabbitmq本来不没有延迟队列的功能,但是有个Dead Letter Exchange功能。
DLX是指队列中的消息在下面几种情况下会变为死信(dead letter),然后会被发布到另一个exchange中。
  • 在requeue=false的情况系,消息被client reject
  • 消息过期
  • 队列长度超过限制
有了DLX,就可以将需要延迟的操作设置下次执行时间(如消息的TTL时间)放入一个存储队列中,消息过期后会经由DLX进入监听的队列中。有消费方进行相关的操作,结束或者再次进入存储队列中。
Spring AMQP实现
Configuration:
<rabbit:connection-factory id=”rabbitMQConnectionFactory” requested-heartbeat=”” host=”${rabbit.host}”
                           port=”${rabbit.port}”
                           username=”${rabbit.username}” password=”${rabbit.password}” publisher-confirms=”true”
                           channel-cache-size=”10″/>

<rabbit:admin connection-factory=”rabbitMQConnectionFactory”/>
<!–声明延时队列–>
<rabbit:queue id=”delayQueue” name=”${rabbit.tracking.no.pre.track.delay.queue}”>
    <rabbit:queue-arguments>
        <entry key=”x-dead-letter-exchange” value=”tracking_dead_exchange”/>
    </rabbit:queue-arguments>
</rabbit:queue>
<!–声明监听队列–>
<rabbit:queue id=”preTrackingQueue” name=”${rabbit.tracking.no.pre.track.queue}”/>
<!–声明DLX–>
<rabbit:topic-exchange name=”tracking_dead_exchange”>
    <rabbit:bindings>
        <rabbit:binding pattern=”#” queue=”${rabbit.tracking.no.pre.track.queue}”/>
    </rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory=”rabbitMQConnectionFactory”
                           concurrency=”1″
                           prefetch=”1″
                           acknowledge=”auto”
                           message-converter=”jackson2JsonMessageConverter”>
    <rabbit:listener ref=”trackingListener” method=”handleMessage” queues=”preTrackingQueue”/>
</rabbit:listener-container>
Code:
@Component
public class TrackingListener{
private Logger LOGGER = LoggerFactory.getLogger(TrackingListener.class);
    @Autowired
    private MessageConverter jackson2JsonMessageConverter;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private Queue delayQueue;

    public void handleMessage(TrackingMessage trackingMessage){
LOGGER.info(“In Function: TrackingListener.handleMessage(trackingMessage={})”, new Object[]{trackingMessage});
        String expiration = 60*60*1000 “”;
        if(trackingMessage.getRecordCount() == && trackingMessage.getStatus() == 0){
//更新运单及订单状态
            trackingMessage.setStatus(1);
        }
MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration(expiration)//one hour
        messageProperties.setTimestamp(new Date());

        rabbitTemplate.send(delayQueue.getName()jackson2JsonMessageConverter.toMessage(trackingMessagemessageProperties));
        LOGGER.info(“End Function: TrackingListener.handleMessage()”);

关于EDM邮件推广

这几天在处理公司EDM邮件推广的问题,EDM推广也是公司流量的一部分流量来源。由于自己也是入门,故收集整理了一些关键词(持续更新)。

独立ip 共享ip:

  • ip信誉度(reputation):共享ip导致信誉度低
  • ESP单个ip的配额限制:共享ip导致超出配额

定制信头

  • 自定义sender address:ESP的发送域名配额,共用的sender address导致超出域名配额;而且会显示“由XXXX代发”字样。不同的sender address和from address会导致垃圾邮件评分偏高

身份验证机制

  • SPF 发送方政策框架:用于登记某个域名拥有的用来外发邮件的所有IP地址。向收信者表明,该发信服务器经过该域名认可,主要针对发信人伪造域名的垃圾邮件。
  • DKIM 域名秘钥识别邮件:私钥放在发信服务器中,发信时生成数字签名放在邮件头中;公钥放在DNS中。收件服务器收到邮件后,会将邮件头中的数字签名和DNS中获取的公钥进行比对,判断发信者的域名是否合法。
  • DMARC Domain-based Message Authentication, Reporting and Conformance,是构建与SPF和DKIM技术上的解决方案,主要针对的是钓鱼邮件。DMARC 的核心思想是邮件的发送方通过特定方式 (DNS) 公开标明自己会用到的发件服务器 (SPF)、并对发出的邮件内容进行签名 (DKIM),而邮件的接收方则检查收到的邮件是否来自发送方授权过的服务器、并且核对签名是否有效。对于未能通过前述检查项目的邮件,接收方则按照发送方指定的策略进行处理【比如直接投入垃圾箱或者拒收】,从而有效避免伪造的钓鱼邮件进入用户的收件箱。

2016-01-22更新

最近一周陆续测试了几家邮件服务商:Mailgun、Rushmail、Submail、Sendcloud、Amazon SES。

  • 大名鼎鼎的Mailgun,公司一直在用。但是企鹅邮箱的到达率有问题,所以对国内地址的发送就忽略mailgun了。
  • Rushmail,效果最好的。邮件发送时会使用多个smtp服务器进行发送,到达率高。最后没采用,因为最贵。
  • Submail,效果一般。邮件模板需要审核,适合发送触发类邮件。推广邮件就太麻烦,每次都要上传模板审核。基本上邮件都会直接进垃圾邮件。测试发型了50多封,且收件人都是我自己注册的邮箱,被Submail封号。忽略
  • Sendcloud,跟Submail差不多,邮件直接进垃圾邮件。mail-tester评分较低,主要是ip被拉黑。
  • Amazon SES,这几家里面最便宜的,发信效果暂时来看都不错,直接送达收件箱。较严重的问题就是没有国内服务器,你懂的,最后通过技术手段解决。正在使用中

关于垃圾邮件测试,推荐mail-tester。

垃圾邮件检测结果 by mail tester.com

commons-beanutils 版本问题

最近在整理公司已有项目,其中一个项目在请求solr系统的时候出现如下错误:

ERROR [ PropertyUtils ] - Method invocation failed.
java.lang.IllegalArgumentException: argument type mismatch
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_75]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_75]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_75]
	at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_75]
	at org.apache.commons.beanutils.PropertyUtilsBean.invokeMethod(PropertyUtilsBean.java:1773) [commons-beanutils-1.7.0.jar:1.6]
	at org.apache.commons.beanutils.PropertyUtilsBean.setSimpleProperty(PropertyUtilsBean.java:1759) [commons-beanutils-1.7.0.jar:1.6]
	at org.apache.commons.beanutils.PropertyUtilsBean.setNestedProperty(PropertyUtilsBean.java:1648) [commons-beanutils-1.7.0.jar:1.6]
	at org.apache.commons.beanutils.PropertyUtilsBean.setProperty(PropertyUtilsBean.java:1677) [commons-beanutils-1.7.0.jar:1.6]
	at org.apache.commons.beanutils.BeanUtilsBean.setProperty(BeanUtilsBean.java:1022) [commons-beanutils-1.7.0.jar:1.6]
	at org.apache.commons.beanutils.BeanUtilsBean.populate(BeanUtilsBean.java:811) [commons-beanutils-1.7.0.jar:1.6]
	at org.apache.commons.beanutils.BeanUtils.populate(BeanUtils.java:298) [commons-beanutils-1.7.0.jar:1.6]

调整log级别至trace之后发现,solor返回的是ArrayList,而该方法接受的参数为String数组。最后发现是commons-beanutils的版本较低导致的,下面是1.8.0的release notes。

Conversion
----------
The converter implementations have been significantly improved in this release:

1) Arrays: A new "generic" ArrayConverter has been introduced which delegates
           the individual component conversion to an appropriate Converter for
           the component type.
2) Numbers: All numeric Converters now handle conversion between numeric types
           and have improved conversion facilities to and from Strings
           based on formats and/or a specified Locale.
3) Dates: Improvements to the existing SQL Date, Time and Timestamp converters
          now handle conversion between Date/Calendar types and have improved
          conversion facilities to and from Strings based on formats and/or
          a specified Locale. New java.util.Date and Calendar converter
          implementations have been added.