Add barebone to support socket.io
(This is more a temp commit to update the frontend)
This commit is contained in:
89
backend/lib/rabbit.js
Normal file
89
backend/lib/rabbit.js
Normal file
@@ -0,0 +1,89 @@
|
||||
import { logger, RABBITMQ_URI } from '../app';
|
||||
import { Beat } from '../models/beat/beat.model.';
|
||||
import { ISeverity, NotificationType } from '../models/notifications/notification.interface';
|
||||
import { addNotification, Notification } from '../models/notifications/notification.model';
|
||||
import { IPhone } from '../models/phone/phone.interface';
|
||||
import { Phone } from '../models/phone/phone.model';
|
||||
import { User } from '../models/user/user.model';
|
||||
|
||||
interface IBeat {
|
||||
token: string,
|
||||
gpsLocation: Array<number>,
|
||||
battery: number,
|
||||
timestamp: number
|
||||
}
|
||||
|
||||
export class RabbitMQ {
|
||||
connection: amqp.Connection | null = null;
|
||||
channel: amqp.Channel | null = null;
|
||||
|
||||
timeouts: Map<string, NodeJS.Timeout> = new Map<string, NodeJS.Timeout>();
|
||||
|
||||
async init() {
|
||||
this.connection = await amqp.connect(RABBITMQ_URI);
|
||||
this.channel = await this.connection.createChannel();
|
||||
|
||||
this.channel.consume('tracker', async (income) => {
|
||||
if (income === undefined || income === null) return;
|
||||
|
||||
const msg: IBeat = JSON.parse(income.content.toString()) as IBeat
|
||||
|
||||
// Get phone
|
||||
const phone = await Phone.findOne({ androidId: msg.token });
|
||||
if (phone == undefined) {
|
||||
logger.warning(`Received beat from unknown device with id ${msg.token}`);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(`New beat from ${phone.displayName} with ${msg.gpsLocation[2]}, ${msg.gpsLocation[3]}m height and accuracy and ${msg.battery}% battery`);
|
||||
|
||||
const newBeat = await Beat.create({
|
||||
phone: phone._id,
|
||||
// [latitude, longitude, altitude]
|
||||
coordinate: [msg.gpsLocation[0], msg.gpsLocation[1], msg.gpsLocation[2]],
|
||||
accuracy: msg.gpsLocation[3],
|
||||
speed: msg.gpsLocation[4],
|
||||
battery: msg.battery,
|
||||
createdAt: msg.timestamp
|
||||
});
|
||||
|
||||
// Broadcast if device became active
|
||||
if (this.timeouts.has(phone.id)) {
|
||||
clearTimeout(this.timeouts.get(phone.id)!!);
|
||||
} else {
|
||||
phone.active = true;
|
||||
await phone.save();
|
||||
this.publish(phone.user.toString(), phone.toJSON(), 'phone_alive', ISeverity.SUCCESS);
|
||||
}
|
||||
|
||||
const timeoutTimer = setTimeout(async () => {
|
||||
this.publish(phone.user.toString(), phone.toJSON(), 'phone_dead', ISeverity.WARN);
|
||||
this.timeouts.delete(phone.id);
|
||||
phone.active = false;
|
||||
await phone.save();
|
||||
}, 30_000);
|
||||
this.timeouts.set(phone.id, timeoutTimer);
|
||||
|
||||
this.publish(phone.user.toString(), newBeat.toJSON(), 'beat');
|
||||
}, { noAck: true });
|
||||
}
|
||||
|
||||
async publish(userId: string, data: any, type: NotificationType, severity = ISeverity.INFO) {
|
||||
if (this.connection == undefined) await this.init();
|
||||
|
||||
const user = await User.findById(userId);
|
||||
if (user === null) return;
|
||||
|
||||
/* Manage notifications */
|
||||
if (type != 'beat') {
|
||||
if (type == 'phone_alive' || type == 'phone_dead') {
|
||||
addNotification(type, severity, ((data as IPhone)._id), user);
|
||||
}
|
||||
}
|
||||
|
||||
data = { type, severity, ...data };
|
||||
console.log('Send:', data);
|
||||
|
||||
this.channel?.publish('amq.topic', userId, Buffer.from(JSON.stringify(data)));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user