Spring Data访问 MongoDB(九)----可尾游标Tailable Cursors
【Spring连载】使用Spring Data访问 MongoDB(九)----可尾游标Tailable Cursors
- 一、使用MessageListener的可尾游标
- 二、Reactive可尾游标
- 三、可尾游标与变更流(Change Streams)的关系
默认情况下,当客户端用完游标提供的所有结果时,MongoDB会自动关闭游标。在耗尽时关闭游标会将流转换为有限流。对于 有上限的集合,可以使用 Tailable Cursor,该Cursor在客户端消耗完所有最初返回的数据后保持打开状态。
可以使用MongoOperations.createCollection创建有上限的集合。为此,请提供所需的CollectionOptions.empty().capped()…。
可尾游标可以与命令式(imperative )与反应式(reactive)MongoDB API一起使用。强烈建议使用反应式变体,因为它的资源密集度较低。然而,如果你不能使用反应式API,那么你仍然可以使用Spring生态系统中已经流行的消息传递(messaging)概念。
一、使用MessageListener的可尾游标
使用同步Driver监听一个有上限的集合会创建一个长时间运行的阻塞任务,该任务需要委托给一个单独的组件。在这种情况下,我们需要首先创建一个MessageListenerContainer,它将是运行特定SubscriptionRequest的主要入口点。Spring Data MongoDB已经附带了一个默认实现,该实现在MongoTemplate上运行,能够为TailableCursorRequest创建和运行Task实例。
(图片来源网络,侵删)以下示例展示如何将可尾游标与MessageListener实例一起使用:
例1:使用MessageListener实例的可尾游标
MessageListenerContainer container = new DefaultMessageListenerContainer(template); container.start(); --------1 MessageListener listener = System.out::println; --------2 TailableCursorRequest request = TailableCursorRequest.builder() .collection("orders") --------3 .filter(query(where("value").lt(100))) --------4 .publishTo(listener) --------5 .build(); container.register(request, User.class); --------6 // ... container.stop(); --------7 1. 启动容器将初始化资源,并启动已注册SubscriptionRequest实例的Task实例。启动后添加的Requests会立即运行。 2. 定义接收消息时调用的监听器。Message#getBody()被转换为请求的域类型。使用“Document”接收未经转换的原始结果。 3. 设置要监听的集合。 4. 为要接收的documents提供可选的筛选器。 5. 设置要将传入消息发布到的消息监听器。 6. 注册请求。返回的Subscription可用于检查当前任务状态并取消以释放资源。 7. 一旦确定不再需要容器,请不要忘记停止它。这样做会停止容器中所有正在运行的Task实例。
二、Reactive可尾游标
将可尾游标与反应(reactive)数据类型一起使用,可以构建无限流。可尾游标在外部关闭它之前一直保持打开状态。当新documents到达一个有上限的集合时,可尾游标会发出数据。
如果查询未返回匹配项,或者游标返回集合“末尾”的document,然后应用程序删除该document,则可尾游标可能会失效。以下示例展示了如何创建和使用无限流查询:
例2:使用ReactiveMongoOperations进行无限流查询
Flux stream = template.tail(query(where("name").is("Joe")), Person.class); Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe(); // … // Later: Dispose the subscription to close the stream subscription.dispose();
Spring Data MongoDB Reactive存储库通过使用@Tailable注解查询方法来支持无限流。这适用于返回Flux和其他能够发出多种元素的reactive类型的方法,如下例所示:
例3:使用ReactiveMongoRepository进行无限流查询
public interface PersonRepository extends ReactiveMongoRepository { @Tailable Flux findByFirstname(String firstname); } Flux stream = repository.findByFirstname("Joe"); Disposable subscription = stream.doOnNext(System.out::println).subscribe(); // … // Later: Dispose the subscription to close the stream subscription.dispose();
三、可尾游标与变更流(Change Streams)的关系
Tailable Cursors 和 Change Streams 都是 MongoDB 中用于监听数据变化的功能,但它们在实现和使用上有一些区别。
Tailable Cursors 是一种旧的功能,最初用于在 capped collection(固定大小集合)上进行持续的查询。当你使用 tailable cursor 时,查询会一直保持打开状态,如果没有数据,查询不会立即返回,而是会等待新的数据出现,并将新数据返回给客户端。Tailable Cursors 适用于一些特定的场景,例如日志采集和实时数据处理。然而,它们有一些限制,例如无法用于普通的集合,而且可能存在性能上的挑战。
Change Streams 是 MongoDB 3.6 版本引入的功能,它们提供了一种更强大和灵活的方式来监听集合中的数据变化。通过 Change Stream,你可以监听整个集合甚至是整个数据库的变化,而不仅仅是固定大小集合。Change Streams 支持丰富的过滤和监视选项,可以监听插入、更新和删除操作,并且可以支持复杂的数据变化模式。相比于 Tailable Cursors,Change Streams 更加灵活和强大,适用于更广泛的使用场景。
因此,虽然 Tailable Cursors 和 Change Streams 都可以用于监听数据变化,但在实现和功能上有一定的区别。一般来说,如果你使用的是较新版本的 MongoDB,推荐使用 Change Streams 来实现数据变化的监听功能。