MongoDB 数据采集

MongoDB的ChangeStream为用户提供了非常便利的获取变化数据接口,在这里为大家提供一种使用思路和实现方式。

前置条件

1. 数据库必须是复制集群模式,可以是单节点,但不能是standalone

官方提供了复制集群模式的配置方式:

https://docs.mongodb.com/manual/administration/replica-set-deployment/

如果是已经安装好的standalone模式,也可以转换:

https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/

2. 权限说明

需要有相应database的find、changeStream权限

{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }

用到的方法

1. watch()

通过此方法可以获取Change Event数据,从而获得变化数据;该方法可以监听MongoClient、MongoDatabase、MongoCollection不同层面的数据变化。

2. ChangeStreamIterable.fullDocument(FullDocument fullDocument)

通过此方法可以设置Update、Replace操作的Change Event中是否包含fullDocument。

3. ChangeStreamDocument.getFullDocument()

通过此方法可以获取Insert、Update、Replace操作后的Document全部属性值,对于Update、Replace操作需要配合
ChangeStreamIterable.fullDocument(FullDocument fullDocument)方法使用。

4. ChangeStreamDocument.getUpdateDescription()

通过此方法获取Update、Replace的变化属性及删除属性。

5. ChangeStreamIterable.startAtOperationTime()

用于设置获取从哪个时间点开始的变化数据,还有其他几个类似方法,具体请看官方说明(一手资料优于二手资料)。

实现代码

long lastOperationTime = 0; //设置获取从哪个时间点开始的变化数据MongoClient mongoClient = MongoClients.create("mongodb://admin:123456@127.0.0.1:27017/?maxPoolSize=10&w=majority");MongoCursor<ChangeStreamDocument<Document>> cursor = mongoClient.watch()	.fullDocument(FullDocument.UPDATE_LOOKUP)	.startAtOperationTime(new BsonTimestamp(lastOperationTime))	.iterator();while(cursor.hasNext()){  ChangeStreamDocument result = cursor.next();  result.getNamespace(); //包含database及collection名  result.getOperationType(); //数据变化类型:INSERT、UPDATE、REPLACE、DELETE  result.getFullDocument(); //Insert、Update、Replace操作后的Document全部属性值  result.getUpdateDescription().getUpdatedFields(); //Update、Replace操作变化的属性  result.getUpdateDescription().getRemovedFields(); //Update、Replace操作删除的字段}

注意点

1. admin/local/config库不能采集;

2. 采集Update、Replace、Delete数据不会得到变化前的数据,这点与一些数据库不同;

3. url中需要设置w=majority,此参数在部分版本中必须设置,详情可参考官方说明。

声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2022年1月14日
下一篇 2022年1月14日

相关推荐