implement email queue

This commit is contained in:
2024-10-06 09:57:29 -05:00
parent a95c0ed9b0
commit bb915f8f0a
19 changed files with 270 additions and 46 deletions

View File

@@ -10,7 +10,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ses/types"
)
func ResetUserEmail(userEmail Email, customerId CustomerId) error {
func ResetUserEmail(userEmail UserEmail, customerId CustomerId) error {
// Load AWS configuration
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
@@ -29,7 +29,7 @@ func ResetUserEmail(userEmail Email, customerId CustomerId) error {
// Define email subject and body
subject := "nKode Reset"
htmlBody := fmt.Sprintf("<h1>Hello!</h1><p>Click the link to reset your nKode.</p><a href=\"http://%s?token=%s\">Reset nKode</a>", FrontendHost, nkodeResetJwt)
htmlBody := fmt.Sprintf("<h1>Hello!</h1><p>Click the link to reset your nKode.</p><a href=\"%s?token=%s\">Reset nKode</a>", FrontendHost, nkodeResetJwt)
// Construct the email message
input := &ses.SendEmailInput{
@@ -56,6 +56,6 @@ func ResetUserEmail(userEmail Email, customerId CustomerId) error {
}
// Output the message ID of the sent email
fmt.Printf("Email sent successfully, Message ID: %s\n", *resp.MessageId)
fmt.Printf("UserEmail sent successfully, Message ID: %s\n", *resp.MessageId)
return nil
}

View File

@@ -1,6 +1,5 @@
package core
const (
BackendHost = "localhost:8080"
FrontendHost = "localhost:8090"
FrontendHost = "https://nkode.tech"
)

152
core/email_queue.go Normal file
View File

@@ -0,0 +1,152 @@
package core
import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/ses"
"github.com/aws/aws-sdk-go-v2/service/ses/types"
"github.com/patrickmn/go-cache"
"log"
"sync"
"time"
)
type EmailClient interface {
SendEmail(Email) error
}
// Email represents a dummy email structure
type Email struct {
Sender string
Recipient string
Subject string
Content string
}
type TestEmailClient struct{}
// SendEmail simulates sending an email via AWS SES
func (c *TestEmailClient) SendEmail(email Email) error {
// Simulate sending email (replace with actual AWS SES API call)
fmt.Printf("Sending email to %s\n", email.Recipient)
return nil
}
type SESClient struct {
ResetCache *cache.Cache
}
const (
defaultExpiration = 5 * time.Minute
cleanupInterval = 10 * time.Minute
)
func NewSESClient() SESClient {
return SESClient{
ResetCache: cache.New(defaultExpiration, cleanupInterval),
}
}
func (s *SESClient) SendEmail(email Email) error {
if _, exists := s.ResetCache.Get(email.Recipient); exists {
return fmt.Errorf("email already sent to %s with subject %s", email.Recipient, email.Subject)
}
// Load AWS configuration
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
errMsg := fmt.Sprintf("unable to load SDK config, %v", err)
log.Print(errMsg)
return errors.New(errMsg)
}
// Create an SES client
sesClient := ses.NewFromConfig(cfg)
// Construct the email message
input := &ses.SendEmailInput{
Destination: &types.Destination{
ToAddresses: []string{email.Recipient},
},
Message: &types.Message{
Body: &types.Body{
Html: &types.Content{
Data: aws.String(email.Content),
},
},
Subject: &types.Content{
Data: aws.String(email.Subject),
},
},
Source: aws.String(email.Sender),
}
// Send the email
resp, err := sesClient.SendEmail(context.TODO(), input)
if err != nil {
return fmt.Errorf("failed to send email, %v", err)
}
// Output the message ID of the sent email
fmt.Printf("UserEmail sent successfully, Message ID: %s\n", *resp.MessageId)
return nil
}
// EmailQueue represents the email queue with rate limiting
type EmailQueue struct {
emailQueue chan Email // Email queue
rateLimit <-chan time.Time // Rate limiter
client EmailClient // SES client to send emails
wg sync.WaitGroup // To wait for all emails to be processed
FailedSendCount int
}
// NewEmailQueue creates a new rate-limited email queue
func NewEmailQueue(bufferSize int, emailsPerSecond int, client EmailClient) *EmailQueue {
// Create a ticker that ticks every second to limit the rate of sending emails
rateLimit := time.Tick(time.Second / time.Duration(emailsPerSecond))
return &EmailQueue{
emailQueue: make(chan Email, bufferSize),
rateLimit: rateLimit,
client: client,
FailedSendCount: 0,
}
}
// AddEmail queues a new email to be sent
func (q *EmailQueue) AddEmail(email Email) {
q.wg.Add(1)
q.emailQueue <- email
}
// Start begins processing the email queue with rate limiting
func (q *EmailQueue) Start() {
// Worker goroutine that processes emails from the queue
go func() {
for email := range q.emailQueue {
<-q.rateLimit // Wait for the rate limiter to allow the next email
q.sendEmail(email)
q.wg.Done() // Mark the email as processed
}
}()
}
// sendEmail sends an email using the SES client
func (q *EmailQueue) sendEmail(email Email) {
if err := q.client.SendEmail(email); err != nil {
q.FailedSendCount += 1
log.Printf("Failed to send email to %s: %v\n", email.Recipient, err)
}
}
// Stop stops the queue after all emails have been processed
func (q *EmailQueue) Stop() {
// Wait for all emails to be processed
q.wg.Wait()
// Close the email queue
close(q.emailQueue)
}

29
core/email_queue_test.go Normal file
View File

@@ -0,0 +1,29 @@
package core
import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)
func TestEmailQueue(t *testing.T) {
queue := NewEmailQueue(100, 14, &TestEmailClient{})
// Start the queue processing
queue.Start()
// Enqueue some emails
for i := 1; i <= 28; i++ {
email := Email{
Sender: "test@example.com",
Recipient: fmt.Sprintf("user%d@example.com", i),
Subject: "test subject",
Content: "This is a test email",
}
queue.AddEmail(email)
}
// Stop the queue after all emails are processed
queue.Stop()
assert.Equal(t, queue.FailedSendCount, 0)
}

View File

@@ -27,7 +27,7 @@ func (db *InMemoryDb) GetCustomer(id CustomerId) (*Customer, error) {
return &customer, nil
}
func (db *InMemoryDb) GetUser(username Email, customerId CustomerId) (*User, error) {
func (db *InMemoryDb) GetUser(username UserEmail, customerId CustomerId) (*User, error) {
key := userIdKey(customerId, username)
userId, exists := db.userIdMap[key]
if !exists {
@@ -129,7 +129,7 @@ func (db *InMemoryDb) GetSvgStringInterface(idxs SvgIdInterface) ([]string, erro
return make([]string, len(idxs)), nil
}
func userIdKey(customerId CustomerId, username Email) string {
func userIdKey(customerId CustomerId, username UserEmail) string {
key := fmt.Sprintf("%s:%s", customerId, username)
return key
}

View File

@@ -116,7 +116,7 @@ func ClaimExpired(claims jwt.RegisteredClaims) error {
return errors.New("claim expired")
}
func ResetNKodeToken(userEmail Email, customerId CustomerId) (string, error) {
func ResetNKodeToken(userEmail UserEmail, customerId CustomerId) (string, error) {
resetClaims := ResetNKodeClaims{
true,
jwt.RegisteredClaims{

View File

@@ -19,7 +19,7 @@ func TestJwtClaims(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, refreshToken.Subject, email)
assert.NoError(t, ClaimExpired(*refreshToken))
resetNKode, err := ResetNKodeToken(Email(email), customerId)
resetNKode, err := ResetNKodeToken(UserEmail(email), customerId)
assert.NoError(t, err)
resetToken, err := ParseRestNKodeToken(resetNKode)
assert.NoError(t, err)

View File

@@ -4,17 +4,20 @@ import (
"errors"
"fmt"
"github.com/google/uuid"
"os"
)
type NKodeAPI struct {
Db DbAccessor
SignupSessions map[SessionId]UserSignSession
EmailQueue *EmailQueue
}
func NewNKodeAPI(db DbAccessor) NKodeAPI {
func NewNKodeAPI(db DbAccessor, queue *EmailQueue) NKodeAPI {
return NKodeAPI{
Db: db,
SignupSessions: make(map[SessionId]UserSignSession),
EmailQueue: queue,
}
}
@@ -34,7 +37,7 @@ func (n *NKodeAPI) CreateNewCustomer(nkodePolicy NKodePolicy, id *CustomerId) (*
return &newCustomer.Id, nil
}
func (n *NKodeAPI) GenerateSignupResetInterface(userEmail Email, customerId CustomerId, kp KeypadDimension, reset bool) (*GenerateSignupResetInterfaceResp, error) {
func (n *NKodeAPI) GenerateSignupResetInterface(userEmail UserEmail, customerId CustomerId, kp KeypadDimension, reset bool) (*GenerateSignupResetInterfaceResp, error) {
svgIdxInterface, err := n.Db.RandomSvgIdxInterface(kp)
if err != nil {
return nil, err
@@ -104,7 +107,7 @@ func (n *NKodeAPI) ConfirmNKode(customerId CustomerId, sessionId SessionId, keyS
return err
}
func (n *NKodeAPI) GetLoginInterface(userEmail Email, customerId CustomerId) (*GetLoginInterfaceResp, error) {
func (n *NKodeAPI) GetLoginInterface(userEmail UserEmail, customerId CustomerId) (*GetLoginInterfaceResp, error) {
user, err := n.Db.GetUser(userEmail, customerId)
if err != nil {
return nil, err
@@ -133,7 +136,7 @@ func (n *NKodeAPI) GetLoginInterface(userEmail Email, customerId CustomerId) (*G
return &resp, nil
}
func (n *NKodeAPI) Login(customerId CustomerId, userEmail Email, keySelection KeySelection) (*AuthenticationTokens, error) {
func (n *NKodeAPI) Login(customerId CustomerId, userEmail UserEmail, keySelection KeySelection) (*AuthenticationTokens, error) {
customer, err := n.Db.GetCustomer(customerId)
if err != nil {
return nil, err
@@ -179,7 +182,7 @@ func (n *NKodeAPI) GetSvgStringInterface(svgId SvgIdInterface) ([]string, error)
return n.Db.GetSvgStringInterface(svgId)
}
func (n *NKodeAPI) RefreshToken(userEmail Email, customerId CustomerId, refreshToken string) (string, error) {
func (n *NKodeAPI) RefreshToken(userEmail UserEmail, customerId CustomerId, refreshToken string) (string, error) {
user, err := n.Db.GetUser(userEmail, customerId)
if err != nil {
return "", err
@@ -201,7 +204,7 @@ func (n *NKodeAPI) RefreshToken(userEmail Email, customerId CustomerId, refreshT
return EncodeAndSignClaims(newAccessClaims)
}
func (n *NKodeAPI) ResetNKode(userEmail Email, customerId CustomerId) error {
func (n *NKodeAPI) ResetNKode(userEmail UserEmail, customerId CustomerId) error {
user, err := n.Db.GetUser(userEmail, customerId)
if err != nil {
return fmt.Errorf("error getting user in rest nkode %v", err)
@@ -209,5 +212,22 @@ func (n *NKodeAPI) ResetNKode(userEmail Email, customerId CustomerId) error {
if user == nil {
return nil
}
return ResetUserEmail(userEmail, customerId)
nkodeResetJwt, err := ResetNKodeToken(userEmail, customerId)
if err != nil {
return errors.New(fmt.Sprintf("unable to load SDK config, %v", err))
}
frontendHost := os.Getenv("FRONTEND_HOST")
if frontendHost == "" {
frontendHost = FrontendHost
}
htmlBody := fmt.Sprintf("<h1>Hello!</h1><p>Click the link to reset your nKode.</p><a href=\"%s?token=%s\">Reset nKode</a>", frontendHost, nkodeResetJwt)
email := Email{
Sender: "no-reply@nkode.tech",
Recipient: string(userEmail),
Subject: "nKode Reset",
Content: htmlBody,
}
n.EmailQueue.AddEmail(email)
return nil
}

View File

@@ -25,14 +25,20 @@ func TestNKodeAPI(t *testing.T) {
}
func testNKodeAPI(t *testing.T, db DbAccessor) {
bufferSize := 100
emailsPerSec := 14
testClient := TestEmailClient{}
queue := NewEmailQueue(bufferSize, emailsPerSec, &testClient)
queue.Start()
defer queue.Stop()
attrsPerKey := 5
numbOfKeys := 4
for idx := 0; idx < 1; idx++ {
userEmail := Email("test_username" + util.GenerateRandomString(12) + "@example.com")
userEmail := UserEmail("test_username" + util.GenerateRandomString(12) + "@example.com")
passcodeLen := 4
nkodePolicy := NewDefaultNKodePolicy()
keypadSize := KeypadDimension{AttrsPerKey: attrsPerKey, NumbOfKeys: numbOfKeys}
nkodeApi := NewNKodeAPI(db)
nkodeApi := NewNKodeAPI(db, queue)
customerId, err := nkodeApi.CreateNewCustomer(nkodePolicy, nil)
assert.NoError(t, err)
signupResponse, err := nkodeApi.GenerateSignupResetInterface(userEmail, *customerId, keypadSize, false)

View File

@@ -57,7 +57,7 @@ VALUES (?)
}
func MakeSvgFiles() string {
jsonFiles, err := GetAllFiles("./sqlite-init/json")
jsonFiles, err := GetAllFiles("./core//sqlite-init/json")
if err != nil {
log.Fatalf("Error getting JSON files: %v", err)
}
@@ -158,11 +158,16 @@ CREATE TABLE IF NOT EXISTS customer (
expiration INTEGER NOT NULL,
attribute_values BLOB NOT NULL,
set_values BLOB NOT NULL
-- created_at TEXT NOT NULL,
-- last_renew TEXT NOT NULL,
);
CREATE TABLE IF NOT EXISTS user (
id TEXT NOT NULL PRIMARY KEY,
username TEXT NOT NULL,
-- email TEXT NOT NULL,
-- first_name TEXT NOT NULL,
-- last_name TEXT NOT NULL,
renew INT NOT NULL,
refresh_token TEXT,
customer_id TEXT NOT NULL,
@@ -187,6 +192,8 @@ CREATE TABLE IF NOT EXISTS user (
idx_interface BLOB NOT NULL,
svg_id_interface BLOB NOT NULL,
-- created_at TEXT NOT NULL,
-- last_login TEXT NOT NULL,
FOREIGN KEY (customer_id) REFERENCES customers(id),
UNIQUE(customer_id, username)

View File

@@ -186,7 +186,7 @@ func (d *SqliteDB) GetCustomer(id CustomerId) (*Customer, error) {
return &customer, nil
}
func (d *SqliteDB) GetUser(username Email, customerId CustomerId) (*User, error) {
func (d *SqliteDB) GetUser(username UserEmail, customerId CustomerId) (*User, error) {
tx, err := d.db.Begin()
if err != nil {
return nil, err

View File

@@ -40,7 +40,7 @@ func testSignupLoginRenew(t *testing.T, db DbAccessor) {
assert.NoError(t, err)
err = db.WriteNewUser(*userOrig)
assert.NoError(t, err)
user, err := db.GetUser(Email(username), customer.Id)
user, err := db.GetUser(UserEmail(username), customer.Id)
assert.NoError(t, err)
assert.Equal(t, userOrig, user)

View File

@@ -94,14 +94,14 @@ func CustomerIdToString(customerId CustomerId) string {
type SessionId uuid.UUID
type UserId uuid.UUID
type Email string
type UserEmail string
func ParseEmail(email string) (Email, error) {
func ParseEmail(email string) (UserEmail, error) {
_, err := mail.ParseAddress(email)
if err != nil {
return "", err
}
return Email(email), err
return UserEmail(email), err
}
@@ -124,7 +124,7 @@ type EncipheredNKode struct {
type DbAccessor interface {
GetCustomer(CustomerId) (*Customer, error)
GetUser(Email, CustomerId) (*User, error)
GetUser(UserEmail, CustomerId) (*User, error)
WriteNewCustomer(Customer) error
WriteNewUser(User) error
UpdateUserNKode(User) error

View File

@@ -10,7 +10,7 @@ import (
type User struct {
Id UserId
CustomerId CustomerId
Email Email
Email UserEmail
EncipheredPasscode EncipheredNKode
Kp KeypadDimension
CipherKeys UserCipherKeys
@@ -136,7 +136,7 @@ func NewUser(customer Customer, userEmail string, passcodeIdx []int, ui UserInte
}
newUser := User{
Id: UserId(uuid.New()),
Email: Email(userEmail),
Email: UserEmail(userEmail),
EncipheredPasscode: *encipheredNKode,
CipherKeys: *newKeys,
Interface: ui,

View File

@@ -17,12 +17,12 @@ type UserSignSession struct {
SetIdxInterface IdxInterface
ConfirmIdxInterface IdxInterface
SetKeySelection KeySelection
UserEmail Email
UserEmail UserEmail
Reset bool
Expire int
}
func NewSignupResetSession(userEmail Email, kp KeypadDimension, customerId CustomerId, svgInterface SvgIdInterface, reset bool) (*UserSignSession, error) {
func NewSignupResetSession(userEmail UserEmail, kp KeypadDimension, customerId CustomerId, svgInterface SvgIdInterface, reset bool) (*UserSignSession, error) {
loginInterface, err := NewUserInterface(&kp, svgInterface)
if err != nil {
return nil, err

1
go.mod
View File

@@ -31,6 +31,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

2
go.sum
View File

@@ -40,6 +40,8 @@ github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHW
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@@ -8,10 +8,17 @@ import (
"net/http"
)
const (
emailQueueBufferSize = 100
maxEmailsPerSecond = 13 // SES allows 14 but I don't want to push it
)
func main() {
db := core.NewSqliteDB("nkode.db")
defer db.CloseDb()
nkodeApi := core.NewNKodeAPI(db)
sesClient := core.NewSESClient()
emailQueue := core.NewEmailQueue(emailQueueBufferSize, maxEmailsPerSecond, &sesClient)
nkodeApi := core.NewNKodeAPI(db, emailQueue)
AddDefaultCustomer(nkodeApi)
handler := core.NKodeHandler{Api: nkodeApi}
mux := http.NewServeMux()

View File

@@ -1,7 +1,8 @@
package core
package test
import (
"github.com/stretchr/testify/assert"
"go-nkode/core"
"testing"
)
@@ -12,27 +13,27 @@ func TestCustomer(t *testing.T) {
}
func testNewCustomerAttributes(t *testing.T) {
_, nil := NewCustomerAttributes()
_, nil := core.NewCustomerAttributes()
assert.NoError(t, nil)
}
func testCustomerValidKeyEntry(t *testing.T) {
kp := KeypadDimension{AttrsPerKey: 10, NumbOfKeys: 9}
nkodePolicy := NewDefaultNKodePolicy()
customer, err := NewCustomer(nkodePolicy)
kp := core.KeypadDimension{AttrsPerKey: 10, NumbOfKeys: 9}
nkodePolicy := core.NewDefaultNKodePolicy()
customer, err := core.NewCustomer(nkodePolicy)
assert.NoError(t, err)
mockSvgInterface := make(SvgIdInterface, kp.TotalAttrs())
userInterface, err := NewUserInterface(&kp, mockSvgInterface)
mockSvgInterface := make(core.SvgIdInterface, kp.TotalAttrs())
userInterface, err := core.NewUserInterface(&kp, mockSvgInterface)
assert.NoError(t, err)
userEmail := "testing@example.com"
passcodeIdx := []int{0, 1, 2, 3}
user, err := NewUser(*customer, userEmail, passcodeIdx, *userInterface, kp)
user, err := core.NewUser(*customer, userEmail, passcodeIdx, *userInterface, kp)
assert.NoError(t, err)
userLoginInterface, err := user.GetLoginInterface()
assert.NoError(t, err)
selectedKeys, err := SelectKeyByAttrIdx(userLoginInterface, passcodeIdx, kp)
selectedKeys, err := core.SelectKeyByAttrIdx(userLoginInterface, passcodeIdx, kp)
assert.NoError(t, err)
validatedPasscode, err := ValidKeyEntry(*user, *customer, selectedKeys)
validatedPasscode, err := core.ValidKeyEntry(*user, *customer, selectedKeys)
assert.NoError(t, err)
assert.Equal(t, len(validatedPasscode), len(passcodeIdx))
for idx := range validatedPasscode {
@@ -41,16 +42,16 @@ func testCustomerValidKeyEntry(t *testing.T) {
}
func testCustomerIsValidNKode(t *testing.T) {
kp := KeypadDimension{AttrsPerKey: 10, NumbOfKeys: 7}
nkodePolicy := NewDefaultNKodePolicy()
customer, err := NewCustomer(nkodePolicy)
kp := core.KeypadDimension{AttrsPerKey: 10, NumbOfKeys: 7}
nkodePolicy := core.NewDefaultNKodePolicy()
customer, err := core.NewCustomer(nkodePolicy)
assert.NoError(t, err)
mockSvgInterface := make(SvgIdInterface, kp.TotalAttrs())
userInterface, err := NewUserInterface(&kp, mockSvgInterface)
mockSvgInterface := make(core.SvgIdInterface, kp.TotalAttrs())
userInterface, err := core.NewUserInterface(&kp, mockSvgInterface)
assert.NoError(t, err)
userEmail := "testing123@example.com"
passcodeIdx := []int{0, 1, 2, 3}
user, err := NewUser(*customer, userEmail, passcodeIdx, *userInterface, kp)
user, err := core.NewUser(*customer, userEmail, passcodeIdx, *userInterface, kp)
assert.NoError(t, err)
err = customer.IsValidNKode(user.Kp, passcodeIdx)
assert.NoError(t, err)