From 3aae93141a313566bf7b7746a619952e5799e908 Mon Sep 17 00:00:00 2001 From: adriano Date: Mon, 14 Nov 2022 11:21:58 -0300 Subject: [PATCH] =?UTF-8?q?Cria=C3=A7=C3=A3o=20de=20coluna=20number=20na?= =?UTF-8?q?=20tabela=20whatsapp=20e=20desenvolvimento=20do=20ignore=20id?= =?UTF-8?q?=20from=20message?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...112040251-add-number-column-to-whatsapp.ts | 13 + .../helpers/LastMessageIdByContactCache.ts | 262 ++++++++++++++++++ backend/src/helpers/TicketCache.ts | 4 +- backend/src/libs/wbot.ts | 7 +- backend/src/models/Whatsapp.ts | 3 + .../WbotServices/SendWhatsAppMessage.ts | 46 ++- .../WbotServices/wbotMessageListener.ts | 86 +++++- .../WhatsappService/ListWhatsAppsNumber.ts | 28 ++ 8 files changed, 422 insertions(+), 27 deletions(-) create mode 100644 backend/src/database/migrations/20221112040251-add-number-column-to-whatsapp.ts create mode 100644 backend/src/helpers/LastMessageIdByContactCache.ts create mode 100644 backend/src/services/WhatsappService/ListWhatsAppsNumber.ts diff --git a/backend/src/database/migrations/20221112040251-add-number-column-to-whatsapp.ts b/backend/src/database/migrations/20221112040251-add-number-column-to-whatsapp.ts new file mode 100644 index 0000000..58cd82e --- /dev/null +++ b/backend/src/database/migrations/20221112040251-add-number-column-to-whatsapp.ts @@ -0,0 +1,13 @@ +import { QueryInterface, DataTypes } from "sequelize"; + +module.exports = { + up: (queryInterface: QueryInterface) => { + return queryInterface.addColumn("Whatsapps", "number", { + type: DataTypes.TEXT + }); + }, + + down: (queryInterface: QueryInterface) => { + return queryInterface.removeColumn("Whatsapps", "number"); + } +}; diff --git a/backend/src/helpers/LastMessageIdByContactCache.ts b/backend/src/helpers/LastMessageIdByContactCache.ts new file mode 100644 index 0000000..15e26a4 --- /dev/null +++ b/backend/src/helpers/LastMessageIdByContactCache.ts @@ -0,0 +1,262 @@ + +import Redis from 'ioredis' +import { type } from 'os' +const unflatten = require('flat').unflatten +var flatten = require('flat') +import ListContactsServiceCache from "../services/ContactServices/ListContactsServiceCache" +import { redisConn } from './TicketCache' + + +const deleteContactsByIdCache = async (id: string | number) => { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + const contact_cache: any = await redis.hgetall(`contact:${id}`) + + try { + if (contact_cache && Object.keys(contact_cache).length > 0) { + + await redis.del(`contact:${id}`) + + console.log(`Contacts cache number ${contact_cache['number']} deleted!`) + } + else { + console.log('CONTACT CACHE NOT FOUND!') + } + } catch (error) { + console.log(`There was an error on deleteContactsByIdCache: ${error}`) + } + + redis.quit() +} + +const updateContactCache = async (hash: any, json_object: any) => { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + const pipeline = redis.pipeline() + + let entries = Object.entries(json_object) + + entries.forEach((e: any) => { + pipeline.hset(hash, e[0], e[1]) + }) + + await pipeline.exec(() => { console.log("Key/value inserted/updated") }); + + redis.quit() + +} + +const updateContactCacheById = async (id: string | number, update_fields: object | any) => { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + const contact_cache: any = await redis.hgetall(`contact:${id}`) + + try { + if (contact_cache && Object.keys(contact_cache).length > 0) { + + // await redis.del(`contact:${id}`) + + update_fields.escaped_name = escapeCharCache(update_fields.name) + + await updateContactCache(`contact:${id}`, update_fields) + + console.log(`CONTACT ${contact_cache['number']} CACHE WAS UPDATED!`) + } + else { + console.log('CONTACT CACHE NOT FOUND!') + } + } catch (error) { + console.log(`There was an error on updateContactCacheById: ${error}`) + } + + redis.quit() +} + +const createOrUpdateContactCache = async (hash: any, contact: any) => { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + if (contact.name) { + contact.escaped_name = escapeCharCache(contact.name) + } + + await redis.hmset(hash, contact); + + redis.quit() + +} + + +async function searchContactCache(search: string, offset: number, limit: number) { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return null + + search = escapeCharCache(search) + + const response: any = await redis.call('FT.SEARCH', 'idx_contact_message', `(@escaped_name:*${search}*)|(@number:*${search}*)`, 'LIMIT', offset, limit, 'SORTBY', 'escaped_name', 'ASC') + redis.quit() + + + if (response.length === 1) { + return [] + } + + const results: any = [] + + for (let n = 2; n < response.length; n += 2) { + const result: any = {} + const fieldNamesAndValues = response[n] + + for (let m = 0; m < fieldNamesAndValues.length; m += 2) { + const k = fieldNamesAndValues[m] + const v = fieldNamesAndValues[m + 1] + result[k] = v + } + + results.push(result) + } + + return results +} + + +const removeExtraSpace = (str: string) => { + + str = str.replace(/^\s+/g, '') + + return str.replace(/\s+/g, ' ') +} + +const escapeCharCache = (str: string) => { + + const pattern = /[\'|\"|\.|\,|\;|\<|\>|\{|\}|\[|\]|\"|\'|\=|\~|\*|\:|\#|\+|\^|\$|\@|\%|\!|\&|\)|\(|/|\-|\\)]/g; // no match, use replace function. + + let newStr = str.replace(pattern, (t1) => `\\${t1}`); + + newStr = removeExtraSpace(newStr) + + return newStr.trim() + +} + +const getLastId = async (hash: any) => { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + const contact_cache: any = await redis.hgetall(hash) + + return contact_cache + +} + +const insertMessageContactCache = async (hash: any, contact_message: any) => { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + await redis.hmset(hash, contact_message); + + console.log('CREATED/UPDATED CONTACT MESSAGE') + + redis.quit() + +} + +const loadContactsCache = async () => { + + await createContactMessageIndexCache('idx_contact_message') + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + let contacts = await ListContactsServiceCache() + + const pipeline = redis.pipeline() + + for (let i = 0; i < contacts.length; i++) { + + contacts[i].createdAt = new Date(contacts[i].createdAt).toISOString() + contacts[i].updatedAt = new Date(contacts[i].updatedAt).toISOString() + + contacts[i].escaped_name = escapeCharCache(contacts[i].name) + + pipeline.hmset(`contact:${contacts[i].id}`, contacts[i]); + } + + await pipeline.exec(() => { console.log(`${contacts.length} CONTACTS INSERTED IN CACHE!`) }); + + redis.quit() +} + +const createContactMessageIndexCache = async (hashIndex: string) => { + + const redis: any = await redisConn(); + + if (!redis) return + + if (redis.status !== 'connect') return + + try { + + const lst_index_redis: any = await redis.call('FT._LIST') + + if (lst_index_redis.includes(hashIndex)) { + console.log('entrou...') + await redis.call('FT.DROPINDEX', hashIndex) + } + + const response = await redis.call('FT.CREATE', hashIndex, 'ON', 'HASH', 'PREFIX', '1', 'contact_message:', 'SCHEMA', 'number', 'TEXT', 'SORTABLE') + + console.log('contact_message index created: ', response) + + } catch (error) { + console.log('There was an error on contact_message: ', error) + } + + redis.quit() +} + +export { + loadContactsCache, + searchContactCache, + deleteContactsByIdCache, + updateContactCacheById, + createOrUpdateContactCache, + escapeCharCache, + + insertMessageContactCache, + getLastId +} \ No newline at end of file diff --git a/backend/src/helpers/TicketCache.ts b/backend/src/helpers/TicketCache.ts index dc4fe80..68e53b2 100644 --- a/backend/src/helpers/TicketCache.ts +++ b/backend/src/helpers/TicketCache.ts @@ -207,9 +207,7 @@ const createOrUpdateTicketCache = async (hash: any, ticket: any) => { if(!redis) return - if (redis.status !== 'connect') return - - if (redis.status !== 'connect') return + if (redis.status !== 'connect') return ticket.escaped_name = escapeCharCache(ticket['contact.name']) diff --git a/backend/src/libs/wbot.ts b/backend/src/libs/wbot.ts index 079022b..c3f914e 100644 --- a/backend/src/libs/wbot.ts +++ b/backend/src/libs/wbot.ts @@ -35,7 +35,9 @@ const syncUnreadMessages = async (wbot: Session) => { }); for (const msg of unreadMessages) { + // console.log('--BACKEND MSG: ', msg) + await handleMessage(msg, wbot); } @@ -129,12 +131,13 @@ export const initWbot = async (whatsapp: Whatsapp, backupSessionRestore: boolean wbot.on("ready", async () => { logger.info(`Session: ${sessionName} READY`); - // console.log('>>>>>>>>>>>>>> ready wbot.ts MOBILE NUMBER: ', wbot.info["wid"]["user"]) + console.log('>>>>>>>>>>>>>> ready wbot.ts MOBILE NUMBER: ', wbot.info["wid"]["user"]) await whatsapp.update({ status: "CONNECTED", qrcode: "", - retries: 0 + retries: 0, + number: wbot.info["wid"]["user"] }); io.emit("whatsappSession", { diff --git a/backend/src/models/Whatsapp.ts b/backend/src/models/Whatsapp.ts index 8442faa..1a3a6fc 100644 --- a/backend/src/models/Whatsapp.ts +++ b/backend/src/models/Whatsapp.ts @@ -53,6 +53,9 @@ class Whatsapp extends Model { @Column(DataType.TEXT) farewellMessage: string; + @Column + number: string; + @Default(false) @AllowNull @Column diff --git a/backend/src/services/WbotServices/SendWhatsAppMessage.ts b/backend/src/services/WbotServices/SendWhatsAppMessage.ts index 7ad1f50..c274601 100644 --- a/backend/src/services/WbotServices/SendWhatsAppMessage.ts +++ b/backend/src/services/WbotServices/SendWhatsAppMessage.ts @@ -5,13 +5,17 @@ import GetWbotMessage from "../../helpers/GetWbotMessage"; import SerializeWbotMsgId from "../../helpers/SerializeWbotMsgId"; import Message from "../../models/Message"; import Ticket from "../../models/Ticket"; +import Whatsapp from "../../models/Whatsapp"; import ShowWhatsAppService from "../WhatsappService/ShowWhatsAppService"; import wbotByUserQueue from '../../helpers/GetWbotByUserQueue' import { WhatsIndex } from "../../helpers/LoadBalanceWhatsSameQueue"; -import { updateTicketCacheByTicketId } from '../../helpers/TicketCache' +import { deleteTicketsByContactsCache, updateTicketCacheByTicketId } from '../../helpers/TicketCache' + +import ListWhatsAppsNumber from "../WhatsappService/ListWhatsAppsNumber"; +import { getWbot } from "../../libs/wbot"; interface Request { body: string; @@ -30,12 +34,39 @@ const SendWhatsAppMessage = async ({ quotedMsgSerializedId = SerializeWbotMsgId(ticket, quotedMsg); } + //TEST DEL + + // const whatsapp = await ShowWhatsAppService(33); - const whatsapp = await ShowWhatsAppService(ticket.whatsappId); + // const wbot2 = getWbot(whatsapp.id); - if (whatsapp.status != 'CONNECTED') { + // await wbot2.logout(); + + // - let whatsapps = await wbotByUserQueue(ticket.userId) + let whatsapps:any + + const listWhatsapp = await ListWhatsAppsNumber(ticket.whatsappId, 'CONNECTED') + + if (listWhatsapp.length > 1) { + + const _whatsapp = listWhatsapp[Math.floor(Math.random() * listWhatsapp.length)]; + + await ticket.update({ whatsappId: _whatsapp.id }); + + } + else { + + whatsapps = await Whatsapp.findOne({ + where: { id: ticket.whatsappId }, + attributes: ['status'] + }) + + } + + if (listWhatsapp.length == 0 || whatsapps !='CONNECTED') { + + whatsapps = await wbotByUserQueue(ticket.userId) if (whatsapps.length > 0) { @@ -54,19 +85,14 @@ const SendWhatsAppMessage = async ({ } - const wbot = await GetTicketWbot(ticket); - - try { const sentMessage = await wbot.sendMessage(`${ticket.contact.number}@${ticket.isGroup ? "g" : "c"}.us`, body, { quotedMessageId: quotedMsgSerializedId, linkPreview: false }); + await ticket.update({ lastMessage: body }); - - // TEST DEL await updateTicketCacheByTicketId(ticket.id, { lastMessage: body, updatedAt: new Date(ticket.updatedAt).toISOString() }) - // return sentMessage; } catch (err) { diff --git a/backend/src/services/WbotServices/wbotMessageListener.ts b/backend/src/services/WbotServices/wbotMessageListener.ts index 18fe020..4c59268 100644 --- a/backend/src/services/WbotServices/wbotMessageListener.ts +++ b/backend/src/services/WbotServices/wbotMessageListener.ts @@ -54,8 +54,14 @@ import { splitDateTime } from "../../helpers/SplitDateTime"; // import { updateTicketCacheByTicketId } from '../../helpers/TicketCache' +import { insertMessageContactCache, getLastId } from '../../helpers/LastMessageIdByContactCache' +let testLastId = '' + +let lst: any[] = [] +let lstAux: any[] = [] + interface Session extends Client { id?: number; @@ -235,7 +241,7 @@ const verifyQueue = async ( ticketData: { queueId: choosenQueue.id }, ticketId: ticket.id }); - + let botOptions = '' @@ -297,7 +303,7 @@ const verifyQueue = async ( } - + } }; @@ -362,12 +368,68 @@ const handleMessage = async ( msg: WbotMessage, wbot: Session ): Promise => { + + // TEST DEL MULTI SESSION + + if (!msg.id.fromMe) { + } + + // let index = lst.findIndex((x: any) => x.id == msg.id.id) + + // console.log('INDEX: ', index) + + // if (index == -1) { + + // lst.push({id: msg.id.id}) + + // } + // else{ + // console.log('IGNORED ID: ', msg.id.id) + // return + // } + + // console.log('PASSOU.................................FROM: ', msg.from.split("@")[0], ' | ID: ', msg.id.id) + + + + + + // const contact_message = await getLastId(`contact_message:5517988310949`) + + // if (contact_message && contact_message.id == msg.id.id) { + // console.log('IGNORED MESSAGE SAME ID FROM CLIENT: ', contact_message.id) + // return + // } + + // await insertMessageContactCache(`contact_message:5517988310949`, + // { + // from: msg.from.split("@")[0], + // to: msg.to.split("@")[0], + // id: msg.id.id + // }) + + // console.log('PASSOU............................................... contact_message.id: ',contact_message.id) + + + // if (testLastId == msg.id.id) { + // console.log('IGNORED MESSAGE SAME ID!') + // return + // } + // testLastId = msg.id.id + + // console.log('PASSOU............................................... msg.id.id: ',msg.id.id) + + + + + + + + if (!isValidMsg(msg)) { return; } - - try { let msgContact: WbotContact; let groupContact: Contact | undefined; @@ -386,7 +448,7 @@ const handleMessage = async ( msgContact = await msg.getContact(); - // + // console.log(`\n <<<<<<<<<< RECEIVING MESSAGE: Parcial msg and msgContact info: @@ -420,8 +482,8 @@ const handleMessage = async ( const unreadMessages = msg.fromMe ? 0 : chat.unreadCount; - const contact = await verifyContact(msgContact); - + const contact = await verifyContact(msgContact); + if (unreadMessages === 0 && whatsapp.farewellMessage && whatsapp.farewellMessage === msg.body) return; @@ -431,7 +493,7 @@ const handleMessage = async ( wbot.id!, unreadMessages, groupContact - ); + ); // // await updateTicketCacheByTicketId(ticket.id, {'contact.profilePicUrl': ticket.contact.profilePicUrl}) @@ -439,7 +501,7 @@ const handleMessage = async ( // Para responder para o cliente pelo mesmo whatsapp que ele enviou a mensagen if (wbot.id != ticket.whatsappId) { - + await ticket.update({ whatsappId: wbot.id }); } @@ -558,7 +620,7 @@ const handleMessage = async ( - + // test del let next = true @@ -792,7 +854,7 @@ const handleMessage = async ( if (whatsapp.status == 'CONNECTED') { - + let timestamp = Math.floor(Date.now() / 1000) @@ -801,7 +863,7 @@ const handleMessage = async ( await restartWhatsSession(whatsapp) - + } } diff --git a/backend/src/services/WhatsappService/ListWhatsAppsNumber.ts b/backend/src/services/WhatsappService/ListWhatsAppsNumber.ts new file mode 100644 index 0000000..e236490 --- /dev/null +++ b/backend/src/services/WhatsappService/ListWhatsAppsNumber.ts @@ -0,0 +1,28 @@ + +import Whatsapp from "../../models/Whatsapp"; + +const ListWhatsAppsNumber = async (whatsappId: string | number, status: string): Promise => { + + const whatsapp = await Whatsapp.findOne({ + raw: true, + where: { id: whatsappId } + }) + + if (whatsapp) { + + const whatsapps = await Whatsapp.findAll({ + raw: true, + where: {number: whatsapp.number, status: status}, + attributes:['id', 'number'] + }); + + return whatsapps; + + } + + return [] + + +}; + +export default ListWhatsAppsNumber;