Files
Livebeat/backend/lib/rabbit.ts
Mondei1 8b54431449 Devices can now subscribe to specific topics.
- Device can now become (in)active
- Error alert makes sound
- Alerts now execute function on click
2020-11-14 21:15:05 +01:00

75 lines
2.8 KiB
TypeScript

import * as amqp from 'amqplib';
import { logger, RABBITMQ_URI } from '../app';
import { Beat } from '../models/beat/beat.model.';
import { Phone } from '../models/phone/phone.model';
/**
 * Shape of a beat message as it is read from the `tracker` queue.
 */
interface IBeat {
    /** Device identifier; looked up against `Phone.androidId`. */
    token: string;
    /**
     * Positional GPS data. The consumer reads indices 0-2 as
     * `[latitude, longitude, altitude]`, index 3 as accuracy and
     * index 4 as speed.
     */
    gpsLocation: number[];
    /** Battery level; logged as a percentage. */
    battery: number;
    /** Stored as the beat's `createdAt` value. */
    timestamp: number;
}
/**
 * Bridge between the RabbitMQ broker and the backend.
 *
 * Consumes beats from the `tracker` queue, stores them, keeps track of which
 * phones are currently sending beats, and publishes events to the `amq.topic`
 * exchange using the owning user's id as routing key.
 */
export class RabbitMQ {
    /** How long a phone may stay silent before it is reported as dead. */
    private static readonly INACTIVITY_TIMEOUT_MS = 30_000;

    connection: amqp.Connection | null = null;
    channel: amqp.Channel | null = null;
    /** Pending inactivity timers, keyed by phone id. */
    timeouts: Map<string, ReturnType<typeof setTimeout>> = new Map();
    /** In-flight `init()` call, shared so concurrent callers do not connect twice. */
    private pendingInit: Promise<void> | null = null;

    /**
     * Connects to the broker, opens a channel and starts consuming the
     * `tracker` queue.
     *
     * Calls made while an initialization is still running join that run
     * instead of opening another connection.
     *
     * @throws Rejects if connecting, opening the channel or subscribing fails.
     */
    async init(): Promise<void> {
        if (this.pendingInit === null) {
            this.pendingInit = this.connect().finally(() => {
                this.pendingInit = null;
            });
        }
        return this.pendingInit;
    }

    /**
     * Publishes `data`, tagged with `type`, to the `amq.topic` exchange.
     *
     * Initializes the connection first if no channel is available yet.
     *
     * NOTE(review): `data` is spread after `type`, so a `type` property on
     * `data` replaces the event type — confirm payloads never carry one.
     *
     * @param userId Routing key of the message.
     * @param data Object whose properties form the message body.
     * @param type Event type added to the message body.
     */
    async publish(userId: string, data: any, type: 'beat' | 'phone_alive' | 'phone_dead' | 'phone_register' | 'panic' = 'beat'): Promise<void> {
        // Guard on the channel rather than the connection: the connection
        // exists before the channel does, and only the channel can publish.
        if (this.channel == null) await this.init();
        const payload = { type, ...data };
        this.channel?.publish('amq.topic', userId, Buffer.from(JSON.stringify(payload)));
    }

    /** Opens connection and channel, then subscribes to the `tracker` queue. */
    private async connect(): Promise<void> {
        const connection = await amqp.connect(RABBITMQ_URI);
        const channel = await connection.createChannel();
        // Expose both together so no caller can observe a connection without a channel.
        this.connection = connection;
        this.channel = channel;

        await channel.consume('tracker', async (income) => {
            try {
                await this.handleBeat(income);
            } catch (err) {
                // The broker client ignores the returned promise, so a failure
                // must be handled here or it becomes an unhandled rejection.
                logger.warning(`Failed to process beat: ${String(err)}`);
            }
        }, { noAck: true });
    }

    /**
     * Parses raw message content into a beat.
     *
     * @returns The beat, or `null` if the content is not valid JSON, is not an
     * object, has a non-string `token` or a non-array `gpsLocation`.
     */
    private static parseBeat(content: Buffer): IBeat | null {
        let parsed: unknown;
        try {
            parsed = JSON.parse(content.toString());
        } catch {
            return null;
        }
        if (typeof parsed !== 'object' || parsed === null) return null;

        const candidate = parsed as { token?: unknown; gpsLocation?: unknown };
        // The token ends up in a database query, so it must be a plain string
        // and never an object that could be read as a query operator.
        if (typeof candidate.token !== 'string') return null;
        if (!Array.isArray(candidate.gpsLocation)) return null;
        return parsed as IBeat;
    }

    /**
     * Handles one message from the `tracker` queue: stores the beat, updates
     * the phone's active state and publishes the resulting events.
     */
    private async handleBeat(income: amqp.ConsumeMessage | null): Promise<void> {
        if (income == null) return;

        const msg = RabbitMQ.parseBeat(income.content);
        if (msg === null) {
            logger.warning('Discarded malformed beat message');
            return;
        }

        // Get phone
        const phone = await Phone.findOne({ androidId: msg.token });
        if (phone == undefined) {
            logger.warning(`Received beat from unknown device with id ${msg.token}`);
            return;
        }

        logger.info(`New beat from ${phone.displayName} with ${msg.gpsLocation[2]}, ${msg.gpsLocation[3]}m height and accuracy and ${msg.battery}% battery`);
        const newBeat = await Beat.create({
            phone: phone._id,
            // [latitude, longitude, altitude]
            coordinate: [msg.gpsLocation[0], msg.gpsLocation[1], msg.gpsLocation[2]],
            accuracy: msg.gpsLocation[3],
            speed: msg.gpsLocation[4],
            battery: msg.battery,
            createdAt: msg.timestamp
        });

        const phoneId: string = phone.id;
        const userId: string = phone.user.toString();

        // Broadcast if device became active
        if (!this.timeouts.has(phoneId)) {
            logger.debug('Set phone active');
            phone.active = true;
            await phone.save();
            await this.publish(userId, phone.toJSON(), 'phone_alive');
        }

        // Clear whatever timer is registered right now. Looking it up again
        // here (after the awaits above) ensures a timer armed by an
        // overlapping beat of the same phone is not leaked.
        const previous = this.timeouts.get(phoneId);
        if (previous !== undefined) clearTimeout(previous);

        const timer = setTimeout(async () => {
            this.timeouts.delete(phoneId);
            try {
                // Flip the flag before serializing so the published phone
                // already carries the inactive state.
                phone.active = false;
                await this.publish(userId, phone.toJSON(), 'phone_dead');
                await phone.save();
            } catch (err) {
                logger.warning(`Failed to mark phone ${phoneId} inactive: ${String(err)}`);
            }
        }, RabbitMQ.INACTIVITY_TIMEOUT_MS);
        this.timeouts.set(phoneId, timer);

        await this.publish(userId, newBeat.toJSON(), 'beat');
    }
}