From 4f4c2af13a9abcb66badd0976be9c91eaad97981 Mon Sep 17 00:00:00 2001 From: Henrriky Date: Wed, 22 May 2024 16:03:26 -0300 Subject: [PATCH] feat: create bulljs job to consume remote ticket creation queue --- backend/package.json | 2 + backend/src/controllers/TicketController.ts | 222 ++--------------- .../bulljs/remote-ticket-creation-handle.ts | 231 ++++++++++++++++++ .../bulljs/remote-ticket-creation-queue.ts | 53 ++++ package-lock.json | 2 +- 5 files changed, 305 insertions(+), 205 deletions(-) create mode 100644 backend/src/libs/bulljs/remote-ticket-creation-handle.ts create mode 100644 backend/src/libs/bulljs/remote-ticket-creation-queue.ts diff --git a/backend/package.json b/backend/package.json index 1e5bd13..f2c67d4 100644 --- a/backend/package.json +++ b/backend/package.json @@ -21,6 +21,7 @@ "@types/pino": "^6.3.4", "axios": "^1.2.3", "bcryptjs": "^2.4.3", + "bull": "^4.12.8", "cookie-parser": "^1.4.5", "cors": "^2.8.5", "date-fns": "^2.30.0", @@ -55,6 +56,7 @@ "devDependencies": { "@types/bcryptjs": "^2.4.2", "@types/bluebird": "^3.5.32", + "@types/bull": "^4.10.0", "@types/cookie-parser": "^1.4.2", "@types/cors": "^2.8.7", "@types/express": "^4.17.13", diff --git a/backend/src/controllers/TicketController.ts b/backend/src/controllers/TicketController.ts index da1399d..8fd063e 100644 --- a/backend/src/controllers/TicketController.ts +++ b/backend/src/controllers/TicketController.ts @@ -19,10 +19,8 @@ import ptBR from "date-fns/locale/pt-BR"; import { splitDateTime } from "../helpers/SplitDateTime"; import format from "date-fns/format"; -import ListTicketsServiceCache from "../services/TicketServices/ListTicketServiceCache"; -import { searchTicketCache, loadTicketsCache } from "../helpers/TicketCache"; -import { Op, where } from "sequelize"; +import { Op } from "sequelize"; type IndexQuery = { searchParam: string; @@ -49,37 +47,17 @@ interface TicketData { import ListStatusChatEndService from "../services/StatusChatEndService/ListStatusChatEndService"; import Ticket from "../models/Ticket"; -import ShowUserServiceReport from "../services/UserServices/ShowUserServiceReport"; import TicketEmiterSumOpenClosedByUser from "../helpers/OnlineReporEmiterInfoByUser"; import CountTicketService from "../services/TicketServices/CountTicketService"; -import CountTicketsByUserQueue from "../services/UserServices/CountTicketsByUserQueue"; -import ShowUserService from "../services/UserServices/ShowUserService"; -import axios from "axios"; -import User from "../models/User"; -import CheckContactOpenTickets from "../helpers/CheckContactOpenTickets"; -import GetDefaultWhatsApp from "../helpers/GetDefaultWhatsApp"; -import { getWbot } from "../libs/wbot"; -import endPointQuery from "../helpers/old_EndPointQuery"; -import Contact from "../models/Contact"; +import { HttpStatusCode } from "axios"; import BotIsOnQueue from "../helpers/BotIsOnQueue"; import { setMessageAsRead } from "../helpers/SetMessageAsRead"; import { getSettingValue } from "../helpers/WhaticketSettings"; import ListWhatsAppsForQueueService from "../services/WhatsappService/ListWhatsAppsForQueueService"; import ListWhatsAppsNumber from "../services/WhatsappService/ListWhatsAppsNumber"; -import Whatsapp from "../models/Whatsapp"; import AppError from "../errors/AppError"; -import CreateOrUpdateContactService from "../services/ContactServices/CreateOrUpdateContactService"; -import FindOrCreateTicketService from "../services/TicketServices/FindOrCreateTicketService"; -import CheckIsValidContact from "../services/WbotServices/CheckIsValidContact"; -import GetProfilePicUrl from "../services/WbotServices/GetProfilePicUrl"; -import CreateContactService from "../services/ContactServices/CreateContactService"; -import { botSendMessage } from "../services/WbotServices/wbotMessageListener"; -import WhatsappQueue from "../models/WhatsappQueue"; import { del, get, set } from "../helpers/RedisClient"; -import CountStatusChatEndService from "../services/StatusChatEndService/CountStatusChatEndService"; -import Queue from "../models/Queue"; -import StatusChatEnd from "../models/StatusChatEnd"; -import controllByNumber from "../helpers/controllByNumber"; +import { initRemoteTicketCreationQueue, remoteTicketCreationQueue } from "../libs/bulljs/remote-ticket-creation-queue"; export const index = async (req: Request, res: Response): Promise => { const { @@ -125,11 +103,12 @@ export const remoteTicketCreation = async ( req: Request, res: Response ): Promise => { + + initRemoteTicketCreationQueue(); + let { queueId, contact_from, cc, contact_to, msg, contact_name }: any = req.body; - let whatsappId: any; - if (!queueId && !contact_from && !cc) { return res.status(400).json({ error: `Property 'queueId' or 'contact_from' or 'cc' is required.` @@ -154,188 +133,23 @@ export const remoteTicketCreation = async ( } } - if (queueId) { - const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED"); - - if (!whatsapps || whatsapps?.length == 0) { - return res.status(500).json({ - msg: `queueId ${queueId} does not have a WhatsApp number associated with it or the number's session is disconnected.` - }); - } - - const { id } = whatsapps[0]; - - whatsappId = id; - } else if (contact_from) { - const whatsapp = await Whatsapp.findOne({ - where: { number: contact_from, status: "CONNECTED" } - }); - - if (!whatsapp) { - return res.status(404).json({ - msg: `Whatsapp number ${contact_from} not found or disconnected!` - }); - } - - const { id } = whatsapp; - - const { queues } = await ShowWhatsAppService(id); - - if (!queues || queues.length == 0) { - return res.status(500).json({ - msg: `The WhatsApp number ${contact_from} is not associated with any queue! ` - }); - } - - queueId = queues[0].id; - whatsappId = id; - } else if (cc) { - const queue = await Queue.findOne({ where: { cc } }); - if (!queue) { - return res.status(404).json({ - msg: `Queue with cc ${cc} not found! ` - }); - } - - queueId = queue.id; - - const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED"); - - if (whatsapps.length === 0) { - return res.status(500).json({ - msg: `No WhatsApp found for this cc ${cc} or the WhatsApp number is disconnected! ` - }); - } - - whatsappId = whatsapps[0].id; - } - - // const validNumber = await CheckIsValidContact(contact_to, true); - const validNumber = contact_to; - - if (validNumber) { - let contact = await Contact.findOne({ where: { number: validNumber } }); - - if (!contact) { - // const profilePicUrl = await GetProfilePicUrl(validNumber); - - contact = await CreateContactService({ - name: contact_name ? contact_name : contact_to, - number: validNumber - // profilePicUrl - }); - - const io = getIO(); - io.emit("contact", { - action: "create", - contact - }); - } - - const { id: contactId } = contact; - - const botInfo = await BotIsOnQueue("botqueue"); - - // ticket from queueChoice or bot - let ticket: any = await Ticket.findOne({ - where: { - [Op.or]: [ - { contactId, status: "queueChoice" }, - { contactId, status: "open", userId: botInfo.userIdBot } - ] - } - }); - - if (getSettingValue("whatsaAppCloudApi")?.value == "enabled") { - if (ticket) { - await UpdateTicketService({ - ticketData: { status: "closed" }, - ticketId: ticket.id - }); - ticket = null; - } - } else { - if (ticket) { - await UpdateTicketService({ - ticketData: { status: "closed" }, - ticketId: ticket.id - }); - } - } - - ticket = await Ticket.findOne({ - where: { - [Op.or]: [ - { contactId, status: "pending" }, - { contactId, status: "open" } - ] - } - }); - - if (ticket) { - console.log( - `THE CAMPAIGN TICKET WAS NOT CREATED BECAUSE THE TICKET IS PENDING OR OPEN` - ); - - return res.status(422).json({ - msg: `The campaign ticket was not created because the number ${contact_to} already has a ticket open or pending` - }); - } - - ticket = await FindOrCreateTicketService( - contact, - whatsappId, - 0, - undefined, + //Create a job to remote ticket creation + remoteTicketCreationQueue.add(`${contact_from}-${contact_to}`, { + input: { queueId, - true - ); - - // botSendMessage(ticket, `${msg}`); - - await ticket.update({ - lastMessage: msg - }); - - await set( - `remote:ticketId:${ticket.id}`, - JSON.stringify({ - id: ticket.id, - createdAt: ticket.createdAt, - updatedAt: ticket.updatedAt, - whatsappId: ticket.whatsappId, - status: ticket.status - }) - ); - - const io = getIO(); - io.to(ticket.status).emit("ticket", { - action: "update", - ticket - }); - - const obj = await controllByNumber(); - - if (obj?.tickets) { - io.emit("remoteTickesControll", { - action: "update", - tickets: obj.ticketIds - }); + contact_from, + cc, + contact_to, + msg, + contact_name, } + }) - console.log( - `REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 200 | MSG: success` - ); + return res.status(HttpStatusCode.Processing).json({ + msg: "Ticket initialization process started successfully" + }) - return res.status(200).json({ msg: "success" }); - } - console.log( - `REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 500 | MSG: The number ${contact_to} does not exist on WhatsApp` - ); - return res - .status(500) - .json({ msg: `The number ${contact_to} does not exist on WhatsApp` }); }; export const store = async (req: Request, res: Response): Promise => { diff --git a/backend/src/libs/bulljs/remote-ticket-creation-handle.ts b/backend/src/libs/bulljs/remote-ticket-creation-handle.ts new file mode 100644 index 0000000..95efa7e --- /dev/null +++ b/backend/src/libs/bulljs/remote-ticket-creation-handle.ts @@ -0,0 +1,231 @@ + +import { Job } from "bull"; +import { HttpStatusCode } from "axios"; +import { Op } from "sequelize"; + +import { getIO } from "../socket"; + +import ListWhatsAppsForQueueService from "../../services/WhatsappService/ListWhatsAppsForQueueService"; +import Whatsapp from "../../models/Whatsapp"; +import ShowWhatsAppService from "../../services/WhatsappService/ShowWhatsAppService"; +import Queue from "../../models/Queue"; +import Contact from "../../models/Contact"; +import Ticket from "../../models/Ticket"; + +import CreateContactService from "../../services/ContactServices/CreateContactService"; +import UpdateTicketService from "../../services/TicketServices/UpdateTicketService"; +import FindOrCreateTicketService from "../../services/TicketServices/FindOrCreateTicketService"; + +import controllByNumber from "../../helpers/controllByNumber"; +import _botIsOnQueue from "../../helpers/BotIsOnQueue"; +import { getSettingValue } from "../../helpers/WhaticketSettings"; +import { set } from "../../helpers/RedisClient"; + +export const handleAddRemoteTicketCreationQueue = async (job: Job) => { + + try { + const { id, data } = job + console.log(`handleAddRemoteTicketCreationQueue Job started with ID ${id}`) + let { queueId, contact_from, cc, contact_to, msg, contact_name }: any = data; + let whatsappId: any; + + + + if (queueId) { + const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED"); + + if (!whatsapps || whatsapps?.length == 0) { + return Promise.reject(new Error(`${HttpStatusCode.InternalServerError}: queueId ${queueId} does not have a WhatsApp number associated with it or the number's session is disconnected.`)); + // return res.status(500).json({ + // msg: `` + // }); + } + + const { id } = whatsapps[0]; + + whatsappId = id; + } else if (contact_from) { + const whatsapp = await Whatsapp.findOne({ + where: { number: contact_from, status: "CONNECTED" } + }); + + if (!whatsapp) { + return Promise.reject(new Error(`${HttpStatusCode.NotFound}: Whatsapp number ${contact_from} not found or disconnected!`)); + // return res.status(404).json({ + // msg: `Whatsapp number ${contact_from} not found or disconnected!` + // }); + } + + const { id } = whatsapp; + + const { queues } = await ShowWhatsAppService(id); + + if (!queues || queues.length == 0) { + return Promise.reject(new Error(`${HttpStatusCode.InternalServerError}: The WhatsApp number ${contact_from} is not associated with any queue!`)); + // return res.status(500).json({ + // msg: `The WhatsApp number ${contact_from} is not associated with any queue! ` + // }); + } + + queueId = queues[0].id; + whatsappId = id; + } else if (cc) { + const queue = await Queue.findOne({ where: { cc } }); + if (!queue) { + return Promise.reject(new Error(`${HttpStatusCode.NotFound}: Queue with cc ${cc} not found! `)); + // return res.status(404).json({ + // msg: `Queue with cc ${cc} not found! ` + // }); + } + + queueId = queue.id; + + const whatsapps = await ListWhatsAppsForQueueService(queueId, "CONNECTED"); + + if (whatsapps.length === 0) { + return Promise.reject(new Error(`${HttpStatusCode.InternalServerError}: No WhatsApp found for this cc ${cc} or the WhatsApp number is disconnected!`)); + // return res.status(500).json({ + // msg: `No WhatsApp found for this cc ${cc} or the WhatsApp number is disconnected! ` + // }); + } + + whatsappId = whatsapps[0].id; + } + + // const validNumber = await CheckIsValidContact(contact_to, true); + const validNumber = contact_to; + + if (validNumber) { + let contact = await Contact.findOne({ where: { number: validNumber } }); + + if (!contact) { + // const profilePicUrl = await GetProfilePicUrl(validNumber); + + contact = await CreateContactService({ + name: contact_name ? contact_name : contact_to, + number: validNumber + // profilePicUrl + }); + + const io = getIO(); + io.emit("contact", { + action: "create", + contact + }); + } + + const { id: contactId } = contact; + + const botInfo = await _botIsOnQueue("botqueue"); + + // ticket from queueChoice or bot + let ticket: any = await Ticket.findOne({ + where: { + [Op.or]: [ + { contactId, status: "queueChoice" }, + { contactId, status: "open", userId: botInfo.userIdBot } + ] + } + }); + + if (getSettingValue("whatsaAppCloudApi")?.value == "enabled") { + if (ticket) { + await UpdateTicketService({ + ticketData: { status: "closed" }, + ticketId: ticket.id + }); + ticket = null; + } + } else { + if (ticket) { + await UpdateTicketService({ + ticketData: { status: "closed" }, + ticketId: ticket.id + }); + } + } + + ticket = await Ticket.findOne({ + where: { + [Op.or]: [ + { contactId, status: "pending" }, + { contactId, status: "open" } + ] + } + }); + + if (ticket) { + console.log( + `THE CAMPAIGN TICKET WAS NOT CREATED BECAUSE THE TICKET IS PENDING OR OPEN` + ); + return Promise.reject( + new Error(`${HttpStatusCode.UnprocessableEntity}: The campaign ticket was not created because the number ${contact_to} already has a ticket open or pending`) + ); + // return res.status(422).json({ + // msg: `The campaign ticket was not created because the number ${contact_to} already has a ticket open or pending` + // }); + } + + ticket = await FindOrCreateTicketService( + contact, + whatsappId, + 0, + undefined, + queueId, + true + ); + + // botSendMessage(ticket, `${msg}`); + + await ticket.update({ + lastMessage: msg + }); + + await set( + `remote:ticketId:${ticket.id}`, + JSON.stringify({ + id: ticket.id, + createdAt: ticket.createdAt, + updatedAt: ticket.updatedAt, + whatsappId: ticket.whatsappId, + status: ticket.status + }) + ); + + const io = getIO(); + io.to(ticket.status).emit("ticket", { + action: "update", + ticket + }); + + const obj = await controllByNumber(); + + if (obj?.tickets) { + io.emit("remoteTickesControll", { + action: "update", + tickets: obj.ticketIds + }); + } + + console.log( + `REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 200 | MSG: success` + ); + return Promise.resolve(); // return res.status(200).json({ msg: "success" }); + } + + console.log( + `REMOTE TICKET CREATION FROM ENDPOINT | STATUS: 500 | MSG: The number ${contact_to} does not exist on WhatsApp` + ); + + return Promise.reject( + new Error(`${HttpStatusCode.InternalServerError}: The number ${contact_to} does not exist on WhatsApp`) + ); + // return res + // .status(500) + // .json({ msg: `The number ${contact_to} does not exist on WhatsApp` }); + } catch (error) { + console.log(`Error trying to execute handleAddRemoteTicketCreation. \nReason:`, error) + return Promise.reject(new Error("Error trying to create a remote ticket in queue")) + } +} + diff --git a/backend/src/libs/bulljs/remote-ticket-creation-queue.ts b/backend/src/libs/bulljs/remote-ticket-creation-queue.ts new file mode 100644 index 0000000..3e1f6ee --- /dev/null +++ b/backend/src/libs/bulljs/remote-ticket-creation-queue.ts @@ -0,0 +1,53 @@ +import Queue from "bull"; +import cluster from "cluster"; +import { handleAddRemoteTicketCreationQueue } from "./remote-ticket-creation-handle"; + + +export let remoteTicketCreationQueue: Queue.Queue; + +export function initRemoteTicketCreationQueue() { + if (!remoteTicketCreationQueue) { + remoteTicketCreationQueue = new Queue("remote-ticket-creation-queue", process.env.REDIS_URI as string, { + limiter: { + duration: 5000, + max: 1000, + }, + defaultJobOptions: { + attempts: 3, + }, + }); + + const numOfWorkers = 4; + + if (cluster.isMaster) { + console.log("Main Worker of remote ticket creation initiated") + for (let i = 0; i < numOfWorkers; i++) { + cluster.fork() + } + + cluster.on('exit', function (worker, code, signal) { + console.log(`Worker ${worker.id} of remote ticket creation with code ${code}, signal: ${signal}...`) + }); + + remoteTicketCreationQueue.on("completed", (job, result) => { + console.log(`Job completed ${result}`) + }); + + remoteTicketCreationQueue.on("error", (error) => { + console.log(`Error trying to execute Remote Ticket Creation JOB ${error}`) + }) + + remoteTicketCreationQueue.on("failed", (job) => { + console.log(`Failed trying to execute Remote Ticket Creation JOB ${job.id} with data ${job.data}`) + }) + + } else { + console.log(`Worker of remote ticket creation initiated`) + + remoteTicketCreationQueue.process(numOfWorkers, handleAddRemoteTicketCreationQueue) + } + } else { + console.log(`Warning: Trying to init remote ticket creation queue process, but it is already running.`) + } +} + diff --git a/package-lock.json b/package-lock.json index f1ff13b..e17b739 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,5 @@ { - "name": "whaticket", + "name": "projeto-hit", "lockfileVersion": 2, "requires": true, "packages": {