RabbitMQ Stream插件使用详解

2024-04-23 1587阅读

RabbitMQ Stream插件使用详解

2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

    将spring rabbit流依赖项添加到项目中:

      org.springframework.amqp
      spring-rabbit-stream
      3.1.4
    

    您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:

    @Bean
    Queue stream() {
        return QueueBuilder.durable("stream.queue1")
                .stream()
                .build();
    }

    然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:

    @Bean
    StreamAdmin streamAdmin(Environment env) {
        return new StreamAdmin(env, sc -> {
            sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
            sc.stream("stream.queue2").create();
        });
    }

    一、Sending Messages

    RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

    public interface RabbitStreamOperations extends AutoCloseable {
    	CompletableFuture send(Message message);
    	CompletableFuture convertAndSend(Object message);
    	CompletableFuture convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
    	CompletableFuture send(com.rabbitmq.stream.Message message);
    	MessageBuilder messageBuilder();
    	MessageConverter messageConverter();
    	StreamMessageConverter streamMessageConverter();
    	@Override
    	void close() throws AmqpException;
    }

    RabbitStreamTemplate实现具有以下构造函数和属性:

    public RabbitStreamTemplate(Environment environment, String streamName) {
    }
    public void setMessageConverter(MessageConverter messageConverter) {
    }
    public void setStreamConverter(StreamMessageConverter streamConverter) {
    }
    public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
    }

    MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。

    StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。

    您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。

    ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。

     二、Receiving Messages

    异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。

    侦听器容器需要一个Environment以及一个流名称。

    您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

    public interface StreamMessageListener extends MessageListener {
    	void onStreamMessage(Message message, Context context);
    }

    有关支持的属性的信息,请参阅消息侦听器容器配置。

    与模板类似,容器具有ConsumerCustomizer属性。

    有关自定义环境和使用者的信息,请参阅Java客户端文档。

    使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。

    三、Examples

    @Bean
    RabbitStreamTemplate streamTemplate(Environment env) {
        RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
        template.setProducerCustomizer((name, builder) -> builder.name("test"));
        return template;
    }
    @Bean
    RabbitListenerContainerFactory rabbitListenerContainerFactory(Environment env) {
        return new StreamRabbitListenerContainerFactory(env);
    }
    @RabbitListener(queues = "test.stream.queue1")
    void listen(String in) {
        ...
    }
    @Bean
    RabbitListenerContainerFactory nativeFactory(Environment env) {
        StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
        factory.setNativeListener(true);
        factory.setConsumerCustomizer((id, builder) -> {
            builder.name("myConsumer")
                    .offset(OffsetSpecification.first())
                    .manualTrackingStrategy();
        });
        return factory;
    }
    @RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
    void nativeMsg(Message in, Context context) {
        ...
        context.storeOffset();
    }
    @Bean
    Queue stream() {
        return QueueBuilder.durable("test.stream.queue1")
                .stream()
                .build();
    }
    @Bean
    Queue stream() {
        return QueueBuilder.durable("test.stream.queue2")
                .stream()
                .build();
    }

    2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。

    @Bean
    public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
        StreamRetryOperationsInterceptorFactoryBean rfb =
                new StreamRetryOperationsInterceptorFactoryBean();
        rfb.setRetryOperations(retryTemplate);
        rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
            ...
        });
        return rfb;
    }

    四、Super Streams

    超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。

    1、调配

    为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。

    @Bean
    SuperStream superStream() {
        return new SuperStream("my.super.stream", 3);
    }

    RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。

    如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:

    @Bean
    SuperStream superStream() {
        return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
    					.mapToObj(j -> "rk-" + j)
    					.collect(Collectors.toList()));
    }

    key 的数量必须等于分区的数量。

    2、向超级流生产消息

    你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction:

    @Bean
    RabbitStreamTemplate streamTemplate(Environment env) {
        RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
        template.setSuperStreamRouting(message -> {
            // some logic to return a String for the client's hashing algorithm
        });
        return template;
    }

    你也可以通过AMQP发布,使用 RabbitTemplate。

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]