Spring Data访问 MongoDB(九)----可尾游标Tailable Cursors

03-12 1434阅读

【Spring连载】使用Spring Data访问 MongoDB(九)----可尾游标Tailable Cursors

  • 一、使用MessageListener的可尾游标
  • 二、Reactive可尾游标
  • 三、可尾游标与变更流(Change Streams)的关系

    默认情况下,当客户端用完游标提供的所有结果时,MongoDB会自动关闭游标。在耗尽时关闭游标会将流转换为有限流。对于 有上限的集合,可以使用 Tailable Cursor,该Cursor在客户端消耗完所有最初返回的数据后保持打开状态。

    可以使用MongoOperations.createCollection创建有上限的集合。为此,请提供所需的CollectionOptions.empty().capped()…​。

    可尾游标可以与命令式(imperative )与反应式(reactive)MongoDB API一起使用。强烈建议使用反应式变体,因为它的资源密集度较低。然而,如果你不能使用反应式API,那么你仍然可以使用Spring生态系统中已经流行的消息传递(messaging)概念。

    一、使用MessageListener的可尾游标

    使用同步Driver监听一个有上限的集合会创建一个长时间运行的阻塞任务,该任务需要委托给一个单独的组件。在这种情况下,我们需要首先创建一个MessageListenerContainer,它将是运行特定SubscriptionRequest的主要入口点。Spring Data MongoDB已经附带了一个默认实现,该实现在MongoTemplate上运行,能够为TailableCursorRequest创建和运行Task实例。

    Spring Data访问 MongoDB(九)----可尾游标Tailable Cursors
    (图片来源网络,侵删)

    以下示例展示如何将可尾游标与MessageListener实例一起使用:

    例1:使用MessageListener实例的可尾游标

    MessageListenerContainer container = new DefaultMessageListenerContainer(template);
    container.start();                   --------1
    MessageListener listener = System.out::println;                     --------2
    TailableCursorRequest request = TailableCursorRequest.builder()
      .collection("orders")              --------3
      .filter(query(where("value").lt(100)))                                            --------4
      .publishTo(listener)               --------5
      .build();
    container.register(request, User.class);                                            --------6
    // ...
    container.stop();                    --------7
    1. 启动容器将初始化资源,并启动已注册SubscriptionRequest实例的Task实例。启动后添加的Requests会立即运行。
    2. 定义接收消息时调用的监听器。Message#getBody()被转换为请求的域类型。使用“Document”接收未经转换的原始结果。
    3. 设置要监听的集合。
    4. 为要接收的documents提供可选的筛选器。
    5. 设置要将传入消息发布到的消息监听器。
    6. 注册请求。返回的Subscription可用于检查当前任务状态并取消以释放资源。
    7. 一旦确定不再需要容器,请不要忘记停止它。这样做会停止容器中所有正在运行的Task实例。
    

    二、Reactive可尾游标

    将可尾游标与反应(reactive)数据类型一起使用,可以构建无限流。可尾游标在外部关闭它之前一直保持打开状态。当新documents到达一个有上限的集合时,可尾游标会发出数据。

    如果查询未返回匹配项,或者游标返回集合“末尾”的document,然后应用程序删除该document,则可尾游标可能会失效。以下示例展示了如何创建和使用无限流查询:

    例2:使用ReactiveMongoOperations进行无限流查询

    Flux stream = template.tail(query(where("name").is("Joe")), Person.class);
    Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
    // …
    // Later: Dispose the subscription to close the stream
    subscription.dispose();
    

    Spring Data MongoDB Reactive存储库通过使用@Tailable注解查询方法来支持无限流。这适用于返回Flux和其他能够发出多种元素的reactive类型的方法,如下例所示:

    例3:使用ReactiveMongoRepository进行无限流查询

    public interface PersonRepository extends ReactiveMongoRepository {
      @Tailable
      Flux findByFirstname(String firstname);
    }
    Flux stream = repository.findByFirstname("Joe");
    Disposable subscription = stream.doOnNext(System.out::println).subscribe();
    // …
    // Later: Dispose the subscription to close the stream
    subscription.dispose();
    

    三、可尾游标与变更流(Change Streams)的关系

    Tailable Cursors 和 Change Streams 都是 MongoDB 中用于监听数据变化的功能,但它们在实现和使用上有一些区别。

    Tailable Cursors 是一种旧的功能,最初用于在 capped collection(固定大小集合)上进行持续的查询。当你使用 tailable cursor 时,查询会一直保持打开状态,如果没有数据,查询不会立即返回,而是会等待新的数据出现,并将新数据返回给客户端。Tailable Cursors 适用于一些特定的场景,例如日志采集和实时数据处理。然而,它们有一些限制,例如无法用于普通的集合,而且可能存在性能上的挑战。

    Change Streams 是 MongoDB 3.6 版本引入的功能,它们提供了一种更强大和灵活的方式来监听集合中的数据变化。通过 Change Stream,你可以监听整个集合甚至是整个数据库的变化,而不仅仅是固定大小集合。Change Streams 支持丰富的过滤和监视选项,可以监听插入、更新和删除操作,并且可以支持复杂的数据变化模式。相比于 Tailable Cursors,Change Streams 更加灵活和强大,适用于更广泛的使用场景。

    因此,虽然 Tailable Cursors 和 Change Streams 都可以用于监听数据变化,但在实现和功能上有一定的区别。一般来说,如果你使用的是较新版本的 MongoDB,推荐使用 Change Streams 来实现数据变化的监听功能。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]