migrate nkode-core

This commit is contained in:
2025-01-21 13:18:46 -06:00
parent 4dbb4c48c8
commit 1f10af0081
38 changed files with 4167 additions and 0 deletions

150
email/queue.go Normal file
View File

@@ -0,0 +1,150 @@
package email
import (
"context"
"fmt"
config2 "git.infra.nkode.tech/dkelly/nkode-core/config"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/ses"
"github.com/aws/aws-sdk-go-v2/service/ses/types"
"github.com/patrickmn/go-cache"
"log"
"sync"
"time"
)
type Client interface {
SendEmail(Email) error
}
type Email struct {
Sender string
Recipient string
Subject string
Content string
}
type TestEmailClient struct{}
func (c *TestEmailClient) SendEmail(email Email) error {
fmt.Printf("Sending email to %s\n", email.Recipient)
return nil
}
type SESClient struct {
ResetCache *cache.Cache
}
const (
emailRetryExpiration = 5 * time.Minute
sesCleanupInterval = 10 * time.Minute
)
func NewSESClient() SESClient {
return SESClient{
ResetCache: cache.New(emailRetryExpiration, sesCleanupInterval),
}
}
func (s *SESClient) SendEmail(email Email) error {
if _, exists := s.ResetCache.Get(email.Recipient); exists {
log.Printf("email already sent to %s with subject %s", email.Recipient, email.Subject)
return config2.ErrEmailAlreadySent
}
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
errMsg := fmt.Sprintf("unable to load SDK config, %v", err)
log.Print(errMsg)
return err
}
sesClient := ses.NewFromConfig(cfg)
input := &ses.SendEmailInput{
Destination: &types.Destination{
ToAddresses: []string{email.Recipient},
},
Message: &types.Message{
Body: &types.Body{
Html: &types.Content{
Data: aws.String(email.Content),
},
},
Subject: &types.Content{
Data: aws.String(email.Subject),
},
},
Source: aws.String(email.Sender),
}
if err = s.ResetCache.Add(email.Recipient, nil, emailRetryExpiration); err != nil {
return err
}
resp, err := sesClient.SendEmail(context.TODO(), input)
if err != nil {
s.ResetCache.Delete(email.Recipient)
errMsg := fmt.Sprintf("failed to send email, %v", err)
log.Print(errMsg)
return err
}
fmt.Printf("UserEmail sent successfully, Message ID: %s\n", *resp.MessageId)
return nil
}
type Queue struct {
stop bool
emailQueue chan Email
rateLimit <-chan time.Time
client Client
wg sync.WaitGroup
FailedSendCount int
}
func NewEmailQueue(bufferSize int, emailsPerSecond int, client Client) *Queue {
rateLimit := time.Tick(time.Second / time.Duration(emailsPerSecond))
return &Queue{
stop: false,
emailQueue: make(chan Email, bufferSize),
rateLimit: rateLimit,
client: client,
FailedSendCount: 0,
}
}
func (q *Queue) AddEmail(email Email) {
if q.stop {
log.Printf("email %s with subject %s not add. Stopping queue", email.Recipient, email.Subject)
return
}
q.wg.Add(1)
q.emailQueue <- email
}
func (q *Queue) Start() {
q.stop = false
go func() {
for email := range q.emailQueue {
<-q.rateLimit
q.sendEmail(email)
q.wg.Done()
}
}()
}
func (q *Queue) sendEmail(email Email) {
if err := q.client.SendEmail(email); err != nil {
q.FailedSendCount += 1
log.Printf("Failed to send email to %s: %v\n", email.Recipient, err)
}
}
func (q *Queue) Stop() {
q.stop = true
q.wg.Wait()
close(q.emailQueue)
}

29
email/queue_test.go Normal file
View File

@@ -0,0 +1,29 @@
package email
import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)
func TestEmailQueue(t *testing.T) {
queue := NewEmailQueue(100, 14, &TestEmailClient{})
// Start the queue processing
queue.Start()
// Enqueue some emails
for i := 1; i <= 28; i++ {
email := Email{
Sender: "test@example.com",
Recipient: fmt.Sprintf("user%d@example.com", i),
Subject: "test subject",
Content: "This is a test email",
}
queue.AddEmail(email)
}
// Stop the queue after all emails are processed
queue.Stop()
assert.Equal(t, queue.FailedSendCount, 0)
}