Switch from RabbitMQ to server-sent events

(not fully working yet)
This commit is contained in:
2021-05-03 20:58:40 +02:00
parent 533749c7c8
commit daa7209742
38 changed files with 22158 additions and 672 deletions

170
backend/lib/eventManager.ts Normal file
View File

@@ -0,0 +1,170 @@
import { Response } from "express";
import { logger } from "../app";
import { ISeverity, NotificationType, PublicNotificationType } from "../models/notifications/notification.interface";
import { addNotification } from "../models/notifications/notification.model";
import { IPhone } from "../models/phone/phone.interface";
import { IUser } from "../models/user/user.interface";
import { User } from "../models/user/user.model";
import { randomString } from "./crypto";
/**
* This class stores one specific client.
*/
export class Client {
id: string;
userId: string;
stream: Response;
constructor(stream: Response, userId: string) {
this.id = randomString(16);
this.userId = userId;
this.stream = stream;
}
send(type: NotificationType, data: any) {
this.stream.write(`event: ${type}\ndata: ${JSON.stringify(data)}\n\n`);
}
async getUser() {
return await User.findById(this.userId);
}
}
export class Clients {
private clients: Client[];
constructor(clients: Client[]) {
this.clients = clients;
}
getClientsByUser(userId: string) {
const userClients = [];
for (let i = 0; i < this.clients.length; i++) {
if (this.clients[i].userId === userId) {
userClients.push(this.clients[i]);
}
}
return userClients;
}
closeAllClientsByUser(userId: string) {
this.getClientsByUser(userId).forEach(client => {
client.stream.end();
});
}
addClient(client: Client) {
this.clients.push(client);
return client;
}
getClients() {
return this.clients;
}
}
export class EventManager {
constructor() {
setInterval(() => {
this.broadcast('info', { message: "Test" });
}, 2000);
}
// This map stores a open data stream and it's associated room.
private clients: Clients = new Clients([]);
private addClient(stream: Response, userId: string) {
this.clients.addClient(new Client(stream, userId));
}
/**
* Add a client to a specific room
* @param room Used as an id for the specific room
* @param stream A open connection to the user
*/
async join(userId: string, stream: Response) {
if (stream.req == undefined) {
stream.send(500);
return;
}
// Check user
const user = await User.findById(userId);
if (user === null) {
stream.send(401);
return;
}
// Make sure to keep the connection open
stream.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
this.addClient(stream, userId);
logger.debug(`Client ${stream.req.hostname} of user ${user.name} joined.`);
}
/**
* Push a new event into a specific room
* @param event Type of the event
* @param data Content of the event
* @param selector Room to push in. If empty then it will be a public broadcast to anyone.
*/
push(type: NotificationType, data: any, user: IUser, severity = ISeverity.INFO) {
let clients = this.clients.getClientsByUser(user.id);
if (clients === undefined) return;
/* Manage notifications */
if (type != 'beat' && user !== undefined) {
if (type == 'phone_alive' || type == 'phone_dead') {
addNotification(type, severity, ((data as IPhone)._id), user);
}
}
data = { type, severity, ...data };
clients.forEach((client) => {
client.stream.write(`event: ${type}\ndata: ${JSON.stringify(data)}\n\n`);
});
if (user === undefined) {
logger.debug(`Broadcasted event ${type} to all users (${clients.length} clients affected)`);
} else {
logger.debug(`Broadcasted event ${type} to user ${user.id} (${clients.length} clients affected)`);
}
}
/**
* Very much like push() but it will send this message to **every connected client!**
*/
broadcast(type: PublicNotificationType, data: any) {
this.clients.getClients().forEach(async client => {
console.log(`Send ${JSON.stringify(data)} (of type ${type}) to a client of user ${(await client.getUser())?.name}`);
client.stream.write(`event: message\ndata: ${JSON.stringify(data)}\n\n`);
});
logger.debug(`Broadcasted event ${type} to all users (${this.clients.getClients().length} clients affected)`);
}
/**
* End the communication with a specific client.
*/
end(stream: Response, userId: string) {
stream.end();
logger.debug(`End connection with ${stream.req?.hostname} (user: ${userId})`);
}
static buildEventTypeName(type: EventType, user: IUser) {
return `${type}-${user}`;
}
}
export type EventType =
| 'tracker' // Receive just the gps location of a specific user.
| 'user' // Receive user updates.
| 'all'; // Receive all above events.

View File

@@ -63,7 +63,7 @@ export class RabbitMQ {
this.timeouts.delete(phone.id);
phone.active = false;
await phone.save();
}, 30_000);
}, 60_000);
this.timeouts.set(phone.id, timeoutTimer);
this.publish(phone.user.toString(), newBeat.toJSON(), 'beat');