From 4a42a4465c8ff5496229f8883b9d1532bf7c9cab Mon Sep 17 00:00:00 2001 From: GrailFinder Date: Sun, 7 Apr 2024 09:53:04 +0300 Subject: Feat: db connection and migrations --- internal/database/sql/main.go | 141 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 internal/database/sql/main.go (limited to 'internal/database/sql/main.go') diff --git a/internal/database/sql/main.go b/internal/database/sql/main.go new file mode 100644 index 0000000..80d5f5c --- /dev/null +++ b/internal/database/sql/main.go @@ -0,0 +1,141 @@ +package database + +import ( + "apjournal/internal/database/migrations" + "os" + "time" + + "github.com/jmoiron/sqlx" + + // driver postgres for migrations + "log/slog" + + "github.com/golang-migrate/migrate" + _ "github.com/golang-migrate/migrate/database/postgres" + bindata "github.com/golang-migrate/migrate/source/go_bindata" + _ "github.com/jackc/pgx/v5/stdlib" // register pgx driver + "github.com/pkg/errors" +) + +var log = slog.New(slog.NewJSONHandler(os.Stdout, nil)) + +type DB struct { + Conn *sqlx.DB + URI string +} + +func (d *DB) CloseAll() error { + for _, conn := range []*sqlx.DB{d.Conn} { + if err := closeConn(conn); err != nil { + return err + } + } + return nil +} + +func closeConn(conn *sqlx.DB) error { + return conn.Close() +} + +func Init(DBURI string) (*DB, error) { + var result DB + var err error + result.Conn, err = openDBConnection(DBURI, "pgx") + if err != nil { + return nil, err + } + result.URI = DBURI + + if err := testConnection(result.Conn); err != nil { + return nil, err + } + return &result, nil +} + +func InitWithMigrate(DBURI string, up bool) (*DB, error) { + var ( + result DB + err error + ) + result.Conn, err = openDBConnection(DBURI, "pgx") + if err != nil { + return nil, err + } + result.URI = DBURI + + if err = testConnection(result.Conn); err != nil { + return nil, err + } + if err = result.Migrate(DBURI, up); err != nil { + return nil, err + } + return &result, nil +} + +func openDBConnection(dbURI, driver string) (*sqlx.DB, error) { + conn, err := sqlx.Open(driver, dbURI) + if err != nil { + return nil, err + } + return conn, nil +} + +func testConnection(conn *sqlx.DB) error { + err := conn.Ping() + if err != nil { + return errors.Wrap(err, "can't ping database") + } + return nil +} + +func (db *DB) Migrate(url string, up bool) error { + source := bindata.Resource(migrations.AssetNames(), migrations.Asset) + driver, err := bindata.WithInstance(source) + if err != nil { + return errors.WithStack(errors.WithMessage(err, + "unable to instantiate driver from bindata")) + } + migration, err := migrate.NewWithSourceInstance("go-bindata", + driver, url) + if err != nil { + return errors.WithStack(errors.WithMessage(err, + "unable to start migration")) + } + if up { + if err = migration.Up(); err != nil && err.Error() != "no change" { + return errors.WithStack(errors.WithMessage(err, + "unable to migrate up")) + } + } else { + if err = migration.Down(); err != nil && + err.Error() != "no change" { + return errors.WithStack(errors.WithMessage(err, + "unable to migrate down")) + } + } + return nil +} + +func (d *DB) PingRoutine(interval time.Duration) { + ticker := time.NewTicker(interval) + done := make(chan bool) + + for { + select { + case <-done: + return + case t := <-ticker.C: + if err := testConnection(d.Conn); err != nil { + log.Error("failed to ping postrges db", "error", err, "ping_at", t) + // reconnect + if err := closeConn(d.Conn); err != nil { + log.Error("failed to close db connection", "error", err, "ping_at", t) + } + d.Conn, err = openDBConnection(d.URI, "pgx") + if err != nil { + log.Error("failed to reconnect", "error", err, "ping_at", t) + } + } + } + } +} -- cgit v1.2.3