【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

07-17 1863阅读

用Redis实现消息队列并搭建分布式邮件消息系统

  • 系统介绍
  • Redis实现消息队列
    • 思路分析
    • 代码实现
    • MongoDB监听数据变化
      • 思路分析
      • 代码实现
        • Mongoose测试连接
        • 监听mongodb数据变化
        • 注意点

          系统介绍

          本次要实现的是一个能够实现实时监控Mongodb中数据变化的系统,要能够在数据发生变动的时候实时将变动消息发送给指定的邮箱。

          • Node.js:用于开发的语言,既能用于前端开发,又能用来做后端开发。
          • Redis:用于搭建消息队列,实现消息的分布式。
          • MongoDB:持久化数据,同时实现触发条件的监听,当MongoDB中有新增数据的时候发送新增数据的邮件消息。

            Redis实现消息队列

            思路分析

            主要使用的就是Redis-smq这个库,下面展示的就是主要使用的消息队列类,其中包括了很多队列种类,有先进先出、优先级先出等方式。

            【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

            整个库的原理如下结构图,本次使用到的只有主线,就是发送和接收:

            【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

            代码实现

            const { transemail } = require('../email_list/email.js');
            const redis = require('promise-redis-client');
            const redisHost = 'localhost';
            const redisPort = 6379;
            // 配置 Redis 客户端
            const createRedisClient = () => {
                return new Promise((resolve, reject) => {
                    let client = redis.createClient({ host: redisHost, port: redisPort });
                    client.on('error', err => {
                        console.log('Redis 连接出错');
                        reject(err);
                    });
                    client.on('ready', () => {
                        console.log('Redis ready');
                        resolve(client);
                    });
                });
            };
            async function startWaitMsg(redisClient) {
                while (true) {
                    let res = null;
                    try {
                        res = await redisClient.brpop('bookChanges', 0);
                        console.log('收到消息', res);
                    } catch (err) {
                        console.log('brpop 出错,重新 brpop');
                        continue;
                    }
                    res = res.toString();
                    transemail(res);
                }
            }
            async function listenredis() {
                try {
                    // 启动生产者
                    // startProducer();
                    // 创建 Redis 客户端
                    const redisClient = await createRedisClient();
                    // 启动消息监听
                    startWaitMsg(redisClient);
                } catch (error) {
                    console.error('Error:', error);
                }
            }
            //测试的时候使用的代码
            listenredis().catch(console.error);
            // 处理退出信号以关闭客户端
            process.on('SIGINT', async () => {
                console.log('Closing clients...');
                process.exit(0);
            });
            

            MongoDB监听数据变化

            思路分析

            由于要实现实时检测,经过分析以后使用mongoose中的数据流监控最为合适,但是要实现这个方法需要用到watch方法,这个方法只有在mongodb有副本集的时候才能使用,因此还需要提前配置好mongodb才能进行这里下一步的操作,如果没有配置过mongodb的副本集的可以参考我的这篇博客。

            1. 用mongoose中的watch连接mongodb副本集数据库获取数据变化。
            2. 将数据变化发送到redis消息队列中。

            首先在命令行中将服务启动:

            【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

            代码实现

            Mongoose测试连接

            const mongoose = require('mongoose');
            mongoose.connect('mongodb://localhost/test', {
              useNewUrlParser: true,
              useUnifiedTopology: true
            }).then(() => {
              console.log('Successfully connected to MongoDB');
              const bookSchema = new mongoose.Schema({
                title: String,
                author: String
              });
              const Book = mongoose.model('Book', bookSchema);
              const bookChangeStream = Book.watch();
              bookChangeStream.on('change', (change) => {
                console.log('Collection changed:', change);
                if (change.operationType === 'insert') {
                  console.log('New book added:', change.fullDocument);
                }
              });
            }).catch((error) => {
              console.log('Error connecting to MongoDB:', error);
            });
            

            【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

            测试结果:

            在Mongo Campass中添加数据以后,在终端中出现如下消息:

            【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

            证明测试成功,可以进行下一步操作啦!

            监听mongodb数据变化

            const redis = require('redis');
            const mongoose = require('mongoose');
            // 创建 Redis 客户端
            const redisClient = redis.createClient({
            	host: 'localhost',
            	port: 6379
              });
              
              // 连接到 Redis
            redisClient.connect();
              
            //连接mongodb数据库并检测变化发送到redis消息队列
            async function connectAndMonitorMongoDB(redisClient) {
              try {
                await mongoose.connect('mongodb://localhost/test', {
                  useNewUrlParser: true,
                  useUnifiedTopology: true
                });
                console.log('Successfully connected to MongoDB');
                const bookSchema = new mongoose.Schema({
                  title: String,
                  author: String
                });
                const Book = mongoose.model('Book', bookSchema);
                const bookChangeStream = Book.watch();
            	try{
            		bookChangeStream.on('change', (change) => {
            			console.log('Collection changed:', change);
            			console.log("type of change:",typeof(change));
            			msg = JSON.stringify(change.fullDocument);
            			msg = msg.replace(/{|}/g, '');
            			msg = "New message received:"+msg;
            			console.log("massage:",msg);
            			console.log("type of message:",typeof(msg));
            			if (change.operationType === 'insert') {
            			  console.log('New book added:', msg);
            			  redisClient.lPush('bookChanges', msg, function(err, reply) {
            				if (err) {
            				  console.log('Error storing JSON to Redis:', err);
            				} else {
            				  console.log('JSON stored successfully, list length:', reply);
            				}})
            			}
            		  });
            	}catch (err){
            		console.log("error while loading data into redis:", err)
            	}
              } catch (error) {
                console.log('Error connecting to MongoDB:', error);
              }
            }
            // module.exports = { connectAndMonitorMongoDB };
            async function main() {
              try {
                await connectAndMonitorMongoDB(redisClient);
                console.log('Monitoring MongoDB changes...');
              } catch (error) {
                console.error('Failed to start monitoring:', error);
              }
            }
            main();
            

            注意点

            在nodejs中将JSON对象转换成字符串的JSON.Stringify函数并不是严格的转换成字符串而是带有一个大括号,然而这个在进行redis进队列的时候会有问题,因此需要用正则表达式去掉大括号:

            msg = JSON.stringify(change.fullDocument);
            msg = msg.replace(/{|}/g, '');
            msg = "New message received:"+msg;
            
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]