Files
go-nkode/internal/sqlc/sqlite_queue.go
2025-08-01 10:49:46 -05:00

102 lines
2.0 KiB
Go

package sqlc
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
)
const writeBufferSize = 100
type GenericQuery func(*Queries, context.Context, any) (any, error)
type WriteTx struct {
ErrChan chan error
ReturnChan chan any
Query GenericQuery
Args any
}
type Queue struct {
Queries *Queries
Db *sql.DB
WriteQueue chan WriteTx
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewQueue(ctx context.Context, sqlDb *sql.DB) (*Queue, error) {
ctx, cancel := context.WithCancel(context.Background())
sqldb := &Queue{
Queries: New(sqlDb),
Db: sqlDb,
WriteQueue: make(chan WriteTx, writeBufferSize),
ctx: ctx,
cancel: cancel,
}
return sqldb, nil
}
func (d *Queue) Start() {
d.wg.Add(1)
go func() {
// TODO: I think this might be a naive approach.
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():
return
case writeTx := <-d.WriteQueue:
ret, err := writeTx.Query(d.Queries, d.ctx, writeTx.Args)
writeTx.ErrChan <- err
writeTx.ReturnChan <- ret
close(writeTx.ErrChan)
close(writeTx.ReturnChan)
}
}
}()
}
func (d *Queue) Stop() error {
d.cancel()
d.wg.Wait()
close(d.WriteQueue)
return d.Db.Close()
}
func (d *Queue) EnqueueWriteTx(queryFunc GenericQuery, args any) (any, error) {
select {
case <-d.ctx.Done():
return nil, errors.New("database is shutting down")
default:
}
errChan := make(chan error, 1)
retChan := make(chan any, 1)
writeTx := WriteTx{
Query: queryFunc,
Args: args,
ErrChan: errChan,
ReturnChan: retChan,
}
d.WriteQueue <- writeTx
err := <-errChan
val := <-retChan
return val, err
}
func OpenSqliteDb(dbPath string) (*sql.DB, error) {
sqliteDb, err := sql.Open("sqlite3", dbPath)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
if err := sqliteDb.Ping(); err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
return sqliteDb, nil
}