From bc406cb1d067812a6c1c91bb6448975a748fef35 Mon Sep 17 00:00:00 2001 From: adriano Date: Thu, 17 Nov 2022 11:40:37 -0300 Subject: [PATCH] =?UTF-8?q?Implementa=C3=A7=C3=A3o=20de=20cache=20para=20w?= =?UTF-8?q?hatsapp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/src/helpers/ContactsCache.ts | 4 +- backend/src/helpers/WhatsCache.ts | 240 ++++++++++++++++++ backend/src/libs/wbot.ts | 11 +- backend/src/server.ts | 2 + .../WbotServices/SendWhatsAppMessage.ts | 33 ++- .../WbotServices/StartWhatsAppSession.ts | 5 + .../src/services/WbotServices/wbotMonitor.ts | 3 + .../WhatsappService/CreateWhatsAppService.ts | 1 + .../WhatsappService/ListWhatsAppsNumber.ts | 4 +- .../WhatsappService/UpdateWhatsAppService.ts | 10 + 10 files changed, 297 insertions(+), 16 deletions(-) create mode 100644 backend/src/helpers/WhatsCache.ts diff --git a/backend/src/helpers/ContactsCache.ts b/backend/src/helpers/ContactsCache.ts index 1cb7835..902a385 100644 --- a/backend/src/helpers/ContactsCache.ts +++ b/backend/src/helpers/ContactsCache.ts @@ -67,9 +67,7 @@ const updateContactCacheById = async (id: string | number, update_fields: object const contact_cache: any = await redis.hgetall(`contact:${id}`) try { - if (contact_cache && Object.keys(contact_cache).length > 0) { - - // await redis.del(`contact:${id}`) + if (contact_cache && Object.keys(contact_cache).length > 0) { update_fields.escaped_name = escapeCharCache(update_fields.name) diff --git a/backend/src/helpers/WhatsCache.ts b/backend/src/helpers/WhatsCache.ts new file mode 100644 index 0000000..84f4a1a --- /dev/null +++ b/backend/src/helpers/WhatsCache.ts @@ -0,0 +1,240 @@ + +import Redis from 'ioredis' +import { type } from 'os' +const unflatten = require('flat').unflatten +var flatten = require('flat') +import ListWhatsAppsNumber from "../services/WhatsappService/ListWhatsAppsNumber" +import { redisConn } from './TicketCache' +import Whatsapp from "../models/Whatsapp"; + +const deleteWhatsappCache = async (hash:any) => { + + const redis: any = await redisConn(); + + if(!redis) return + + if (redis.status !== 'connect') return + + const whatsapp_cache: any = await redis.hgetall(hash) + + try { + if (whatsapp_cache && Object.keys(whatsapp_cache).length > 0) { + + await redis.del(hash) + + console.log(`Whatsapp cache number ${whatsapp_cache['number']} deleted!`) + } + else { + console.log('WHATSAPP CACHE NOT FOUND!') + } + } catch (error) { + console.log(`There was an error on deleteWhatsappCache: ${error}`) + } + + redis.quit() +} + +const updateWhatsappCache = async (hash: any, json_object: any) => { + + const redis: any = await redisConn(); + + if(!redis) return + + if (redis.status !== 'connect') return + + const pipeline = redis.pipeline() + + let entries = Object.entries(json_object) + + entries.forEach((e: any) => { + pipeline.hset(hash, e[0], e[1]) + }) + + await pipeline.exec(() => { console.log("whatsapp Key/value inserted/updated") }); + + redis.quit() + +} + +const updateWhatsappCacheById = async (hash:any, update_fields: object | any) => { + + const redis: any = await redisConn(); + + if(!redis) return + + if (redis.status !== 'connect') return + + const whatsapp_cache: any = await redis.hgetall(hash) + + try { + if (whatsapp_cache && Object.keys(whatsapp_cache).length > 0) { + + // update_fields.escaped_name = escapeCharCache(update_fields.name) + + await updateWhatsappCache(hash, update_fields) + + console.log(`WHATSAPP ${whatsapp_cache['number']} CACHE WAS UPDATED!`) + } + else { + console.log('WHATSAPP CACHE NOT FOUND!') + } + } catch (error) { + console.log(`There was an error on updateWhatsappCacheById: ${error}`) + } + + redis.quit() +} + +// const createOrUpdateContactCache = async (hash: any, contact: any) => { + +// const redis: any = await redisConn(); + +// if(!redis) return + +// if (redis.status !== 'connect') return + +// if (contact.name) { +// contact.escaped_name = escapeCharCache(contact.name) +// } + +// await redis.hmset(hash, contact); + +// redis.quit() + +// } + + +async function searchWhatsappCache(search: string) { + + const redis: any = await redisConn(); + + if(!redis) return + + if (redis.status !== 'connect') return null + + const response: any = await redis.call('FT.SEARCH', 'idx_whatsapp', `(@status:*${search}*)|(@number:*${search}*)`, 'SORTBY', 'status', 'ASC') + redis.quit() + + + if (response.length === 1) { + return [] + } + + const results: any = [] + + for (let n = 2; n < response.length; n += 2) { + const result: any = {} + const fieldNamesAndValues = response[n] + + for (let m = 0; m < fieldNamesAndValues.length; m += 2) { + const k = fieldNamesAndValues[m] + const v = fieldNamesAndValues[m + 1] + result[k] = v + } + + results.push(result) + } + + return results +} + + + +const insertOrUpeateWhatsCache = async (hash:any, whatsapp: any) => { + + const redis: any = await redisConn(); + + if(!redis) return + + if (redis.status !== 'connect') return + + if(Array.isArray(whatsapp)){ + + const pipeline = redis.pipeline() + + for (let i = 0; i < whatsapp.length; i++) { + + pipeline.hmset(hash, whatsapp[i]); + } + + await pipeline.exec(() => { console.log(`${whatsapp.length} WHATSAPP INSERTED IN CACHE!`) }); + } + else{ + + await redis.hmset(hash,JSON.parse(JSON.stringify(whatsapp))); + + console.log(`${whatsapp.length} WHATSAPP INSERTED OR UPADTED IN CACHE!`) + + } + + + redis.quit() +} + + + + +const loadWhatsappCache = async () => { + + await createWhatsappIndexCache('idx_whatsapp') + + const redis: any = await redisConn(); + + if(!redis) return + + if (redis.status !== 'connect') return + + let whatsapps:any = await Whatsapp.findAll({raw: true}) + + const pipeline = redis.pipeline() + + for (let i = 0; i < whatsapps.length; i++) { + + whatsapps[i].createdAt = new Date(whatsapps[i].createdAt).toISOString() + whatsapps[i].updatedAt = new Date(whatsapps[i].updatedAt).toISOString() + + // whatsapps[i].escaped_name = escapeCharCache(whatsapps[i].name) + + pipeline.hmset(`whatsapp:${whatsapps[i].id}`, whatsapps[i]); + } + + await pipeline.exec(() => { console.log(`${whatsapps.length} WHATSAPPS INSERTED IN CACHE!`) }); + + redis.quit() +} + +const createWhatsappIndexCache = async (hashIndex: string) => { + + const redis: any = await redisConn(); + + if(!redis) return + + if (redis.status !== 'connect') return + + try { + + const lst_index_redis: any = await redis.call('FT._LIST') + + if (lst_index_redis.includes(hashIndex)) { + console.log('entrou...') + await redis.call('FT.DROPINDEX', hashIndex) + } + + const response = await redis.call('FT.CREATE', hashIndex, 'ON', 'HASH', 'PREFIX', '1', 'whatsapp:', 'SCHEMA', 'status', 'TEXT', 'SORTABLE', 'number', 'TEXT', 'SORTABLE') + + console.log('Whatsapp index created: ', response) + + } catch (error) { + console.log('There was an error on createWhatsappIndexCache: ', error) + } + + redis.quit() +} + +export { + loadWhatsappCache, + searchWhatsappCache, + updateWhatsappCacheById, + insertOrUpeateWhatsCache, + deleteWhatsappCache +} \ No newline at end of file diff --git a/backend/src/libs/wbot.ts b/backend/src/libs/wbot.ts index cb2cd12..e666211 100644 --- a/backend/src/libs/wbot.ts +++ b/backend/src/libs/wbot.ts @@ -20,6 +20,7 @@ const sessions: Session[] = []; let backupSession: any[] = [] +import { insertOrUpeateWhatsCache } from "../helpers/WhatsCache"; @@ -87,6 +88,7 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean logger.info("Session:", sessionName); qrCode.generate(qr, { small: true }); await whatsapp.update({ qrcode: qr, status: "qrcode", retries: 0 }); + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { qrcode: qr, status: "qrcode", retries: 0 }) const sessionIndex = sessions.findIndex(s => s.id === whatsapp.id); if (sessionIndex === -1) { @@ -112,6 +114,7 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean if (whatsapp.retries > 1) { await whatsapp.update({ session: "", retries: 0 }); + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { session: "", retries: 0 }) } const retry = whatsapp.retries; @@ -119,6 +122,10 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean status: "DISCONNECTED", retries: retry + 1 }); + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { + status: "DISCONNECTED", + retries: retry + 1 + }) io.emit("whatsappSession", { action: "update", @@ -132,13 +139,15 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean logger.info(`Session: ${sessionName} READY`); console.log('>>>>>>>>>>>>>> ready wbot.ts MOBILE NUMBER: ', wbot.info["wid"]["user"]) + await whatsapp.update({ status: "CONNECTED", qrcode: "", retries: 0, number: wbot.info["wid"]["user"] - }); + }); + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`,whatsapp) io.emit("whatsappSession", { action: "update", diff --git a/backend/src/server.ts b/backend/src/server.ts index f4949f6..b5c793c 100644 --- a/backend/src/server.ts +++ b/backend/src/server.ts @@ -9,6 +9,7 @@ import { startWhoIsOnlineMonitor } from "./helpers/WhoIsOnlineMonitor" import { loadTicketsCache, flushCache, cacheSize } from './helpers/TicketCache' import { loadContactsCache } from './helpers/ContactsCache' +import { loadWhatsappCache } from './helpers/WhatsCache' const server = app.listen(process.env.PORT, () => { logger.info(`Server started on port: ${process.env.PORT}`); @@ -29,6 +30,7 @@ gracefulShutdown(server); await flushCache() await loadContactsCache() await loadTicketsCache() + await loadWhatsappCache() } })() diff --git a/backend/src/services/WbotServices/SendWhatsAppMessage.ts b/backend/src/services/WbotServices/SendWhatsAppMessage.ts index e07c8e3..68d4877 100644 --- a/backend/src/services/WbotServices/SendWhatsAppMessage.ts +++ b/backend/src/services/WbotServices/SendWhatsAppMessage.ts @@ -20,6 +20,7 @@ import { json } from "sequelize/types"; import sendMessageMultiSession from "../../helpers/TrySendMessageMultiSession"; import { restartWhatsSession } from "../../helpers/RestartWhatsSession"; +import { insertOrUpeateWhatsCache, searchWhatsappCache } from "../../helpers/WhatsCache"; @@ -34,22 +35,25 @@ const SendWhatsAppMessage = async ({ ticket, quotedMsg }: Request): Promise => { + + var timetaken = "Time taken to send message"; + + + console.time(timetaken) + + let quotedMsgSerializedId: string | undefined; if (quotedMsg) { await GetWbotMessage(ticket, quotedMsg.id); quotedMsgSerializedId = SerializeWbotMsgId(ticket, quotedMsg); } + + //TEST DEL - //TEST DEL - - // const whatsapp = await ShowWhatsAppService(33); - - // const wbot2 = getWbot(whatsapp.id); - - // await wbot2.logout(); - - // - + let whats_number = await searchWhatsappCache(`${ticket.whatsappId}`) + console.log('---') + console.log('whats_number search: ', whats_number) + console.log('---') let whatsapps: any @@ -104,12 +108,16 @@ const SendWhatsAppMessage = async ({ try { + console.time + const sentMessage = await wbot.sendMessage(`${ticket.contact.number}@${ticket.isGroup ? "g" : "c"}.us`, body, { quotedMessageId: quotedMsgSerializedId, linkPreview: false }); await ticket.update({ lastMessage: body }); await updateTicketCacheByTicketId(ticket.id, { lastMessage: body, updatedAt: new Date(ticket.updatedAt).toISOString() }) + console.timeEnd(timetaken) + return sentMessage; } catch (err) { @@ -122,6 +130,11 @@ const SendWhatsAppMessage = async ({ await whatsapp.update({ status: "RESTORING", }); + + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { + status: "RESTORING", + }) + setTimeout(() => restartWhatsSession(whatsapp, true), 90000); } diff --git a/backend/src/services/WbotServices/StartWhatsAppSession.ts b/backend/src/services/WbotServices/StartWhatsAppSession.ts index 3b02cd2..bd34e23 100644 --- a/backend/src/services/WbotServices/StartWhatsAppSession.ts +++ b/backend/src/services/WbotServices/StartWhatsAppSession.ts @@ -4,9 +4,14 @@ import { wbotMessageListener } from "./wbotMessageListener"; import { getIO } from "../../libs/socket"; import wbotMonitor from "./wbotMonitor"; import { logger } from "../../utils/logger"; +import { insertOrUpeateWhatsCache } from "../../helpers/WhatsCache"; export const StartWhatsAppSession = async (whatsapp: Whatsapp, backupSession: boolean = false): Promise => { await whatsapp.update({ status: "OPENING" }); + + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { + status: "OPENING", + }) const io = getIO(); io.emit("whatsappSession", { diff --git a/backend/src/services/WbotServices/wbotMonitor.ts b/backend/src/services/WbotServices/wbotMonitor.ts index 54b4c2e..c7e0565 100644 --- a/backend/src/services/WbotServices/wbotMonitor.ts +++ b/backend/src/services/WbotServices/wbotMonitor.ts @@ -1,5 +1,6 @@ import * as Sentry from "@sentry/node"; import { Client } from "whatsapp-web.js"; +import { insertOrUpeateWhatsCache } from "../../helpers/WhatsCache"; import { getIO } from "../../libs/socket"; import Whatsapp from "../../models/Whatsapp"; @@ -25,6 +26,7 @@ const wbotMonitor = async ( try { await whatsapp.update({ status: newState }); + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { status: newState }) } catch (err) { Sentry.captureException(err); logger.error(err); @@ -59,6 +61,7 @@ const wbotMonitor = async ( logger.info(`Disconnected session: ${sessionName}, reason: ${reason}`); try { await whatsapp.update({ status: "OPENING", session: "" }); + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { status: "OPENING", session: "" }) } catch (err) { Sentry.captureException(err); logger.error(err); diff --git a/backend/src/services/WhatsappService/CreateWhatsAppService.ts b/backend/src/services/WhatsappService/CreateWhatsAppService.ts index b9f8d37..e2d896f 100644 --- a/backend/src/services/WhatsappService/CreateWhatsAppService.ts +++ b/backend/src/services/WhatsappService/CreateWhatsAppService.ts @@ -62,6 +62,7 @@ const CreateWhatsAppService = async ({ }); if (oldDefaultWhatsapp) { await oldDefaultWhatsapp.update({ isDefault: false }); + } } diff --git a/backend/src/services/WhatsappService/ListWhatsAppsNumber.ts b/backend/src/services/WhatsappService/ListWhatsAppsNumber.ts index e236490..b9c921f 100644 --- a/backend/src/services/WhatsappService/ListWhatsAppsNumber.ts +++ b/backend/src/services/WhatsappService/ListWhatsAppsNumber.ts @@ -12,8 +12,8 @@ const ListWhatsAppsNumber = async (whatsappId: string | number, status: string): const whatsapps = await Whatsapp.findAll({ raw: true, - where: {number: whatsapp.number, status: status}, - attributes:['id', 'number'] + where: { number: whatsapp.number, status: status }, + attributes: ['id', 'number', 'status', 'isDefault'] }); return whatsapps; diff --git a/backend/src/services/WhatsappService/UpdateWhatsAppService.ts b/backend/src/services/WhatsappService/UpdateWhatsAppService.ts index 5c6fde7..2b4e850 100644 --- a/backend/src/services/WhatsappService/UpdateWhatsAppService.ts +++ b/backend/src/services/WhatsappService/UpdateWhatsAppService.ts @@ -5,6 +5,7 @@ import AppError from "../../errors/AppError"; import Whatsapp from "../../models/Whatsapp"; import ShowWhatsAppService from "./ShowWhatsAppService"; import AssociateWhatsappQueue from "./AssociateWhatsappQueue"; +import { insertOrUpeateWhatsCache } from "../../helpers/WhatsCache"; interface WhatsappData { name?: string; @@ -78,6 +79,15 @@ const UpdateWhatsAppService = async ({ isDefault }); + await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { + name, + status, + session, + greetingMessage, + farewellMessage, + isDefault + }) + await AssociateWhatsappQueue(whatsapp, queueIds); return { whatsapp, oldDefaultWhatsapp };