feat: pg-boss job queue, notifications, client interactions, bulk email
This commit is contained in:
@@ -155,10 +155,37 @@ export const communications = pgTable('communications', {
|
|||||||
aiModel: text('ai_model'), // Which model was used
|
aiModel: text('ai_model'), // Which model was used
|
||||||
status: text('status').default('draft'), // 'draft' | 'approved' | 'sent'
|
status: text('status').default('draft'), // 'draft' | 'approved' | 'sent'
|
||||||
sentAt: timestamp('sent_at'),
|
sentAt: timestamp('sent_at'),
|
||||||
|
batchId: text('batch_id'), // for grouping bulk sends
|
||||||
|
|
||||||
createdAt: timestamp('created_at').defaultNow().notNull(),
|
createdAt: timestamp('created_at').defaultNow().notNull(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Notifications table
|
||||||
|
export const notifications = pgTable('notifications', {
|
||||||
|
id: uuid('id').primaryKey().defaultRandom(),
|
||||||
|
userId: text('user_id').references(() => users.id, { onDelete: 'cascade' }).notNull(),
|
||||||
|
type: text('type').notNull(), // 'event_reminder' | 'interaction' | 'system'
|
||||||
|
title: text('title').notNull(),
|
||||||
|
message: text('message').notNull(),
|
||||||
|
read: boolean('read').default(false),
|
||||||
|
clientId: uuid('client_id').references(() => clients.id, { onDelete: 'set null' }),
|
||||||
|
eventId: uuid('event_id').references(() => events.id, { onDelete: 'set null' }),
|
||||||
|
createdAt: timestamp('created_at').defaultNow().notNull(),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Interactions table (touchpoint logging)
|
||||||
|
export const interactions = pgTable('interactions', {
|
||||||
|
id: uuid('id').primaryKey().defaultRandom(),
|
||||||
|
userId: text('user_id').references(() => users.id, { onDelete: 'cascade' }).notNull(),
|
||||||
|
clientId: uuid('client_id').references(() => clients.id, { onDelete: 'cascade' }).notNull(),
|
||||||
|
type: text('type').notNull(), // 'call' | 'meeting' | 'email' | 'note' | 'other'
|
||||||
|
title: text('title').notNull(),
|
||||||
|
description: text('description'),
|
||||||
|
duration: integer('duration'), // in minutes
|
||||||
|
contactedAt: timestamp('contacted_at').notNull(),
|
||||||
|
createdAt: timestamp('created_at').defaultNow().notNull(),
|
||||||
|
});
|
||||||
|
|
||||||
// Client notes table
|
// Client notes table
|
||||||
export const clientNotes = pgTable('client_notes', {
|
export const clientNotes = pgTable('client_notes', {
|
||||||
id: uuid('id').primaryKey().defaultRandom(),
|
id: uuid('id').primaryKey().defaultRandom(),
|
||||||
@@ -175,6 +202,8 @@ export const usersRelations = relations(users, ({ many }) => ({
|
|||||||
clients: many(clients),
|
clients: many(clients),
|
||||||
events: many(events),
|
events: many(events),
|
||||||
communications: many(communications),
|
communications: many(communications),
|
||||||
|
notifications: many(notifications),
|
||||||
|
interactions: many(interactions),
|
||||||
sessions: many(sessions),
|
sessions: many(sessions),
|
||||||
accounts: many(accounts),
|
accounts: many(accounts),
|
||||||
}));
|
}));
|
||||||
@@ -187,6 +216,33 @@ export const clientsRelations = relations(clients, ({ one, many }) => ({
|
|||||||
events: many(events),
|
events: many(events),
|
||||||
communications: many(communications),
|
communications: many(communications),
|
||||||
notes: many(clientNotes),
|
notes: many(clientNotes),
|
||||||
|
interactions: many(interactions),
|
||||||
|
}));
|
||||||
|
|
||||||
|
export const notificationsRelations = relations(notifications, ({ one }) => ({
|
||||||
|
user: one(users, {
|
||||||
|
fields: [notifications.userId],
|
||||||
|
references: [users.id],
|
||||||
|
}),
|
||||||
|
client: one(clients, {
|
||||||
|
fields: [notifications.clientId],
|
||||||
|
references: [clients.id],
|
||||||
|
}),
|
||||||
|
event: one(events, {
|
||||||
|
fields: [notifications.eventId],
|
||||||
|
references: [events.id],
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
export const interactionsRelations = relations(interactions, ({ one }) => ({
|
||||||
|
user: one(users, {
|
||||||
|
fields: [interactions.userId],
|
||||||
|
references: [users.id],
|
||||||
|
}),
|
||||||
|
client: one(clients, {
|
||||||
|
fields: [interactions.clientId],
|
||||||
|
references: [clients.id],
|
||||||
|
}),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
export const clientNotesRelations = relations(clientNotes, ({ one }) => ({
|
export const clientNotesRelations = relations(clientNotes, ({ one }) => ({
|
||||||
|
|||||||
14
src/index.ts
14
src/index.ts
@@ -14,10 +14,13 @@ import { activityRoutes } from './routes/activity';
|
|||||||
import { insightsRoutes } from './routes/insights';
|
import { insightsRoutes } from './routes/insights';
|
||||||
import { reportsRoutes } from './routes/reports';
|
import { reportsRoutes } from './routes/reports';
|
||||||
import { notesRoutes } from './routes/notes';
|
import { notesRoutes } from './routes/notes';
|
||||||
|
import { notificationRoutes } from './routes/notifications';
|
||||||
|
import { interactionRoutes } from './routes/interactions';
|
||||||
import { db } from './db';
|
import { db } from './db';
|
||||||
import { users } from './db/schema';
|
import { users } from './db/schema';
|
||||||
import { eq } from 'drizzle-orm';
|
import { eq } from 'drizzle-orm';
|
||||||
import type { User } from './lib/auth';
|
import type { User } from './lib/auth';
|
||||||
|
import { initJobQueue } from './services/jobs';
|
||||||
|
|
||||||
const app = new Elysia()
|
const app = new Elysia()
|
||||||
// CORS
|
// CORS
|
||||||
@@ -72,6 +75,8 @@ const app = new Elysia()
|
|||||||
.use(insightsRoutes)
|
.use(insightsRoutes)
|
||||||
.use(reportsRoutes)
|
.use(reportsRoutes)
|
||||||
.use(notesRoutes)
|
.use(notesRoutes)
|
||||||
|
.use(notificationRoutes)
|
||||||
|
.use(interactionRoutes)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Error handler
|
// Error handler
|
||||||
@@ -111,6 +116,15 @@ const app = new Elysia()
|
|||||||
|
|
||||||
console.log(`🚀 Network App API running at ${app.server?.hostname}:${app.server?.port}`);
|
console.log(`🚀 Network App API running at ${app.server?.hostname}:${app.server?.port}`);
|
||||||
|
|
||||||
|
// Initialize pg-boss job queue
|
||||||
|
(async () => {
|
||||||
|
try {
|
||||||
|
await initJobQueue();
|
||||||
|
} catch (e) {
|
||||||
|
console.error('pg-boss init failed (will retry on next restart):', e);
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
// Bootstrap: ensure donovan@donovankelly.xyz is admin
|
// Bootstrap: ensure donovan@donovankelly.xyz is admin
|
||||||
(async () => {
|
(async () => {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
import { Elysia, t } from 'elysia';
|
import { Elysia, t } from 'elysia';
|
||||||
import { db } from '../db';
|
import { db } from '../db';
|
||||||
import { clients, events, communications } from '../db/schema';
|
import { clients, events, communications, interactions } from '../db/schema';
|
||||||
import { eq, and, desc } from 'drizzle-orm';
|
import { eq, and, desc } from 'drizzle-orm';
|
||||||
import type { User } from '../lib/auth';
|
import type { User } from '../lib/auth';
|
||||||
|
|
||||||
export interface ActivityItem {
|
export interface ActivityItem {
|
||||||
id: string;
|
id: string;
|
||||||
type: 'email_sent' | 'email_drafted' | 'event_created' | 'client_contacted' | 'client_created' | 'client_updated';
|
type: 'email_sent' | 'email_drafted' | 'event_created' | 'client_contacted' | 'client_created' | 'client_updated' | 'interaction';
|
||||||
title: string;
|
title: string;
|
||||||
description?: string;
|
description?: string;
|
||||||
date: string;
|
date: string;
|
||||||
@@ -110,6 +110,37 @@ export const activityRoutes = new Elysia({ prefix: '/clients' })
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Interactions
|
||||||
|
const clientInteractions = await db.select()
|
||||||
|
.from(interactions)
|
||||||
|
.where(and(
|
||||||
|
eq(interactions.clientId, params.id),
|
||||||
|
eq(interactions.userId, user.id),
|
||||||
|
))
|
||||||
|
.orderBy(desc(interactions.contactedAt));
|
||||||
|
|
||||||
|
for (const interaction of clientInteractions) {
|
||||||
|
const typeLabels: Record<string, string> = {
|
||||||
|
call: '📞 Phone Call',
|
||||||
|
meeting: '🤝 Meeting',
|
||||||
|
email: '✉️ Email',
|
||||||
|
note: '📝 Note',
|
||||||
|
other: '📌 Interaction',
|
||||||
|
};
|
||||||
|
activities.push({
|
||||||
|
id: `interaction-${interaction.id}`,
|
||||||
|
type: 'interaction',
|
||||||
|
title: `${typeLabels[interaction.type] || typeLabels.other}: ${interaction.title}`,
|
||||||
|
description: interaction.description || undefined,
|
||||||
|
date: interaction.contactedAt.toISOString(),
|
||||||
|
metadata: {
|
||||||
|
interactionId: interaction.id,
|
||||||
|
interactionType: interaction.type,
|
||||||
|
duration: interaction.duration,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Sort by date descending
|
// Sort by date descending
|
||||||
activities.sort((a, b) => new Date(b.date).getTime() - new Date(a.date).getTime());
|
activities.sort((a, b) => new Date(b.date).getTime() - new Date(a.date).getTime());
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
import { Elysia, t } from 'elysia';
|
import { Elysia, t } from 'elysia';
|
||||||
import { db } from '../db';
|
import { db } from '../db';
|
||||||
import { clients, communications, userProfiles } from '../db/schema';
|
import { clients, communications, userProfiles } from '../db/schema';
|
||||||
import { eq, and } from 'drizzle-orm';
|
import { eq, and, inArray } from 'drizzle-orm';
|
||||||
import { generateEmail, generateSubject, generateBirthdayMessage, type AIProvider } from '../services/ai';
|
import { generateEmail, generateSubject, generateBirthdayMessage, type AIProvider } from '../services/ai';
|
||||||
import { sendEmail } from '../services/email';
|
import { sendEmail } from '../services/email';
|
||||||
import type { User } from '../lib/auth';
|
import type { User } from '../lib/auth';
|
||||||
|
import { randomUUID } from 'crypto';
|
||||||
|
|
||||||
export const emailRoutes = new Elysia({ prefix: '/emails' })
|
export const emailRoutes = new Elysia({ prefix: '/emails' })
|
||||||
// Generate email for a client
|
// Generate email for a client
|
||||||
@@ -294,6 +295,147 @@ export const emailRoutes = new Elysia({ prefix: '/emails' })
|
|||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Bulk generate emails
|
||||||
|
.post('/bulk-generate', async ({ body, user }: {
|
||||||
|
body: { clientIds: string[]; purpose: string; provider?: AIProvider };
|
||||||
|
user: User;
|
||||||
|
}) => {
|
||||||
|
const batchId = randomUUID();
|
||||||
|
|
||||||
|
// Get user profile
|
||||||
|
const [profile] = await db.select()
|
||||||
|
.from(userProfiles)
|
||||||
|
.where(eq(userProfiles.userId, user.id))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
const advisorInfo = {
|
||||||
|
name: user.name,
|
||||||
|
title: profile?.title || '',
|
||||||
|
company: profile?.company || '',
|
||||||
|
phone: profile?.phone || '',
|
||||||
|
signature: profile?.emailSignature || '',
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get all selected clients
|
||||||
|
const selectedClients = await db.select()
|
||||||
|
.from(clients)
|
||||||
|
.where(and(
|
||||||
|
inArray(clients.id, body.clientIds),
|
||||||
|
eq(clients.userId, user.id),
|
||||||
|
));
|
||||||
|
|
||||||
|
if (selectedClients.length === 0) {
|
||||||
|
throw new Error('No valid clients found');
|
||||||
|
}
|
||||||
|
|
||||||
|
const results = [];
|
||||||
|
|
||||||
|
for (const client of selectedClients) {
|
||||||
|
try {
|
||||||
|
const content = await generateEmail({
|
||||||
|
advisorName: advisorInfo.name,
|
||||||
|
advisorTitle: advisorInfo.title,
|
||||||
|
advisorCompany: advisorInfo.company,
|
||||||
|
advisorPhone: advisorInfo.phone,
|
||||||
|
advisorSignature: advisorInfo.signature,
|
||||||
|
clientName: client.firstName,
|
||||||
|
interests: client.interests || [],
|
||||||
|
notes: client.notes || '',
|
||||||
|
purpose: body.purpose,
|
||||||
|
provider: body.provider,
|
||||||
|
});
|
||||||
|
|
||||||
|
const subject = await generateSubject(body.purpose, client.firstName, body.provider);
|
||||||
|
|
||||||
|
const [comm] = await db.insert(communications)
|
||||||
|
.values({
|
||||||
|
userId: user.id,
|
||||||
|
clientId: client.id,
|
||||||
|
type: 'email',
|
||||||
|
subject,
|
||||||
|
content,
|
||||||
|
aiGenerated: true,
|
||||||
|
aiModel: body.provider || 'anthropic',
|
||||||
|
status: 'draft',
|
||||||
|
batchId,
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
results.push({ clientId: client.id, email: comm, success: true });
|
||||||
|
} catch (error: any) {
|
||||||
|
results.push({ clientId: client.id, error: error.message, success: false });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return { batchId, results, total: selectedClients.length, generated: results.filter(r => r.success).length };
|
||||||
|
}, {
|
||||||
|
body: t.Object({
|
||||||
|
clientIds: t.Array(t.String({ format: 'uuid' }), { minItems: 1 }),
|
||||||
|
purpose: t.String({ minLength: 1 }),
|
||||||
|
provider: t.Optional(t.Union([t.Literal('anthropic'), t.Literal('openai')])),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Bulk send all drafts in a batch
|
||||||
|
.post('/bulk-send', async ({ body, user }: {
|
||||||
|
body: { batchId: string };
|
||||||
|
user: User;
|
||||||
|
}) => {
|
||||||
|
// Get all drafts in this batch
|
||||||
|
const drafts = await db.select({
|
||||||
|
email: communications,
|
||||||
|
client: clients,
|
||||||
|
})
|
||||||
|
.from(communications)
|
||||||
|
.innerJoin(clients, eq(communications.clientId, clients.id))
|
||||||
|
.where(and(
|
||||||
|
eq(communications.batchId, body.batchId),
|
||||||
|
eq(communications.userId, user.id),
|
||||||
|
eq(communications.status, 'draft'),
|
||||||
|
));
|
||||||
|
|
||||||
|
const results = [];
|
||||||
|
|
||||||
|
for (const { email, client } of drafts) {
|
||||||
|
if (!client.email) {
|
||||||
|
results.push({ id: email.id, success: false, error: 'Client has no email' });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await sendEmail({
|
||||||
|
to: client.email,
|
||||||
|
subject: email.subject || 'Message from your advisor',
|
||||||
|
content: email.content,
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.update(communications)
|
||||||
|
.set({ status: 'sent', sentAt: new Date() })
|
||||||
|
.where(eq(communications.id, email.id));
|
||||||
|
|
||||||
|
await db.update(clients)
|
||||||
|
.set({ lastContactedAt: new Date() })
|
||||||
|
.where(eq(clients.id, client.id));
|
||||||
|
|
||||||
|
results.push({ id: email.id, success: true });
|
||||||
|
} catch (error: any) {
|
||||||
|
results.push({ id: email.id, success: false, error: error.message });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
batchId: body.batchId,
|
||||||
|
total: drafts.length,
|
||||||
|
sent: results.filter(r => r.success).length,
|
||||||
|
failed: results.filter(r => !r.success).length,
|
||||||
|
results,
|
||||||
|
};
|
||||||
|
}, {
|
||||||
|
body: t.Object({
|
||||||
|
batchId: t.String({ minLength: 1 }),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
// Delete draft
|
// Delete draft
|
||||||
.delete('/:id', async ({ params, user }: { params: { id: string }; user: User }) => {
|
.delete('/:id', async ({ params, user }: { params: { id: string }; user: User }) => {
|
||||||
const [deleted] = await db.delete(communications)
|
const [deleted] = await db.delete(communications)
|
||||||
|
|||||||
141
src/routes/interactions.ts
Normal file
141
src/routes/interactions.ts
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
import { Elysia, t } from 'elysia';
|
||||||
|
import { db } from '../db';
|
||||||
|
import { interactions, clients } from '../db/schema';
|
||||||
|
import { eq, and, desc } from 'drizzle-orm';
|
||||||
|
import type { User } from '../lib/auth';
|
||||||
|
|
||||||
|
export const interactionRoutes = new Elysia()
|
||||||
|
// List interactions for a client
|
||||||
|
.get('/clients/:clientId/interactions', async ({ params, user }: { params: { clientId: string }; user: User }) => {
|
||||||
|
// Verify client belongs to user
|
||||||
|
const [client] = await db.select()
|
||||||
|
.from(clients)
|
||||||
|
.where(and(eq(clients.id, params.clientId), eq(clients.userId, user.id)))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (!client) throw new Error('Client not found');
|
||||||
|
|
||||||
|
const items = await db.select()
|
||||||
|
.from(interactions)
|
||||||
|
.where(and(eq(interactions.clientId, params.clientId), eq(interactions.userId, user.id)))
|
||||||
|
.orderBy(desc(interactions.contactedAt));
|
||||||
|
|
||||||
|
return items;
|
||||||
|
}, {
|
||||||
|
params: t.Object({ clientId: t.String({ format: 'uuid' }) }),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create interaction
|
||||||
|
.post('/clients/:clientId/interactions', async ({ params, body, user }: {
|
||||||
|
params: { clientId: string };
|
||||||
|
body: { type: string; title: string; description?: string; duration?: number; contactedAt: string };
|
||||||
|
user: User;
|
||||||
|
}) => {
|
||||||
|
// Verify client belongs to user
|
||||||
|
const [client] = await db.select()
|
||||||
|
.from(clients)
|
||||||
|
.where(and(eq(clients.id, params.clientId), eq(clients.userId, user.id)))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (!client) throw new Error('Client not found');
|
||||||
|
|
||||||
|
const [interaction] = await db.insert(interactions)
|
||||||
|
.values({
|
||||||
|
userId: user.id,
|
||||||
|
clientId: params.clientId,
|
||||||
|
type: body.type,
|
||||||
|
title: body.title,
|
||||||
|
description: body.description,
|
||||||
|
duration: body.duration,
|
||||||
|
contactedAt: new Date(body.contactedAt),
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
// Auto-update lastContactedAt on the client
|
||||||
|
const contactDate = new Date(body.contactedAt);
|
||||||
|
if (!client.lastContactedAt || contactDate > client.lastContactedAt) {
|
||||||
|
await db.update(clients)
|
||||||
|
.set({ lastContactedAt: contactDate, updatedAt: new Date() })
|
||||||
|
.where(eq(clients.id, params.clientId));
|
||||||
|
}
|
||||||
|
|
||||||
|
return interaction;
|
||||||
|
}, {
|
||||||
|
params: t.Object({ clientId: t.String({ format: 'uuid' }) }),
|
||||||
|
body: t.Object({
|
||||||
|
type: t.String({ minLength: 1 }),
|
||||||
|
title: t.String({ minLength: 1 }),
|
||||||
|
description: t.Optional(t.String()),
|
||||||
|
duration: t.Optional(t.Number({ minimum: 0 })),
|
||||||
|
contactedAt: t.String(),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Update interaction
|
||||||
|
.put('/interactions/:id', async ({ params, body, user }: {
|
||||||
|
params: { id: string };
|
||||||
|
body: { type?: string; title?: string; description?: string; duration?: number; contactedAt?: string };
|
||||||
|
user: User;
|
||||||
|
}) => {
|
||||||
|
const updateData: Record<string, unknown> = {};
|
||||||
|
if (body.type !== undefined) updateData.type = body.type;
|
||||||
|
if (body.title !== undefined) updateData.title = body.title;
|
||||||
|
if (body.description !== undefined) updateData.description = body.description;
|
||||||
|
if (body.duration !== undefined) updateData.duration = body.duration;
|
||||||
|
if (body.contactedAt !== undefined) updateData.contactedAt = new Date(body.contactedAt);
|
||||||
|
|
||||||
|
const [updated] = await db.update(interactions)
|
||||||
|
.set(updateData)
|
||||||
|
.where(and(eq(interactions.id, params.id), eq(interactions.userId, user.id)))
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
if (!updated) throw new Error('Interaction not found');
|
||||||
|
return updated;
|
||||||
|
}, {
|
||||||
|
params: t.Object({ id: t.String({ format: 'uuid' }) }),
|
||||||
|
body: t.Object({
|
||||||
|
type: t.Optional(t.String()),
|
||||||
|
title: t.Optional(t.String()),
|
||||||
|
description: t.Optional(t.String()),
|
||||||
|
duration: t.Optional(t.Number({ minimum: 0 })),
|
||||||
|
contactedAt: t.Optional(t.String()),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Delete interaction
|
||||||
|
.delete('/interactions/:id', async ({ params, user }: { params: { id: string }; user: User }) => {
|
||||||
|
const [deleted] = await db.delete(interactions)
|
||||||
|
.where(and(eq(interactions.id, params.id), eq(interactions.userId, user.id)))
|
||||||
|
.returning({ id: interactions.id });
|
||||||
|
|
||||||
|
if (!deleted) throw new Error('Interaction not found');
|
||||||
|
return { success: true, id: deleted.id };
|
||||||
|
}, {
|
||||||
|
params: t.Object({ id: t.String({ format: 'uuid' }) }),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Get all recent interactions across all clients (for dashboard)
|
||||||
|
.get('/interactions/recent', async ({ query, user }: { query: { limit?: string }; user: User }) => {
|
||||||
|
const limit = query.limit ? parseInt(query.limit) : 10;
|
||||||
|
|
||||||
|
const items = await db.select({
|
||||||
|
interaction: interactions,
|
||||||
|
client: {
|
||||||
|
id: clients.id,
|
||||||
|
firstName: clients.firstName,
|
||||||
|
lastName: clients.lastName,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.from(interactions)
|
||||||
|
.innerJoin(clients, eq(interactions.clientId, clients.id))
|
||||||
|
.where(eq(interactions.userId, user.id))
|
||||||
|
.orderBy(desc(interactions.contactedAt))
|
||||||
|
.limit(limit);
|
||||||
|
|
||||||
|
return items.map(({ interaction, client }) => ({
|
||||||
|
...interaction,
|
||||||
|
client,
|
||||||
|
}));
|
||||||
|
}, {
|
||||||
|
query: t.Object({ limit: t.Optional(t.String()) }),
|
||||||
|
});
|
||||||
85
src/routes/notifications.ts
Normal file
85
src/routes/notifications.ts
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import { Elysia, t } from 'elysia';
|
||||||
|
import { db } from '../db';
|
||||||
|
import { notifications, clients } from '../db/schema';
|
||||||
|
import { eq, and, desc, sql } from 'drizzle-orm';
|
||||||
|
import type { User } from '../lib/auth';
|
||||||
|
|
||||||
|
export const notificationRoutes = new Elysia({ prefix: '/notifications' })
|
||||||
|
// List notifications
|
||||||
|
.get('/', async ({ query, user }: { query: { limit?: string; unreadOnly?: string }; user: User }) => {
|
||||||
|
const limit = query.limit ? parseInt(query.limit) : 50;
|
||||||
|
const unreadOnly = query.unreadOnly === 'true';
|
||||||
|
|
||||||
|
let conditions = [eq(notifications.userId, user.id)];
|
||||||
|
if (unreadOnly) {
|
||||||
|
conditions.push(eq(notifications.read, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
const items = await db.select({
|
||||||
|
notification: notifications,
|
||||||
|
client: {
|
||||||
|
id: clients.id,
|
||||||
|
firstName: clients.firstName,
|
||||||
|
lastName: clients.lastName,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.from(notifications)
|
||||||
|
.leftJoin(clients, eq(notifications.clientId, clients.id))
|
||||||
|
.where(and(...conditions))
|
||||||
|
.orderBy(desc(notifications.createdAt))
|
||||||
|
.limit(limit);
|
||||||
|
|
||||||
|
// Unread count
|
||||||
|
const [unreadResult] = await db.select({
|
||||||
|
count: sql<number>`count(*)::int`,
|
||||||
|
})
|
||||||
|
.from(notifications)
|
||||||
|
.where(and(eq(notifications.userId, user.id), eq(notifications.read, false)));
|
||||||
|
|
||||||
|
return {
|
||||||
|
notifications: items.map(({ notification, client }) => ({
|
||||||
|
...notification,
|
||||||
|
client: client?.id ? client : null,
|
||||||
|
})),
|
||||||
|
unreadCount: unreadResult?.count || 0,
|
||||||
|
};
|
||||||
|
}, {
|
||||||
|
query: t.Object({
|
||||||
|
limit: t.Optional(t.String()),
|
||||||
|
unreadOnly: t.Optional(t.String()),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Mark notification as read
|
||||||
|
.put('/:id/read', async ({ params, user }: { params: { id: string }; user: User }) => {
|
||||||
|
const [updated] = await db.update(notifications)
|
||||||
|
.set({ read: true })
|
||||||
|
.where(and(eq(notifications.id, params.id), eq(notifications.userId, user.id)))
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
if (!updated) throw new Error('Notification not found');
|
||||||
|
return updated;
|
||||||
|
}, {
|
||||||
|
params: t.Object({ id: t.String({ format: 'uuid' }) }),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Mark all as read
|
||||||
|
.post('/mark-all-read', async ({ user }: { user: User }) => {
|
||||||
|
await db.update(notifications)
|
||||||
|
.set({ read: true })
|
||||||
|
.where(and(eq(notifications.userId, user.id), eq(notifications.read, false)));
|
||||||
|
|
||||||
|
return { success: true };
|
||||||
|
})
|
||||||
|
|
||||||
|
// Delete notification
|
||||||
|
.delete('/:id', async ({ params, user }: { params: { id: string }; user: User }) => {
|
||||||
|
const [deleted] = await db.delete(notifications)
|
||||||
|
.where(and(eq(notifications.id, params.id), eq(notifications.userId, user.id)))
|
||||||
|
.returning({ id: notifications.id });
|
||||||
|
|
||||||
|
if (!deleted) throw new Error('Notification not found');
|
||||||
|
return { success: true };
|
||||||
|
}, {
|
||||||
|
params: t.Object({ id: t.String({ format: 'uuid' }) }),
|
||||||
|
});
|
||||||
160
src/services/jobs.ts
Normal file
160
src/services/jobs.ts
Normal file
@@ -0,0 +1,160 @@
|
|||||||
|
import { PgBoss } from 'pg-boss';
|
||||||
|
import { db } from '../db';
|
||||||
|
import { events, notifications, clients, users } from '../db/schema';
|
||||||
|
import { eq, and, gte, lte, sql } from 'drizzle-orm';
|
||||||
|
import { sendEmail } from './email';
|
||||||
|
|
||||||
|
let boss: PgBoss | null = null;
|
||||||
|
|
||||||
|
export async function initJobQueue(): Promise<PgBoss> {
|
||||||
|
const connectionString = process.env.DATABASE_URL;
|
||||||
|
if (!connectionString) {
|
||||||
|
throw new Error('DATABASE_URL required for job queue');
|
||||||
|
}
|
||||||
|
|
||||||
|
boss = new PgBoss(connectionString);
|
||||||
|
|
||||||
|
boss.on('error', (error) => {
|
||||||
|
console.error('[pg-boss] Error:', error);
|
||||||
|
});
|
||||||
|
|
||||||
|
await boss.start();
|
||||||
|
console.log('✅ pg-boss job queue started');
|
||||||
|
|
||||||
|
// Register job handlers
|
||||||
|
await boss.work('check-upcoming-events', { teamConcurrency: 1 }, checkUpcomingEvents);
|
||||||
|
await boss.work('send-event-reminder', { teamConcurrency: 5 }, sendEventReminder);
|
||||||
|
|
||||||
|
// Schedule daily check at 8am UTC
|
||||||
|
await boss.schedule('check-upcoming-events', '0 8 * * *', {}, {
|
||||||
|
tz: 'UTC',
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log('✅ Job schedules registered');
|
||||||
|
return boss;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getJobQueue(): PgBoss | null {
|
||||||
|
return boss;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Job: Check upcoming events and create notifications
|
||||||
|
async function checkUpcomingEvents(job: PgBoss.Job) {
|
||||||
|
console.log(`[jobs] Running checkUpcomingEvents at ${new Date().toISOString()}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const now = new Date();
|
||||||
|
|
||||||
|
// Get all events with their reminder windows
|
||||||
|
const allEvents = await db.select({
|
||||||
|
event: events,
|
||||||
|
client: clients,
|
||||||
|
})
|
||||||
|
.from(events)
|
||||||
|
.innerJoin(clients, eq(events.clientId, clients.id));
|
||||||
|
|
||||||
|
let created = 0;
|
||||||
|
|
||||||
|
for (const { event, client } of allEvents) {
|
||||||
|
const reminderDays = event.reminderDays || 7;
|
||||||
|
let eventDate = new Date(event.date);
|
||||||
|
|
||||||
|
// For recurring events, adjust to this year
|
||||||
|
if (event.recurring) {
|
||||||
|
eventDate = new Date(now.getFullYear(), eventDate.getMonth(), eventDate.getDate());
|
||||||
|
// If the date already passed this year, check next year
|
||||||
|
if (eventDate < now) {
|
||||||
|
eventDate = new Date(now.getFullYear() + 1, eventDate.getMonth(), eventDate.getDate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const daysUntil = Math.ceil((eventDate.getTime() - now.getTime()) / (1000 * 60 * 60 * 24));
|
||||||
|
|
||||||
|
// Within reminder window and not already triggered recently
|
||||||
|
if (daysUntil >= 0 && daysUntil <= reminderDays) {
|
||||||
|
// Check if already notified (within last 24h for this event)
|
||||||
|
const dayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);
|
||||||
|
const existing = await db.select({ id: notifications.id })
|
||||||
|
.from(notifications)
|
||||||
|
.where(and(
|
||||||
|
eq(notifications.eventId, event.id),
|
||||||
|
eq(notifications.userId, event.userId),
|
||||||
|
gte(notifications.createdAt, dayAgo),
|
||||||
|
))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (existing.length === 0) {
|
||||||
|
// Create notification
|
||||||
|
await db.insert(notifications).values({
|
||||||
|
userId: event.userId,
|
||||||
|
type: 'event_reminder',
|
||||||
|
title: daysUntil === 0 ? `Today: ${event.title}` : `${event.title} in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`,
|
||||||
|
message: `${event.title} for ${client.firstName} ${client.lastName} is ${daysUntil === 0 ? 'today' : `coming up in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`}.`,
|
||||||
|
clientId: client.id,
|
||||||
|
eventId: event.id,
|
||||||
|
});
|
||||||
|
created++;
|
||||||
|
|
||||||
|
// Also queue email reminder
|
||||||
|
if (boss) {
|
||||||
|
await boss.send('send-event-reminder', {
|
||||||
|
userId: event.userId,
|
||||||
|
eventId: event.id,
|
||||||
|
clientId: client.id,
|
||||||
|
eventTitle: event.title,
|
||||||
|
clientName: `${client.firstName} ${client.lastName}`,
|
||||||
|
daysUntil,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`[jobs] checkUpcomingEvents: created ${created} notifications`);
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[jobs] checkUpcomingEvents error:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Job: Send email reminder to advisor
|
||||||
|
async function sendEventReminder(job: PgBoss.Job<{
|
||||||
|
userId: string;
|
||||||
|
eventId: string;
|
||||||
|
clientId: string;
|
||||||
|
eventTitle: string;
|
||||||
|
clientName: string;
|
||||||
|
daysUntil: number;
|
||||||
|
}>) {
|
||||||
|
const { userId, eventTitle, clientName, daysUntil } = job.data;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get user email
|
||||||
|
const [user] = await db.select({ email: users.email, name: users.name })
|
||||||
|
.from(users)
|
||||||
|
.where(eq(users.id, userId))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (!user?.email) {
|
||||||
|
console.log(`[jobs] sendEventReminder: no email for user ${userId}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const subject = daysUntil === 0
|
||||||
|
? `Reminder: ${eventTitle} is today!`
|
||||||
|
: `Reminder: ${eventTitle} in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`;
|
||||||
|
|
||||||
|
const content = `Hi ${user.name},\n\nThis is a reminder that "${eventTitle}" for ${clientName} is ${daysUntil === 0 ? 'today' : `coming up in ${daysUntil} day${daysUntil !== 1 ? 's' : ''}`}.\n\nLog in to your Network App to prepare.\n\nBest,\nThe Network App`;
|
||||||
|
|
||||||
|
await sendEmail({
|
||||||
|
to: user.email,
|
||||||
|
subject,
|
||||||
|
content,
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(`[jobs] Sent event reminder to ${user.email}: ${subject}`);
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`[jobs] sendEventReminder error:`, error);
|
||||||
|
// Don't throw - email failures shouldn't retry aggressively
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user