refactor nkode-core
This commit is contained in:
158
pkg/nkode-core/email/queue.go
Normal file
158
pkg/nkode-core/email/queue.go
Normal file
@@ -0,0 +1,158 @@
|
||||
package email
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/ses"
|
||||
"github.com/aws/aws-sdk-go-v2/service/ses/types"
|
||||
"github.com/patrickmn/go-cache"
|
||||
config2 "go-nkode/config"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
SendEmail(Email) error
|
||||
}
|
||||
|
||||
type Email struct {
|
||||
Sender string
|
||||
Recipient string
|
||||
Subject string
|
||||
Content string
|
||||
}
|
||||
|
||||
type TestEmailClient struct{}
|
||||
|
||||
func (c *TestEmailClient) SendEmail(email Email) error {
|
||||
fmt.Printf("Sending email to %s\n", email.Recipient)
|
||||
return nil
|
||||
}
|
||||
|
||||
type SESClient struct {
|
||||
ResetCache *cache.Cache
|
||||
}
|
||||
|
||||
const (
|
||||
emailRetryExpiration = 5 * time.Minute
|
||||
sesCleanupInterval = 10 * time.Minute
|
||||
)
|
||||
|
||||
func NewSESClient() SESClient {
|
||||
return SESClient{
|
||||
ResetCache: cache.New(emailRetryExpiration, sesCleanupInterval),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SESClient) SendEmail(email Email) error {
|
||||
if _, exists := s.ResetCache.Get(email.Recipient); exists {
|
||||
log.Printf("email already sent to %s with subject %s", email.Recipient, email.Subject)
|
||||
return config2.ErrEmailAlreadySent
|
||||
}
|
||||
|
||||
// Load AWS configuration
|
||||
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("unable to load SDK config, %v", err)
|
||||
log.Print(errMsg)
|
||||
return err
|
||||
}
|
||||
|
||||
// Create an SES client
|
||||
sesClient := ses.NewFromConfig(cfg)
|
||||
|
||||
// Construct the email message
|
||||
input := &ses.SendEmailInput{
|
||||
Destination: &types.Destination{
|
||||
ToAddresses: []string{email.Recipient},
|
||||
},
|
||||
Message: &types.Message{
|
||||
Body: &types.Body{
|
||||
Html: &types.Content{
|
||||
Data: aws.String(email.Content),
|
||||
},
|
||||
},
|
||||
Subject: &types.Content{
|
||||
Data: aws.String(email.Subject),
|
||||
},
|
||||
},
|
||||
Source: aws.String(email.Sender),
|
||||
}
|
||||
|
||||
if err = s.ResetCache.Add(email.Recipient, nil, emailRetryExpiration); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the email
|
||||
resp, err := sesClient.SendEmail(context.TODO(), input)
|
||||
if err != nil {
|
||||
s.ResetCache.Delete(email.Recipient)
|
||||
errMsg := fmt.Sprintf("failed to send email, %v", err)
|
||||
log.Print(errMsg)
|
||||
return err
|
||||
}
|
||||
|
||||
// Output the message ID of the sent email
|
||||
fmt.Printf("UserEmail sent successfully, Message ID: %s\n", *resp.MessageId)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Queue represents the email queue with rate limiting
|
||||
type Queue struct {
|
||||
stop bool
|
||||
emailQueue chan Email // Email queue
|
||||
rateLimit <-chan time.Time // Rate limiter
|
||||
client Client // SES client to send emails
|
||||
wg sync.WaitGroup // To wait for all emails to be processed
|
||||
FailedSendCount int
|
||||
}
|
||||
|
||||
// NewEmailQueue creates a new rate-limited email queue
|
||||
func NewEmailQueue(bufferSize int, emailsPerSecond int, client Client) *Queue {
|
||||
// Create a ticker that ticks every second to limit the rate of sending emails
|
||||
rateLimit := time.Tick(time.Second / time.Duration(emailsPerSecond))
|
||||
|
||||
return &Queue{
|
||||
stop: false,
|
||||
emailQueue: make(chan Email, bufferSize),
|
||||
rateLimit: rateLimit,
|
||||
client: client,
|
||||
FailedSendCount: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Queue) AddEmail(email Email) {
|
||||
if q.stop {
|
||||
log.Printf("email %s with subject %s not add. Stopping queue", email.Recipient, email.Subject)
|
||||
return
|
||||
}
|
||||
q.wg.Add(1)
|
||||
q.emailQueue <- email
|
||||
}
|
||||
|
||||
func (q *Queue) Start() {
|
||||
q.stop = false
|
||||
go func() {
|
||||
for email := range q.emailQueue {
|
||||
<-q.rateLimit
|
||||
q.sendEmail(email)
|
||||
q.wg.Done()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (q *Queue) sendEmail(email Email) {
|
||||
if err := q.client.SendEmail(email); err != nil {
|
||||
q.FailedSendCount += 1
|
||||
log.Printf("Failed to send email to %s: %v\n", email.Recipient, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Queue) Stop() {
|
||||
q.stop = true
|
||||
q.wg.Wait()
|
||||
close(q.emailQueue)
|
||||
}
|
||||
Reference in New Issue
Block a user