94 lines
1.7 KiB
Go
94 lines
1.7 KiB
Go
package sqlc
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
const writeBufferSize = 100
|
|
|
|
type SqlcGeneric func(*Queries, context.Context, any) error
|
|
|
|
type WriteTx struct {
|
|
ErrChan chan error
|
|
Query SqlcGeneric
|
|
Args interface{}
|
|
}
|
|
|
|
type Queue struct {
|
|
Queries *Queries
|
|
Db *sql.DB
|
|
WriteQueue chan WriteTx
|
|
wg sync.WaitGroup
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func NewQueue(sqlDb *sql.DB, ctx context.Context) (*Queue, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sqldb := &Queue{
|
|
Queries: New(sqlDb),
|
|
Db: sqlDb,
|
|
WriteQueue: make(chan WriteTx, writeBufferSize),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
return sqldb, nil
|
|
}
|
|
|
|
func (d *Queue) Start() {
|
|
d.wg.Add(1)
|
|
defer d.wg.Done()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
return
|
|
case writeTx := <-d.WriteQueue:
|
|
err := writeTx.Query(d.Queries, d.ctx, writeTx.Args)
|
|
writeTx.ErrChan <- err
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (d *Queue) Stop() error {
|
|
d.cancel()
|
|
d.wg.Wait()
|
|
close(d.WriteQueue)
|
|
return d.Db.Close()
|
|
}
|
|
|
|
func (d *Queue) EnqueueWriteTx(queryFunc SqlcGeneric, args any) error {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
return errors.New("database is shutting down")
|
|
default:
|
|
}
|
|
|
|
errChan := make(chan error, 1)
|
|
writeTx := WriteTx{
|
|
Query: queryFunc,
|
|
Args: args,
|
|
ErrChan: errChan,
|
|
}
|
|
d.WriteQueue <- writeTx
|
|
return <-errChan
|
|
}
|
|
|
|
func OpenSqliteDb(dbPath string) (*sql.DB, error) {
|
|
sqliteDb, err := sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
|
}
|
|
|
|
if err := sqliteDb.Ping(); err != nil {
|
|
return nil, fmt.Errorf("failed to connect to database: %w", err)
|
|
}
|
|
return sqliteDb, nil
|
|
}
|