import { logger, RABBITMQ_URI } from '../app';
// NOTE(review): the trailing dot in this module path looks like a typo — confirm against the file on disk.
import { Beat } from '../models/beat/beat.model.';
import { ISeverity, NotificationType } from '../models/notifications/notification.interface';
import { addNotification, Notification } from '../models/notifications/notification.model';
import { IPhone } from '../models/phone/phone.interface';
import { Phone } from '../models/phone/phone.model';
import { User } from '../models/user/user.model';

// NOTE(review): `amqp` is used below but no import for it is visible here —
// presumably the amqplib promise API; confirm the import exists in the real file.

/**
 * Shape of a tracker message as it arrives on the `tracker` queue.
 *
 * `gpsLocation` is read positionally by the consumer:
 * `[0..2]` are stored as the coordinate (latitude, longitude, altitude),
 * `[3]` as accuracy and `[4]` as speed.
 */
interface IBeat {
    token: string,
    gpsLocation: number[],
    battery: number,
    timestamp: number
}

/** How long a phone may stay silent before it is reported as dead. */
const PHONE_TIMEOUT_MS = 30_000;

/**
 * Minimal structural check for an incoming beat.
 *
 * Only the fields whose absence or wrong type would crash or subvert the
 * handler are verified: `token` must be a string (it is used verbatim in a
 * database filter, so an object value could be interpreted as a query
 * operator) and `gpsLocation` must be an array (it is indexed positionally).
 * `battery` and `timestamp` are passed through unchecked, as before.
 */
function isBeat(value: unknown): value is IBeat {
    if (typeof value !== 'object' || value === null) return false;
    const candidate = value as Partial<Record<keyof IBeat, unknown>>;
    return typeof candidate.token === 'string' && Array.isArray(candidate.gpsLocation);
}

/** Renders a caught value as a log-friendly string. */
function describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
}

/**
 * Bridge between the `tracker` queue and the per-user `amq.topic` exchange.
 *
 * Consumes beats sent by phones, stores them, tracks which phones are
 * currently alive (a phone is considered dead after `PHONE_TIMEOUT_MS`
 * without a beat) and publishes beats and liveness changes to the owning user.
 */
export class RabbitMQ {
    connection: amqp.Connection | null = null;
    channel: amqp.Channel | null = null;
    /** Pending "phone went silent" timers, keyed by phone id. */
    timeouts: Map<string, ReturnType<typeof setTimeout>> = new Map();

    /**
     * Connects to the broker, opens a channel and starts consuming `tracker`.
     *
     * Resolves once the consumer is registered; rejects if connecting,
     * opening the channel or registering the consumer fails.
     */
    async init(): Promise<void> {
        this.connection = await amqp.connect(RABBITMQ_URI);
        this.channel = await this.connection.createChannel();

        await this.channel.consume('tracker', async (income) => {
            if (income === undefined || income === null) return;

            // Messages are consumed with noAck, so a failure here cannot be
            // retried; log it instead of letting it become an unhandled rejection.
            try {
                await this.handleBeat(income.content.toString());
            } catch (err: unknown) {
                logger.warning(`Failed to process beat: ${describeError(err)}`);
            }
        }, { noAck: true });
    }

    /**
     * Processes one raw beat message: stores it, refreshes the phone's
     * liveness timer and publishes the resulting events to the phone's user.
     *
     * @param raw - JSON text of the message body.
     * @throws If the body is not valid JSON or a database/broker call fails.
     */
    private async handleBeat(raw: string): Promise<void> {
        const msg: unknown = JSON.parse(raw);
        if (!isBeat(msg)) {
            logger.warning('Discarding malformed beat message');
            return;
        }

        // Get phone
        const phone = await Phone.findOne({ androidId: msg.token });
        if (phone == undefined) {
            logger.warning(`Received beat from unknown device with id ${msg.token}`);
            return;
        }

        logger.info(`New beat from ${phone.displayName} with ${msg.gpsLocation[2]}, ${msg.gpsLocation[3]}m height and accuracy and ${msg.battery}% battery`);

        const newBeat = await Beat.create({
            phone: phone._id,
            // [latitude, longitude, altitude]
            coordinate: [msg.gpsLocation[0], msg.gpsLocation[1], msg.gpsLocation[2]],
            accuracy: msg.gpsLocation[3],
            speed: msg.gpsLocation[4],
            battery: msg.battery,
            createdAt: msg.timestamp
        });

        const phoneId: string = phone.id;
        const userId: string = phone.user.toString();

        // Look up and re-arm the timer in one synchronous step so that two
        // beats handled concurrently cannot both see "no timer" and leak one.
        const previousTimer = this.timeouts.get(phoneId);
        const wasAlive = previousTimer !== undefined;
        if (previousTimer !== undefined) clearTimeout(previousTimer);

        const timeoutTimer = setTimeout(async () => {
            try {
                this.timeouts.delete(phoneId);
                // Persist the new state before publishing so the payload
                // reports `active: false`, mirroring the alive path below.
                phone.active = false;
                await phone.save();
                await this.publish(userId, phone.toJSON(), 'phone_dead', ISeverity.WARN);
            } catch (err: unknown) {
                logger.warning(`Failed to mark phone ${phoneId} as dead: ${describeError(err)}`);
            }
        }, PHONE_TIMEOUT_MS);
        this.timeouts.set(phoneId, timeoutTimer);

        // Broadcast if device became active
        if (!wasAlive) {
            phone.active = true;
            await phone.save();
            await this.publish(userId, phone.toJSON(), 'phone_alive', ISeverity.SUCCESS);
        }

        await this.publish(userId, newBeat.toJSON(), 'beat');
    }

    /**
     * Publishes an event to a user's routing key on the `amq.topic` exchange.
     *
     * Connects first if no connection exists yet. Does nothing when the user
     * cannot be found. For `phone_alive` / `phone_dead` events a notification
     * is additionally recorded via `addNotification`.
     *
     * @param userId - Id of the receiving user; also used as the routing key.
     * @param data - Plain object merged into the outgoing message. Its own
     *   `type` / `severity` keys, if any, take precedence over the arguments.
     * @param type - Event type placed in the message.
     * @param severity - Severity placed in the message; defaults to `INFO`.
     */
    async publish(userId: string, data: any, type: NotificationType, severity = ISeverity.INFO): Promise<void> {
        if (this.connection == undefined) await this.init();

        const user = await User.findById(userId);
        if (user === null) return;

        /* Manage notifications */
        if (type === 'phone_alive' || type === 'phone_dead') {
            addNotification(type, severity, ((data as IPhone)._id), user);
        }

        const payload = { type, severity, ...data };
        console.log('Send:', payload);

        if (this.channel == null) {
            // Can happen while another caller is still inside init().
            logger.warning(`No open channel; dropping '${String(type)}' message for user ${userId}`);
            return;
        }
        this.channel.publish('amq.topic', userId, Buffer.from(JSON.stringify(payload)));
    }
}