feat: create bulljs job to consume remote ticket creation queue

feat-scaling-ticket-remote-creation
Henrriky 2024-05-22 16:03:26 -03:00
parent f06539ae6f
commit 4f4c2af13a
5 changed files with 305 additions and 205 deletions

View File

@ -21,6 +21,7 @@
"@types/pino": "^6.3.4", "@types/pino": "^6.3.4",
"axios": "^1.2.3", "axios": "^1.2.3",
"bcryptjs": "^2.4.3", "bcryptjs": "^2.4.3",
"bull": "^4.12.8",
"cookie-parser": "^1.4.5", "cookie-parser": "^1.4.5",
"cors": "^2.8.5", "cors": "^2.8.5",
"date-fns": "^2.30.0", "date-fns": "^2.30.0",
@ -55,6 +56,7 @@
"devDependencies": { "devDependencies": {
"@types/bcryptjs": "^2.4.2", "@types/bcryptjs": "^2.4.2",
"@types/bluebird": "^3.5.32", "@types/bluebird": "^3.5.32",
"@types/bull": "^4.10.0",
"@types/cookie-parser": "^1.4.2", "@types/cookie-parser": "^1.4.2",
"@types/cors": "^2.8.7", "@types/cors": "^2.8.7",
"@types/express": "^4.17.13", "@types/express": "^4.17.13",

View File

@ -19,10 +19,8 @@ import ptBR from "date-fns/locale/pt-BR";
import { splitDateTime } from "../helpers/SplitDateTime"; import { splitDateTime } from "../helpers/SplitDateTime";
import format from "date-fns/format"; import format from "date-fns/format";
import ListTicketsServiceCache from "../services/TicketServices/ListTicketServiceCache";
import { searchTicketCache, loadTicketsCache } from "../helpers/TicketCache"; import { Op } from "sequelize";
import { Op, where } from "sequelize";
type IndexQuery = { type IndexQuery = {
searchParam: string; searchParam: string;
@ -49,37 +47,17 @@ interface TicketData {
import ListStatusChatEndService from "../services/StatusChatEndService/ListStatusChatEndService"; import ListStatusChatEndService from "../services/StatusChatEndService/ListStatusChatEndService";
import Ticket from "../models/Ticket"; import Ticket from "../models/Ticket";
import ShowUserServiceReport from "../services/UserServices/ShowUserServiceReport";
import TicketEmiterSumOpenClosedByUser from "../helpers/OnlineReporEmiterInfoByUser"; import TicketEmiterSumOpenClosedByUser from "../helpers/OnlineReporEmiterInfoByUser";
import CountTicketService from "../services/TicketServices/CountTicketService"; import CountTicketService from "../services/TicketServices/CountTicketService";
import CountTicketsByUserQueue from "../services/UserServices/CountTicketsByUserQueue"; import { HttpStatusCode } from "axios";
import ShowUserService from "../services/UserServices/ShowUserService";
import axios from "axios";
import User from "../models/User";
import CheckContactOpenTickets from "../helpers/CheckContactOpenTickets";
import GetDefaultWhatsApp from "../helpers/GetDefaultWhatsApp";
import { getWbot } from "../libs/wbot";
import endPointQuery from "../helpers/old_EndPointQuery";
import Contact from "../models/Contact";
import BotIsOnQueue from "../helpers/BotIsOnQueue"; import BotIsOnQueue from "../helpers/BotIsOnQueue";
import { setMessageAsRead } from "../helpers/SetMessageAsRead"; import { setMessageAsRead } from "../helpers/SetMessageAsRead";
import { getSettingValue } from "../helpers/WhaticketSettings"; import { getSettingValue } from "../helpers/WhaticketSettings";
import ListWhatsAppsForQueueService from "../services/WhatsappService/ListWhatsAppsForQueueService"; import ListWhatsAppsForQueueService from "../services/WhatsappService/ListWhatsAppsForQueueService";
import ListWhatsAppsNumber from "../services/WhatsappService/ListWhatsAppsNumber"; import ListWhatsAppsNumber from "../services/WhatsappService/ListWhatsAppsNumber";
import Whatsapp from "../models/Whatsapp";
import AppError from "../errors/AppError"; import AppError from "../errors/AppError";
import CreateOrUpdateContactService from "../services/ContactServices/CreateOrUpdateContactService";
import FindOrCreateTicketService from "../services/TicketServices/FindOrCreateTicketService";
import CheckIsValidContact from "../services/WbotServices/CheckIsValidContact";
import GetProfilePicUrl from "../services/WbotServices/GetProfilePicUrl";
import CreateContactService from "../services/ContactServices/CreateContactService";
import { botSendMessage } from "../services/WbotServices/wbotMessageListener";
import WhatsappQueue from "../models/WhatsappQueue";
import { del, get, set } from "../helpers/RedisClient"; import { del, get, set } from "../helpers/RedisClient";
import CountStatusChatEndService from "../services/StatusChatEndService/CountStatusChatEndService"; import { initRemoteTicketCreationQueue, remoteTicketCreationQueue } from "../libs/bulljs/remote-ticket-creation-queue";
import Queue from "../models/Queue";
import StatusChatEnd from "../models/StatusChatEnd";
import controllByNumber from "../helpers/controllByNumber";
export const index = async (req: Request, res: Response): Promise<Response> => { export const index = async (req: Request, res: Response): Promise<Response> => {
const { const {
@ -125,11 +103,12 @@ export const remoteTicketCreation = async (
req: Request, req: Request,
res: Response res: Response
): Promise<Response> => { ): Promise<Response> => {
initRemoteTicketCreationQueue();
let { queueId, contact_from, cc, contact_to, msg, contact_name }: any = let { queueId, contact_from, cc, contact_to, msg, contact_name }: any =
req.body; req.body;
let whatsappId: any;
if (!queueId && !contact_from && !cc) { if (!queueId && !contact_from && !cc) {
return res.status(400).json({ return res.status(400).json({
error: `Property 'queueId' or 'contact_from' or 'cc' is required.` error: `Property 'queueId' or 'contact_from' or 'cc' is required.`
@ -154,188 +133,23 @@ export const remoteTicketCreation = async (
} }
} }
if (queueId) { //Create a job to remote ticket creation
const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED"); remoteTicketCreationQueue.add(`${contact_from}-${contact_to}`, {
input: {
if (!whatsapps || whatsapps?.length == 0) {
return res.status(500).json({
msg: `queueId ${queueId} does not have a WhatsApp number associated with it or the number's session is disconnected.`
});
}
const { id } = whatsapps[0];
whatsappId = id;
} else if (contact_from) {
const whatsapp = await Whatsapp.findOne({
where: { number: contact_from, status: "CONNECTED" }
});
if (!whatsapp) {
return res.status(404).json({
msg: `Whatsapp number ${contact_from} not found or disconnected!`
});
}
const { id } = whatsapp;
const { queues } = await ShowWhatsAppService(id);
if (!queues || queues.length == 0) {
return res.status(500).json({
msg: `The WhatsApp number ${contact_from} is not associated with any queue! `
});
}
queueId = queues[0].id;
whatsappId = id;
} else if (cc) {
const queue = await Queue.findOne({ where: { cc } });
if (!queue) {
return res.status(404).json({
msg: `Queue with cc ${cc} not found! `
});
}
queueId = queue.id;
const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED");
if (whatsapps.length === 0) {
return res.status(500).json({
msg: `No WhatsApp found for this cc ${cc} or the WhatsApp number is disconnected! `
});
}
whatsappId = whatsapps[0].id;
}
// const validNumber = await CheckIsValidContact(contact_to, true);
const validNumber = contact_to;
if (validNumber) {
let contact = await Contact.findOne({ where: { number: validNumber } });
if (!contact) {
// const profilePicUrl = await GetProfilePicUrl(validNumber);
contact = await CreateContactService({
name: contact_name ? contact_name : contact_to,
number: validNumber
// profilePicUrl
});
const io = getIO();
io.emit("contact", {
action: "create",
contact
});
}
const { id: contactId } = contact;
const botInfo = await BotIsOnQueue("botqueue");
// ticket from queueChoice or bot
let ticket: any = await Ticket.findOne({
where: {
[Op.or]: [
{ contactId, status: "queueChoice" },
{ contactId, status: "open", userId: botInfo.userIdBot }
]
}
});
if (getSettingValue("whatsaAppCloudApi")?.value == "enabled") {
if (ticket) {
await UpdateTicketService({
ticketData: { status: "closed" },
ticketId: ticket.id
});
ticket = null;
}
} else {
if (ticket) {
await UpdateTicketService({
ticketData: { status: "closed" },
ticketId: ticket.id
});
}
}
ticket = await Ticket.findOne({
where: {
[Op.or]: [
{ contactId, status: "pending" },
{ contactId, status: "open" }
]
}
});
if (ticket) {
console.log(
`THE CAMPAIGN TICKET WAS NOT CREATED BECAUSE THE TICKET IS PENDING OR OPEN`
);
return res.status(422).json({
msg: `The campaign ticket was not created because the number ${contact_to} already has a ticket open or pending`
});
}
ticket = await FindOrCreateTicketService(
contact,
whatsappId,
0,
undefined,
queueId, queueId,
true contact_from,
); cc,
contact_to,
// botSendMessage(ticket, `${msg}`); msg,
contact_name,
await ticket.update({ }
lastMessage: msg
});
await set(
`remote:ticketId:${ticket.id}`,
JSON.stringify({
id: ticket.id,
createdAt: ticket.createdAt,
updatedAt: ticket.updatedAt,
whatsappId: ticket.whatsappId,
status: ticket.status
}) })
);
const io = getIO(); return res.status(HttpStatusCode.Processing).json({
io.to(ticket.status).emit("ticket", { msg: "Ticket initialization process started successfully"
action: "update", })
ticket
});
const obj = await controllByNumber();
if (obj?.tickets) {
io.emit("remoteTickesControll", {
action: "update",
tickets: obj.ticketIds
});
}
console.log(
`REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 200 | MSG: success`
);
return res.status(200).json({ msg: "success" });
}
console.log(
`REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 500 | MSG: The number ${contact_to} does not exist on WhatsApp`
);
return res
.status(500)
.json({ msg: `The number ${contact_to} does not exist on WhatsApp` });
}; };
export const store = async (req: Request, res: Response): Promise<Response> => { export const store = async (req: Request, res: Response): Promise<Response> => {

View File

@ -0,0 +1,231 @@
import { Job } from "bull";
import { HttpStatusCode } from "axios";
import { Op } from "sequelize";
import { getIO } from "../socket";
import ListWhatsAppsForQueueService from "../../services/WhatsappService/ListWhatsAppsForQueueService";
import Whatsapp from "../../models/Whatsapp";
import ShowWhatsAppService from "../../services/WhatsappService/ShowWhatsAppService";
import Queue from "../../models/Queue";
import Contact from "../../models/Contact";
import Ticket from "../../models/Ticket";
import CreateContactService from "../../services/ContactServices/CreateContactService";
import UpdateTicketService from "../../services/TicketServices/UpdateTicketService";
import FindOrCreateTicketService from "../../services/TicketServices/FindOrCreateTicketService";
import controllByNumber from "../../helpers/controllByNumber";
import _botIsOnQueue from "../../helpers/BotIsOnQueue";
import { getSettingValue } from "../../helpers/WhaticketSettings";
import { set } from "../../helpers/RedisClient";
export const handleAddRemoteTicketCreationQueue = async (job: Job) => {
try {
const { id, data } = job
console.log(`handleAddRemoteTicketCreationQueue Job started with ID ${id}`)
let { queueId, contact_from, cc, contact_to, msg, contact_name }: any = data;
let whatsappId: any;
if (queueId) {
const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED");
if (!whatsapps || whatsapps?.length == 0) {
return Promise.reject(new Error(`${HttpStatusCode.InternalServerError}: queueId ${queueId} does not have a WhatsApp number associated with it or the number's session is disconnected.`));
// return res.status(500).json({
// msg: ``
// });
}
const { id } = whatsapps[0];
whatsappId = id;
} else if (contact_from) {
const whatsapp = await Whatsapp.findOne({
where: { number: contact_from, status: "CONNECTED" }
});
if (!whatsapp) {
return Promise.reject(new Error(`${HttpStatusCode.NotFound}: Whatsapp number ${contact_from} not found or disconnected!`));
// return res.status(404).json({
// msg: `Whatsapp number ${contact_from} not found or disconnected!`
// });
}
const { id } = whatsapp;
const { queues } = await ShowWhatsAppService(id);
if (!queues || queues.length == 0) {
return Promise.reject(new Error(`${HttpStatusCode.InternalServerError}: The WhatsApp number ${contact_from} is not associated with any queue!`));
// return res.status(500).json({
// msg: `The WhatsApp number ${contact_from} is not associated with any queue! `
// });
}
queueId = queues[0].id;
whatsappId = id;
} else if (cc) {
const queue = await Queue.findOne({ where: { cc } });
if (!queue) {
return Promise.reject(new Error(`${HttpStatusCode.NotFound}: Queue with cc ${cc} not found! `));
// return res.status(404).json({
// msg: `Queue with cc ${cc} not found! `
// });
}
queueId = queue.id;
const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED");
if (whatsapps.length === 0) {
return Promise.reject(new Error(`${HttpStatusCode.InternalServerError}: No WhatsApp found for this cc ${cc} or the WhatsApp number is disconnected!`));
// return res.status(500).json({
// msg: `No WhatsApp found for this cc ${cc} or the WhatsApp number is disconnected! `
// });
}
whatsappId = whatsapps[0].id;
}
// const validNumber = await CheckIsValidContact(contact_to, true);
const validNumber = contact_to;
if (validNumber) {
let contact = await Contact.findOne({ where: { number: validNumber } });
if (!contact) {
// const profilePicUrl = await GetProfilePicUrl(validNumber);
contact = await CreateContactService({
name: contact_name ? contact_name : contact_to,
number: validNumber
// profilePicUrl
});
const io = getIO();
io.emit("contact", {
action: "create",
contact
});
}
const { id: contactId } = contact;
const botInfo = await _botIsOnQueue("botqueue");
// ticket from queueChoice or bot
let ticket: any = await Ticket.findOne({
where: {
[Op.or]: [
{ contactId, status: "queueChoice" },
{ contactId, status: "open", userId: botInfo.userIdBot }
]
}
});
if (getSettingValue("whatsaAppCloudApi")?.value == "enabled") {
if (ticket) {
await UpdateTicketService({
ticketData: { status: "closed" },
ticketId: ticket.id
});
ticket = null;
}
} else {
if (ticket) {
await UpdateTicketService({
ticketData: { status: "closed" },
ticketId: ticket.id
});
}
}
ticket = await Ticket.findOne({
where: {
[Op.or]: [
{ contactId, status: "pending" },
{ contactId, status: "open" }
]
}
});
if (ticket) {
console.log(
`THE CAMPAIGN TICKET WAS NOT CREATED BECAUSE THE TICKET IS PENDING OR OPEN`
);
return Promise.reject(
new Error(`${HttpStatusCode.UnprocessableEntity}: The campaign ticket was not created because the number ${contact_to} already has a ticket open or pending`)
);
// return res.status(422).json({
// msg: `The campaign ticket was not created because the number ${contact_to} already has a ticket open or pending`
// });
}
ticket = await FindOrCreateTicketService(
contact,
whatsappId,
0,
undefined,
queueId,
true
);
// botSendMessage(ticket, `${msg}`);
await ticket.update({
lastMessage: msg
});
await set(
`remote:ticketId:${ticket.id}`,
JSON.stringify({
id: ticket.id,
createdAt: ticket.createdAt,
updatedAt: ticket.updatedAt,
whatsappId: ticket.whatsappId,
status: ticket.status
})
);
const io = getIO();
io.to(ticket.status).emit("ticket", {
action: "update",
ticket
});
const obj = await controllByNumber();
if (obj?.tickets) {
io.emit("remoteTickesControll", {
action: "update",
tickets: obj.ticketIds
});
}
console.log(
`REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 200 | MSG: success`
);
return Promise.resolve(); // return res.status(200).json({ msg: "success" });
}
console.log(
`REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 500 | MSG: The number ${contact_to} does not exist on WhatsApp`
);
return Promise.reject(
new Error(`${HttpStatusCode.InternalServerError}: The number ${contact_to} does not exist on WhatsApp`)
);
// return res
// .status(500)
// .json({ msg: `The number ${contact_to} does not exist on WhatsApp` });
} catch (error) {
console.log(`Error trying to execute handleAddRemoteTicketCreation. \nReason:`, error)
return Promise.reject(new Error("Error trying to create a remote ticket in queue"))
}
}

View File

@ -0,0 +1,53 @@
import Queue from "bull";
import cluster from "cluster";
import { handleAddRemoteTicketCreationQueue } from "./remote-ticket-creation-handle";
export let remoteTicketCreationQueue: Queue.Queue<any>;
export function initRemoteTicketCreationQueue() {
if (!remoteTicketCreationQueue) {
remoteTicketCreationQueue = new Queue("remote-ticket-creation-queue", process.env.REDIS_URI as string, {
limiter: {
duration: 5000,
max: 1000,
},
defaultJobOptions: {
attempts: 3,
},
});
const numOfWorkers = 4;
if (cluster.isMaster) {
console.log("Main Worker of remote ticket creation initiated")
for (let i = 0; i < numOfWorkers; i++) {
cluster.fork()
}
cluster.on('exit', function (worker, code, signal) {
console.log(`Worker ${worker.id} of remote ticket creation with code ${code}, signal: ${signal}...`)
});
remoteTicketCreationQueue.on("completed", (job, result) => {
console.log(`Job completed ${result}`)
});
remoteTicketCreationQueue.on("error", (error) => {
console.log(`Error trying to execute Remote Ticket Creation JOB ${error}`)
})
remoteTicketCreationQueue.on("failed", (job) => {
console.log(`Failed trying to execute Remote Ticket Creation JOB ${job.id} with data ${job.data}`)
})
} else {
console.log(`Worker of remote ticket creation initiated`)
remoteTicketCreationQueue.process(numOfWorkers, handleAddRemoteTicketCreationQueue)
}
} else {
console.log(`Warning: Trying to init remote ticket creation queue process, but it is already running.`)
}
}

2
package-lock.json generated
View File

@ -1,5 +1,5 @@
{ {
"name": "whaticket", "name": "projeto-hit",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {