基于MongoDb的事件订阅实现hook监听

-- 基于MongoDb的事件订阅实现hook监听(insert,update,remove,find等事件开始,事件成功等)
【官网】:https://https://www.mongodb.com/

应用场景

有时需要在mongodb的写入事件中做一些统计,分析时,我们需要hook。本文介绍一种基于订阅事件的hook方案。

基础资源

使用须知

为了安全起见,执行该操作之前请和公司相关负责人沟通确认,另外hook时添加的操作尽量不要影响正常业务,实现一些轻量的,异步的操作即可。另外建议做一些开关之类的配置。

配置步骤

常见问题

快速入门

A)基于事件订阅实现MongoDb 监听(hook)的示例代码.




using ConfigLab.Util;
using MongoDB.Bson;
using MongoDB.Bson.IO;
using MongoDB.Driver;
using MongoDB.Driver.Core.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConfigLab.DAL.Repository.MongoDb
{
    /// <summary>
    /// 作者: http://config.net.cn
    /// </summary>
    public  class BaseMongoContext
    {
        private readonly IMongoDatabase _database;
        public BaseMongoContext()
        {
            //支持订阅时间,方便hook mongo的json命令对象
			var credentials = MongoCredential.CreateCredential(
                databaseName: "BookAds",username: "账号",password: "密码"
            );
            var mongoClient = new MongoClient(new MongoClientSettings()
            {
                Server = new MongoServerAddress("IP", 27022),
                Credential = credentials,
                ConnectionMode = ConnectionMode.Standalone,
                ServerSelectionTimeout = TimeSpan.FromSeconds(15),
                ClusterConfigurator = builder =>
                {
                    builder.Subscribe(new SingleEventSubscriber<CommandStartedEvent>(CmdStartHandlerForFindCommand));//开始执行时的回调
                    builder.Subscribe(new SingleEventSubscriber<CommandSucceededEvent>(CmdSuccessHandlerForFindCommand));//执行成功后的回调
                }
            });
            _database = mongoClient.GetDatabase("BookAds");
        }
        protected IMongoCollection<T> GetCollection<T>(string name){
            return _database?.GetCollection<T>(name);
        }
        private void CmdStartHandlerForFindCommand(CommandStartedEvent cmdStart){
            WriteToLog(cmdStart.Command, "request",$"ConnectId={cmdStart.ConnectionId},requestId={cmdStart.RequestId},ServerId={cmdStart.ServiceId},timestamp={cmdStart.Timestamp}");
        }
        private void CmdSuccessHandlerForFindCommand(CommandSucceededEvent cmdSuccess){
            WriteToLog(cmdSuccess.Reply, "response", $"ConnectId={cmdSuccess.ConnectionId},requestId={cmdSuccess.RequestId},ServerId={cmdSuccess.ServiceId},timestamp={cmdSuccess.Timestamp}");//cmdSuccess.CommandName 
        }
        private void WriteToLog(BsonDocument data, string type,string msg){
            Loger.WriteLog($" MongoDb hook Find {type} ************** {msg}");
            Loger.WriteLog(data.ToJson(
              new JsonWriterSettings
              {
                  Indent = true
              }));
        }
    }
}


B)基于事件订阅实现MongoDb 监听(hook)的示例输出。

==================select
{
  "find" : "Book",
  "filter" : {
    "ClassId" : "20112"
  }

==================insert
hook.request:
基础信息: ConnectId={ ServerId : { ClusterId : 1, EndPoint : "127.0.0.1:27022" }, LocalValue : 3, ServerValue : "16194" },requestId=13,ServerId=,timestamp=2020/6/23 5:49:09 
参数信息:
{
  "insert" : "Book",
  "ordered" : true,
  "documents" : [{
      "_id" : "x20210823114500002",
      "BookType" : 0,
      "ClassId" : "20112",
      "SalesStagegyType" : "api_check_tools_SalesStagegyType1",
      "SalesStagegyTypeOptionId" : "api_check_tools_questionopId1",
      "CreateDate" : ISODate("2020-06-23T05:49:08.924Z"),
      "CreateUserId" : "8086"
    }]

hook.response:
基础信息:ConnectId={ ServerId : { ClusterId : 1, EndPoint : "127.0.0.1:27022" }, LocalValue : 3, ServerValue : "16194" },requestId=13,ServerId=,timestamp=2020/6/23 5:49:09
返回信息:
 {
  "n" : 1,
  "ok" : 1.0
}  
==================updatea
hook.request: 
基础信息:ConnectId={ ServerId : { ClusterId : 1, EndPoint : "127.0.0.1:27022" }, LocalValue : 3, ServerValue : "89012" },requestId=13,ServerId=,timestamp=2020/6/23 5:53:29 
参数信息:
 {
  "update" : "Book",
  "ordered" : true,
  "updates" : [{
      "q" : {
        "_id" : "x20210823114500002"
      },
      "u" : {
        "$set" : {
          "SalesStagegyTypeOptionId" : "api_check_tools_questionopId2"
        }
      }
    }]

hook.response:
基础信息:ConnectId={ ServerId : { ClusterId : 1, EndPoint : "127.0.0.1:27022" }, LocalValue : 3, ServerValue : "89012" },requestId=13,ServerId=,timestamp=2020/6/23 5:53:29 
返回信息:
{
  "n" : 1,
  "nModified" : 1,
  "ok" : 1.0

==================delete
hook.request:
基础信息:ConnectId={ ServerId : { ClusterId : 1, EndPoint : "127.0.0.1:27022" }, LocalValue : 3, ServerValue : "68905" },requestId=13,ServerId=,timestamp=2020/6/23 5:59:25 
参数信息:
{
  "delete" : "Book",
  "ordered" : true,
  "deletes" : [{
      "q" : {
        "_id" : "x20210823114500002"
      },
      "limit" : 1
    }]

hook.response:
基础信息:ConnectId={ ServerId : { ClusterId : 1, EndPoint : "127.0.0.1:27022" }, LocalValue : 3, ServerValue : "68905" },requestId=13,ServerId=,timestamp=2020/6/23 5:59:25 
返回信息:
{
  "n" : 1,
  "ok" : 1.0



参考资料