MongoDB的ChangeStream为用户提供了非常便利的获取变化数据接口,在这里为大家提供一种使用思路和实现方式。
前置条件
1. 数据库必须是复制集群模式,可以是单节点,但不能是standalone
官方提供了复制集群模式的配置方式:
https://docs.mongodb.com/manual/administration/replica-set-deployment/
如果是已经安装好的standalone模式,也可以转换:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/
2. 权限说明
需要有相应database的find、changeStream权限
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
用到的方法
1. watch()
通过此方法可以获取Change Event数据,从而获得变化数据;该方法可以监听MongoClient、MongoDatabase、MongoCollection不同层面的数据变化。
2. ChangeStreamIterable.fullDocument(FullDocument fullDocument)
通过此方法可以设置Update、Replace操作的Change Event中是否包含fullDocument。
3. ChangeStreamDocument.getFullDocument()
通过此方法可以获取Insert、Update、Replace操作后的Document全部属性值,对于Update、Replace操作需要配合
ChangeStreamIterable.fullDocument(FullDocument fullDocument)方法使用。
4. ChangeStreamDocument.getUpdateDescription()
通过此方法获取Update、Replace的变化属性及删除属性。
5. ChangeStreamIterable.startAtOperationTime()
用于设置获取从哪个时间点开始的变化数据,还有其他几个类似方法,具体请看官方说明(一手资料优于二手资料)。
实现代码
long lastOperationTime = 0; //设置获取从哪个时间点开始的变化数据MongoClient mongoClient = MongoClients.create("mongodb://admin:123456@127.0.0.1:27017/?maxPoolSize=10&w=majority");MongoCursor<ChangeStreamDocument<Document>> cursor = mongoClient.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) .startAtOperationTime(new BsonTimestamp(lastOperationTime)) .iterator();while(cursor.hasNext()){ ChangeStreamDocument result = cursor.next(); result.getNamespace(); //包含database及collection名 result.getOperationType(); //数据变化类型:INSERT、UPDATE、REPLACE、DELETE result.getFullDocument(); //Insert、Update、Replace操作后的Document全部属性值 result.getUpdateDescription().getUpdatedFields(); //Update、Replace操作变化的属性 result.getUpdateDescription().getRemovedFields(); //Update、Replace操作删除的字段}
注意点
1. admin/local/config库不能采集;
2. 采集Update、Replace、Delete数据不会得到变化前的数据,这点与一些数据库不同;
3. url中需要设置w=majority,此参数在部分版本中必须设置,详情可参考官方说明。
声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!