Implementação de cache para whatsapp

pull/21/head
adriano 2022-11-17 11:40:37 -03:00
parent d86a7efe12
commit bc406cb1d0
10 changed files with 297 additions and 16 deletions

View File

@ -67,9 +67,7 @@ const updateContactCacheById = async (id: string | number, update_fields: object
const contact_cache: any = await redis.hgetall(`contact:${id}`)
try {
if (contact_cache && Object.keys(contact_cache).length > 0) {
// await redis.del(`contact:${id}`)
if (contact_cache && Object.keys(contact_cache).length > 0) {
update_fields.escaped_name = escapeCharCache(update_fields.name)

View File

@ -0,0 +1,240 @@
import Redis from 'ioredis'
import { type } from 'os'
const unflatten = require('flat').unflatten
var flatten = require('flat')
import ListWhatsAppsNumber from "../services/WhatsappService/ListWhatsAppsNumber"
import { redisConn } from './TicketCache'
import Whatsapp from "../models/Whatsapp";
const deleteWhatsappCache = async (hash:any) => {
const redis: any = await redisConn();
if(!redis) return
if (redis.status !== 'connect') return
const whatsapp_cache: any = await redis.hgetall(hash)
try {
if (whatsapp_cache && Object.keys(whatsapp_cache).length > 0) {
await redis.del(hash)
console.log(`Whatsapp cache number ${whatsapp_cache['number']} deleted!`)
}
else {
console.log('WHATSAPP CACHE NOT FOUND!')
}
} catch (error) {
console.log(`There was an error on deleteWhatsappCache: ${error}`)
}
redis.quit()
}
const updateWhatsappCache = async (hash: any, json_object: any) => {
const redis: any = await redisConn();
if(!redis) return
if (redis.status !== 'connect') return
const pipeline = redis.pipeline()
let entries = Object.entries(json_object)
entries.forEach((e: any) => {
pipeline.hset(hash, e[0], e[1])
})
await pipeline.exec(() => { console.log("whatsapp Key/value inserted/updated") });
redis.quit()
}
const updateWhatsappCacheById = async (hash:any, update_fields: object | any) => {
const redis: any = await redisConn();
if(!redis) return
if (redis.status !== 'connect') return
const whatsapp_cache: any = await redis.hgetall(hash)
try {
if (whatsapp_cache && Object.keys(whatsapp_cache).length > 0) {
// update_fields.escaped_name = escapeCharCache(update_fields.name)
await updateWhatsappCache(hash, update_fields)
console.log(`WHATSAPP ${whatsapp_cache['number']} CACHE WAS UPDATED!`)
}
else {
console.log('WHATSAPP CACHE NOT FOUND!')
}
} catch (error) {
console.log(`There was an error on updateWhatsappCacheById: ${error}`)
}
redis.quit()
}
// const createOrUpdateContactCache = async (hash: any, contact: any) => {
// const redis: any = await redisConn();
// if(!redis) return
// if (redis.status !== 'connect') return
// if (contact.name) {
// contact.escaped_name = escapeCharCache(contact.name)
// }
// await redis.hmset(hash, contact);
// redis.quit()
// }
async function searchWhatsappCache(search: string) {
const redis: any = await redisConn();
if(!redis) return
if (redis.status !== 'connect') return null
const response: any = await redis.call('FT.SEARCH', 'idx_whatsapp', `(@status:*${search}*)|(@number:*${search}*)`, 'SORTBY', 'status', 'ASC')
redis.quit()
if (response.length === 1) {
return []
}
const results: any = []
for (let n = 2; n < response.length; n += 2) {
const result: any = {}
const fieldNamesAndValues = response[n]
for (let m = 0; m < fieldNamesAndValues.length; m += 2) {
const k = fieldNamesAndValues[m]
const v = fieldNamesAndValues[m + 1]
result[k] = v
}
results.push(result)
}
return results
}
const insertOrUpeateWhatsCache = async (hash:any, whatsapp: any) => {
const redis: any = await redisConn();
if(!redis) return
if (redis.status !== 'connect') return
if(Array.isArray(whatsapp)){
const pipeline = redis.pipeline()
for (let i = 0; i < whatsapp.length; i++) {
pipeline.hmset(hash, whatsapp[i]);
}
await pipeline.exec(() => { console.log(`${whatsapp.length} WHATSAPP INSERTED IN CACHE!`) });
}
else{
await redis.hmset(hash,JSON.parse(JSON.stringify(whatsapp)));
console.log(`${whatsapp.length} WHATSAPP INSERTED OR UPADTED IN CACHE!`)
}
redis.quit()
}
const loadWhatsappCache = async () => {
await createWhatsappIndexCache('idx_whatsapp')
const redis: any = await redisConn();
if(!redis) return
if (redis.status !== 'connect') return
let whatsapps:any = await Whatsapp.findAll({raw: true})
const pipeline = redis.pipeline()
for (let i = 0; i < whatsapps.length; i++) {
whatsapps[i].createdAt = new Date(whatsapps[i].createdAt).toISOString()
whatsapps[i].updatedAt = new Date(whatsapps[i].updatedAt).toISOString()
// whatsapps[i].escaped_name = escapeCharCache(whatsapps[i].name)
pipeline.hmset(`whatsapp:${whatsapps[i].id}`, whatsapps[i]);
}
await pipeline.exec(() => { console.log(`${whatsapps.length} WHATSAPPS INSERTED IN CACHE!`) });
redis.quit()
}
const createWhatsappIndexCache = async (hashIndex: string) => {
const redis: any = await redisConn();
if(!redis) return
if (redis.status !== 'connect') return
try {
const lst_index_redis: any = await redis.call('FT._LIST')
if (lst_index_redis.includes(hashIndex)) {
console.log('entrou...')
await redis.call('FT.DROPINDEX', hashIndex)
}
const response = await redis.call('FT.CREATE', hashIndex, 'ON', 'HASH', 'PREFIX', '1', 'whatsapp:', 'SCHEMA', 'status', 'TEXT', 'SORTABLE', 'number', 'TEXT', 'SORTABLE')
console.log('Whatsapp index created: ', response)
} catch (error) {
console.log('There was an error on createWhatsappIndexCache: ', error)
}
redis.quit()
}
export {
loadWhatsappCache,
searchWhatsappCache,
updateWhatsappCacheById,
insertOrUpeateWhatsCache,
deleteWhatsappCache
}

View File

@ -20,6 +20,7 @@ const sessions: Session[] = [];
let backupSession: any[] = []
import { insertOrUpeateWhatsCache } from "../helpers/WhatsCache";
@ -87,6 +88,7 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean
logger.info("Session:", sessionName);
qrCode.generate(qr, { small: true });
await whatsapp.update({ qrcode: qr, status: "qrcode", retries: 0 });
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { qrcode: qr, status: "qrcode", retries: 0 })
const sessionIndex = sessions.findIndex(s => s.id === whatsapp.id);
if (sessionIndex === -1) {
@ -112,6 +114,7 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean
if (whatsapp.retries > 1) {
await whatsapp.update({ session: "", retries: 0 });
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { session: "", retries: 0 })
}
const retry = whatsapp.retries;
@ -119,6 +122,10 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean
status: "DISCONNECTED",
retries: retry + 1
});
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, {
status: "DISCONNECTED",
retries: retry + 1
})
io.emit("whatsappSession", {
action: "update",
@ -132,13 +139,15 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean
logger.info(`Session: ${sessionName} READY`);
console.log('>>>>>>>>>>>>>> ready wbot.ts MOBILE NUMBER: ', wbot.info["wid"]["user"])
await whatsapp.update({
status: "CONNECTED",
qrcode: "",
retries: 0,
number: wbot.info["wid"]["user"]
});
});
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`,whatsapp)
io.emit("whatsappSession", {
action: "update",

View File

@ -9,6 +9,7 @@ import { startWhoIsOnlineMonitor } from "./helpers/WhoIsOnlineMonitor"
import { loadTicketsCache, flushCache, cacheSize } from './helpers/TicketCache'
import { loadContactsCache } from './helpers/ContactsCache'
import { loadWhatsappCache } from './helpers/WhatsCache'
const server = app.listen(process.env.PORT, () => {
logger.info(`Server started on port: ${process.env.PORT}`);
@ -29,6 +30,7 @@ gracefulShutdown(server);
await flushCache()
await loadContactsCache()
await loadTicketsCache()
await loadWhatsappCache()
}
})()

View File

@ -20,6 +20,7 @@ import { json } from "sequelize/types";
import sendMessageMultiSession from "../../helpers/TrySendMessageMultiSession";
import { restartWhatsSession } from "../../helpers/RestartWhatsSession";
import { insertOrUpeateWhatsCache, searchWhatsappCache } from "../../helpers/WhatsCache";
@ -34,22 +35,25 @@ const SendWhatsAppMessage = async ({
ticket,
quotedMsg
}: Request): Promise<WbotMessage> => {
var timetaken = "Time taken to send message";
console.time(timetaken)
let quotedMsgSerializedId: string | undefined;
if (quotedMsg) {
await GetWbotMessage(ticket, quotedMsg.id);
quotedMsgSerializedId = SerializeWbotMsgId(ticket, quotedMsg);
}
//TEST DEL
//TEST DEL
// const whatsapp = await ShowWhatsAppService(33);
// const wbot2 = getWbot(whatsapp.id);
// await wbot2.logout();
//
let whats_number = await searchWhatsappCache(`${ticket.whatsappId}`)
console.log('---')
console.log('whats_number search: ', whats_number)
console.log('---')
let whatsapps: any
@ -104,12 +108,16 @@ const SendWhatsAppMessage = async ({
try {
console.time
const sentMessage = await wbot.sendMessage(`${ticket.contact.number}@${ticket.isGroup ? "g" : "c"}.us`, body, { quotedMessageId: quotedMsgSerializedId, linkPreview: false });
await ticket.update({ lastMessage: body });
await updateTicketCacheByTicketId(ticket.id, { lastMessage: body, updatedAt: new Date(ticket.updatedAt).toISOString() })
console.timeEnd(timetaken)
return sentMessage;
} catch (err) {
@ -122,6 +130,11 @@ const SendWhatsAppMessage = async ({
await whatsapp.update({
status: "RESTORING",
});
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, {
status: "RESTORING",
})
setTimeout(() => restartWhatsSession(whatsapp, true), 90000);
}

View File

@ -4,9 +4,14 @@ import { wbotMessageListener } from "./wbotMessageListener";
import { getIO } from "../../libs/socket";
import wbotMonitor from "./wbotMonitor";
import { logger } from "../../utils/logger";
import { insertOrUpeateWhatsCache } from "../../helpers/WhatsCache";
export const StartWhatsAppSession = async (whatsapp: Whatsapp, backupSession: boolean = false): Promise<void> => {
await whatsapp.update({ status: "OPENING" });
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, {
status: "OPENING",
})
const io = getIO();
io.emit("whatsappSession", {

View File

@ -1,5 +1,6 @@
import * as Sentry from "@sentry/node";
import { Client } from "whatsapp-web.js";
import { insertOrUpeateWhatsCache } from "../../helpers/WhatsCache";
import { getIO } from "../../libs/socket";
import Whatsapp from "../../models/Whatsapp";
@ -25,6 +26,7 @@ const wbotMonitor = async (
try {
await whatsapp.update({ status: newState });
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { status: newState })
} catch (err) {
Sentry.captureException(err);
logger.error(err);
@ -59,6 +61,7 @@ const wbotMonitor = async (
logger.info(`Disconnected session: ${sessionName}, reason: ${reason}`);
try {
await whatsapp.update({ status: "OPENING", session: "" });
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, { status: "OPENING", session: "" })
} catch (err) {
Sentry.captureException(err);
logger.error(err);

View File

@ -62,6 +62,7 @@ const CreateWhatsAppService = async ({
});
if (oldDefaultWhatsapp) {
await oldDefaultWhatsapp.update({ isDefault: false });
}
}

View File

@ -12,8 +12,8 @@ const ListWhatsAppsNumber = async (whatsappId: string | number, status: string):
const whatsapps = await Whatsapp.findAll({
raw: true,
where: {number: whatsapp.number, status: status},
attributes:['id', 'number']
where: { number: whatsapp.number, status: status },
attributes: ['id', 'number', 'status', 'isDefault']
});
return whatsapps;

View File

@ -5,6 +5,7 @@ import AppError from "../../errors/AppError";
import Whatsapp from "../../models/Whatsapp";
import ShowWhatsAppService from "./ShowWhatsAppService";
import AssociateWhatsappQueue from "./AssociateWhatsappQueue";
import { insertOrUpeateWhatsCache } from "../../helpers/WhatsCache";
interface WhatsappData {
name?: string;
@ -78,6 +79,15 @@ const UpdateWhatsAppService = async ({
isDefault
});
await insertOrUpeateWhatsCache(`whatsapp:${whatsapp.id}`, {
name,
status,
session,
greetingMessage,
farewellMessage,
isDefault
})
await AssociateWhatsappQueue(whatsapp, queueIds);
return { whatsapp, oldDefaultWhatsapp };