Skip to content

Commit 8ac69c3

Browse files
committed
Merge origin/main into fix/search-path-errors
2 parents cc263e4 + 4f1fdf7 commit 8ac69c3

30 files changed

+3185
-1038
lines changed

backend/src/db/pg/PgContacts.ts

Lines changed: 166 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,96 @@ export default class PgContacts implements Contacts {
8181
INSERT INTO private.messages("channel","folder_path","date","message_id","references","list_id","conversation","user_id")
8282
VALUES($1, $2, $3, $4, $5, $6, $7, $8);`;
8383

84-
private static readonly INSERT_POC_SQL = `
85-
INSERT INTO private.pointsofcontact("message_id","name","from","reply_to","to","cc","bcc","body","person_email","plus_address", "user_id")
86-
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
87-
RETURNING id;`;
88-
8984
private static readonly UPSERT_PERSON_SQL = `
85+
WITH upserted AS (
86+
INSERT INTO private.persons ("name","email","url","image","location","same_as","given_name","family_name","job_title","identifiers","user_id", "source", "works_for", "mining_id")
87+
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)
88+
ON CONFLICT (email, user_id, source) DO UPDATE
89+
SET
90+
name = EXCLUDED.name,
91+
url = EXCLUDED.url,
92+
image = EXCLUDED.image,
93+
location = EXCLUDED.location,
94+
same_as = EXCLUDED.same_as,
95+
given_name = EXCLUDED.given_name,
96+
family_name = EXCLUDED.family_name,
97+
job_title = EXCLUDED.job_title,
98+
identifiers = EXCLUDED.identifiers,
99+
works_for = EXCLUDED.works_for,
100+
mining_id = EXCLUDED.mining_id
101+
WHERE
102+
private.persons.name IS DISTINCT FROM EXCLUDED.name
103+
OR private.persons.url IS DISTINCT FROM EXCLUDED.url
104+
OR private.persons.image IS DISTINCT FROM EXCLUDED.image
105+
OR private.persons.location IS DISTINCT FROM EXCLUDED.location
106+
OR private.persons.same_as IS DISTINCT FROM EXCLUDED.same_as
107+
OR private.persons.given_name IS DISTINCT FROM EXCLUDED.given_name
108+
OR private.persons.family_name IS DISTINCT FROM EXCLUDED.family_name
109+
OR private.persons.job_title IS DISTINCT FROM EXCLUDED.job_title
110+
OR private.persons.identifiers IS DISTINCT FROM EXCLUDED.identifiers
111+
OR private.persons.works_for IS DISTINCT FROM EXCLUDED.works_for
112+
OR private.persons.mining_id IS DISTINCT FROM EXCLUDED.mining_id
113+
RETURNING persons.email
114+
)
115+
SELECT email FROM upserted
116+
UNION ALL
117+
SELECT $2
118+
WHERE NOT EXISTS (SELECT 1 FROM upserted);`;
119+
120+
private static readonly UPSERT_PERSONS_BULK_SQL = `
90121
INSERT INTO private.persons ("name","email","url","image","location","same_as","given_name","family_name","job_title","identifiers","user_id", "source", "works_for", "mining_id")
91-
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12, $13, $14)
92-
ON CONFLICT (email, user_id, source) DO UPDATE SET name=excluded.name
93-
RETURNING persons.email;`;
122+
SELECT *
123+
FROM UNNEST(
124+
$1::text[],
125+
$2::text[],
126+
$3::text[],
127+
$4::text[],
128+
$5::text[],
129+
$6::text[][],
130+
$7::text[],
131+
$8::text[],
132+
$9::text[],
133+
$10::text[][],
134+
$11::uuid[],
135+
$12::text[],
136+
$13::uuid[],
137+
$14::text[]
138+
)
139+
ON CONFLICT (email, user_id, source) DO UPDATE
140+
SET
141+
name = EXCLUDED.name,
142+
url = EXCLUDED.url,
143+
image = EXCLUDED.image,
144+
location = EXCLUDED.location,
145+
same_as = EXCLUDED.same_as,
146+
given_name = EXCLUDED.given_name,
147+
family_name = EXCLUDED.family_name,
148+
job_title = EXCLUDED.job_title,
149+
identifiers = EXCLUDED.identifiers,
150+
works_for = EXCLUDED.works_for,
151+
mining_id = EXCLUDED.mining_id
152+
WHERE
153+
private.persons.name IS DISTINCT FROM EXCLUDED.name
154+
OR private.persons.url IS DISTINCT FROM EXCLUDED.url
155+
OR private.persons.image IS DISTINCT FROM EXCLUDED.image
156+
OR private.persons.location IS DISTINCT FROM EXCLUDED.location
157+
OR private.persons.same_as IS DISTINCT FROM EXCLUDED.same_as
158+
OR private.persons.given_name IS DISTINCT FROM EXCLUDED.given_name
159+
OR private.persons.family_name IS DISTINCT FROM EXCLUDED.family_name
160+
OR private.persons.job_title IS DISTINCT FROM EXCLUDED.job_title
161+
OR private.persons.identifiers IS DISTINCT FROM EXCLUDED.identifiers
162+
OR private.persons.works_for IS DISTINCT FROM EXCLUDED.works_for
163+
OR private.persons.mining_id IS DISTINCT FROM EXCLUDED.mining_id;`;
164+
165+
private static readonly SELECT_PERSONS_STATUS_BY_EMAILS = `
166+
SELECT email, status
167+
FROM private.persons
168+
WHERE user_id = $1 AND email = ANY($2);
169+
`;
170+
171+
private static readonly INSERT_POC_BULK_SQL = `
172+
INSERT INTO private.pointsofcontact("message_id","name","from","reply_to","to","cc","bcc","body","person_email","plus_address", "user_id")
173+
VALUES %L;`;
94174

95175
private static readonly SELECT_RECENT_EMAIL_STATUS_BY_EMAIL = `
96176
SELECT *
@@ -242,10 +322,8 @@ export default class PgContacts implements Contacts {
242322
}
243323

244324
for (const { person, tags } of persons) {
245-
const {
246-
rows: [{ email }]
247-
// eslint-disable-next-line no-await-in-loop
248-
} = await this.pool.query(PgContacts.UPSERT_PERSON_SQL, [
325+
// eslint-disable-next-line no-await-in-loop
326+
await this.pool.query(PgContacts.UPSERT_PERSON_SQL, [
249327
person.name,
250328
person.email,
251329
person.url,
@@ -278,7 +356,7 @@ export default class PgContacts implements Contacts {
278356
);
279357
}
280358

281-
insertedContacts.add({ email, tags });
359+
insertedContacts.add({ email: person.email, tags });
282360
// eslint-disable-next-line no-await-in-loop
283361
await this.pool.query(
284362
`
@@ -287,7 +365,7 @@ export default class PgContacts implements Contacts {
287365
ON CONFLICT (user_id, email)
288366
DO UPDATE SET tags = ARRAY(SELECT DISTINCT UNNEST(private.refinedpersons.tags || EXCLUDED.tags));
289367
`,
290-
[userId, email, tags.map((tag) => tag.name)]
368+
[userId, person.email, tags.map((tag) => tag.name)]
291369
);
292370
}
293371

@@ -312,60 +390,91 @@ export default class PgContacts implements Contacts {
312390
userId
313391
]);
314392

315-
for (const { pointOfContact, person, tags } of persons) {
316-
// eslint-disable-next-line no-await-in-loop
317-
const { rowCount, rows } = await this.pool.query(
318-
'SELECT email, status FROM private.persons WHERE user_id = $1 AND email = $2;',
319-
[userId, person.email]
320-
);
393+
if (!persons.length) {
394+
return [];
395+
}
321396

322-
if (rowCount === 0 || rows[0]?.status === null) {
397+
const emails = persons.map(({ person }) => person.email);
398+
const { rows } = await this.pool.query(
399+
PgContacts.SELECT_PERSONS_STATUS_BY_EMAILS,
400+
[userId, emails]
401+
);
402+
403+
const statusByEmail = new Map<string, string | null>();
404+
rows.forEach((row) => statusByEmail.set(row.email, row.status));
405+
406+
await this.pool.query(PgContacts.UPSERT_PERSONS_BULK_SQL, [
407+
persons.map(({ person }) => person.name ?? null),
408+
persons.map(({ person }) => person.email),
409+
persons.map(({ person }) => person.url ?? null),
410+
persons.map(({ person }) => person.image ?? null),
411+
persons.map(({ person }) => person.location ?? null),
412+
persons.map(({ person }) => person.sameAs ?? null),
413+
persons.map(({ person }) => person.givenName ?? null),
414+
persons.map(({ person }) => person.familyName ?? null),
415+
persons.map(({ person }) => person.jobTitle ?? null),
416+
persons.map(({ person }) => person.identifiers ?? null),
417+
persons.map(() => userId),
418+
persons.map(({ person }) => person.source),
419+
persons.map(({ person }) => person.worksFor ?? null),
420+
persons.map(() => miningId)
421+
]);
422+
423+
const tagValues = persons.flatMap(({ person, tags }) => {
424+
if (statusByEmail.get(person.email) !== undefined) {
425+
const status = statusByEmail.get(person.email);
426+
if (status === null) {
427+
insertedContacts.add({ email: person.email, tags });
428+
}
429+
} else {
323430
insertedContacts.add({ email: person.email, tags });
324431
}
325432

326-
// eslint-disable-next-line no-await-in-loop
327-
await this.pool.query(PgContacts.UPSERT_PERSON_SQL, [
328-
person.name,
329-
person.email,
330-
person.url,
331-
person.image,
332-
person.location,
333-
person.sameAs,
334-
person.givenName,
335-
person.familyName,
336-
person.jobTitle,
337-
person.identifiers,
338-
userId,
339-
person.source,
340-
person.worksFor,
341-
miningId
342-
]);
343-
344-
const tagValues = tags.map((tag) => [
433+
return tags.map((tag) => [
345434
tag.name,
346435
tag.reachable,
347436
tag.source,
348437
userId,
349438
person.email
350439
]);
440+
});
351441

352-
// eslint-disable-next-line no-await-in-loop
353-
await Promise.allSettled([
354-
this.pool.query(format(PgContacts.INSERT_TAGS_SQL, tagValues)),
355-
this.pool.query(PgContacts.INSERT_POC_SQL, [
356-
message.messageId,
357-
pointOfContact.name,
358-
pointOfContact.from,
359-
pointOfContact.replyTo,
360-
pointOfContact.to,
361-
pointOfContact.cc,
362-
pointOfContact.bcc,
363-
pointOfContact.body,
364-
person.email,
365-
pointOfContact.plusAddress,
366-
userId
367-
])
368-
]);
442+
const pocValues = persons.map(({ pointOfContact, person }) => [
443+
message.messageId,
444+
pointOfContact.name,
445+
pointOfContact.from,
446+
pointOfContact.replyTo,
447+
pointOfContact.to,
448+
pointOfContact.cc,
449+
pointOfContact.bcc,
450+
pointOfContact.body,
451+
person.email,
452+
pointOfContact.plusAddress,
453+
userId
454+
]);
455+
456+
const [tagsInsertResult, pocInsertResult] = await Promise.allSettled([
457+
tagValues.length
458+
? this.pool.query(format(PgContacts.INSERT_TAGS_SQL, tagValues))
459+
: Promise.resolve(),
460+
pocValues.length
461+
? this.pool.query(format(PgContacts.INSERT_POC_BULK_SQL, pocValues))
462+
: Promise.resolve()
463+
]);
464+
465+
if (tagsInsertResult.status === 'rejected') {
466+
this.logger.error('[PgContacts.createContactsFromEmail:tags]', {
467+
error: tagsInsertResult.reason
468+
});
469+
}
470+
471+
if (pocInsertResult.status === 'rejected') {
472+
this.logger.error(
473+
'[PgContacts.createContactsFromEmail:pointsOfContact]',
474+
{
475+
error: pocInsertResult.reason
476+
}
477+
);
369478
}
370479

371480
return Array.from(insertedContacts);
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { describe, expect, it, jest } from '@jest/globals';
2+
import type { Pool } from 'pg';
3+
import type { Logger } from 'winston';
4+
import PgContacts from '../../../src/db/pg/PgContacts';
5+
import type { EmailExtractionResult } from '../../../src/db/types';
6+
7+
function createMockLogger(): Logger {
8+
return {
9+
error: jest.fn(),
10+
warn: jest.fn(),
11+
info: jest.fn(),
12+
debug: jest.fn()
13+
} as unknown as Logger;
14+
}
15+
16+
describe('PgContacts create from email', () => {
17+
it('batches person upserts and marks only new-or-unverified contacts', async () => {
18+
const query = jest
19+
.fn<Pool['query']>()
20+
.mockResolvedValueOnce({ rowCount: 1, rows: [] } as never)
21+
.mockResolvedValueOnce({
22+
rowCount: 2,
23+
rows: [
24+
{ email: 'known@example.com', status: 'valid' },
25+
{ email: 'pending@example.com', status: null }
26+
]
27+
} as never)
28+
.mockResolvedValue({ rowCount: 1, rows: [] } as never);
29+
30+
const pool = { query } as unknown as Pool;
31+
const contacts = new PgContacts(pool, createMockLogger());
32+
33+
const extractionResult: EmailExtractionResult = {
34+
type: 'email',
35+
message: {
36+
channel: 'imap',
37+
folderPath: 'INBOX',
38+
date: new Date().toISOString(),
39+
messageId: '<m-1@example.com>',
40+
references: [],
41+
listId: undefined,
42+
conversation: false
43+
},
44+
persons: [
45+
{
46+
pointOfContact: {
47+
name: 'Known Person',
48+
from: true,
49+
replyTo: false,
50+
to: true,
51+
cc: false,
52+
bcc: false,
53+
body: true,
54+
plusAddress: undefined
55+
},
56+
person: {
57+
name: 'Known Person',
58+
email: 'known@example.com',
59+
source: 'imap'
60+
},
61+
tags: []
62+
},
63+
{
64+
pointOfContact: {
65+
name: 'Pending Person',
66+
from: true,
67+
replyTo: false,
68+
to: true,
69+
cc: false,
70+
bcc: false,
71+
body: true,
72+
plusAddress: undefined
73+
},
74+
person: {
75+
name: 'Pending Person',
76+
email: 'pending@example.com',
77+
source: 'imap'
78+
},
79+
tags: [
80+
{
81+
name: 'inbox',
82+
reachable: 1,
83+
source: 'extractor'
84+
}
85+
]
86+
}
87+
]
88+
};
89+
90+
const result = await contacts.create(
91+
extractionResult,
92+
'user-1',
93+
'mining-1'
94+
);
95+
96+
expect(result).toEqual([
97+
{
98+
email: 'pending@example.com',
99+
tags: [
100+
{
101+
name: 'inbox',
102+
reachable: 1,
103+
source: 'extractor'
104+
}
105+
]
106+
}
107+
]);
108+
109+
expect(query).toHaveBeenCalledTimes(5);
110+
111+
const upsertSql = String(query.mock.calls[2][0]);
112+
expect(upsertSql).toContain('INSERT INTO private.persons');
113+
expect(upsertSql).toContain('IS DISTINCT FROM');
114+
});
115+
});

0 commit comments

Comments
 (0)