// Package database opens the PostgreSQL connection used by the service,
// applies the embedded schema migrations and keeps the connection alive.
package database

import (
	"log/slog"
	"os"
	"time"

	"github.com/golang-migrate/migrate"
	_ "github.com/golang-migrate/migrate/database/postgres" // driver postgres for migrations
	bindata "github.com/golang-migrate/migrate/source/go_bindata"
	_ "github.com/jackc/pgx/v5/stdlib" // register pgx driver
	"github.com/jmoiron/sqlx"
	"github.com/pkg/errors"

	"apjournal/internal/database/migrations"
)

// log is the package logger; it writes JSON records to stdout.
var log = slog.New(slog.NewJSONHandler(os.Stdout, nil))

// DB bundles the open database handle with the URI it was opened from, so
// the handle can be re-created later (see PingRoutine).
type DB struct {
	Conn *sqlx.DB
	URI  string
}

// CloseAll closes the underlying database handle and returns the error
// reported by it, if any. A DB without an open handle is a no-op.
func (d *DB) CloseAll() error {
	return closeConn(d.Conn)
}

// closeConn closes conn. A nil handle is treated as already closed so that
// cleanup paths never panic.
func closeConn(conn *sqlx.DB) error {
	if conn == nil {
		return nil
	}
	return conn.Close()
}

// Init opens a connection to the database at DBURI using the pgx driver and
// verifies it with a ping. On failure no DB is returned and any handle that
// was opened is closed again.
func Init(DBURI string) (*DB, error) {
	conn, err := openDBConnection(DBURI, "pgx")
	if err != nil {
		return nil, err
	}

	if err := testConnection(conn); err != nil {
		// Do not leak the handle when the database is unreachable.
		if closeErr := closeConn(conn); closeErr != nil {
			log.Error("failed to close db connection", "error", closeErr)
		}
		return nil, err
	}

	return &DB{Conn: conn, URI: DBURI}, nil
}

// InitWithMigrate behaves like Init and then runs the embedded migrations
// against DBURI: upwards when up is true, downwards otherwise. If the
// migration fails the freshly opened handle is closed and the migration
// error is returned.
func InitWithMigrate(DBURI string, up bool) (*DB, error) {
	db, err := Init(DBURI)
	if err != nil {
		return nil, err
	}

	if err := db.Migrate(DBURI, up); err != nil {
		// The caller never receives db on failure, so release it here.
		if closeErr := db.CloseAll(); closeErr != nil {
			log.Error("failed to close db connection", "error", closeErr)
		}
		return nil, err
	}

	return db, nil
}

// openDBConnection opens a database handle for dbURI with the named
// database/sql driver. It does not ping the database; callers verify
// reachability with testConnection.
func openDBConnection(dbURI, driver string) (*sqlx.DB, error) {
	conn, err := sqlx.Open(driver, dbURI)
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// testConnection pings the database and wraps a failure with context.
func testConnection(conn *sqlx.DB) error {
	if err := conn.Ping(); err != nil {
		return errors.Wrap(err, "can't ping database")
	}
	return nil
}

// Migrate applies the migrations embedded in the migrations package to the
// database at url. With up set it migrates up, otherwise down. A schema that
// is already at the target version is not an error.
//
// The migrator opens its own connection from url (it does not use d.Conn);
// that connection is released before Migrate returns.
func (d *DB) Migrate(url string, up bool) error {
	source := bindata.Resource(migrations.AssetNames(), migrations.Asset)

	driver, err := bindata.WithInstance(source)
	if err != nil {
		return errors.Wrap(err, "unable to instantiate driver from bindata")
	}

	migration, err := migrate.NewWithSourceInstance("go-bindata", driver, url)
	if err != nil {
		return errors.Wrap(err, "unable to start migration")
	}
	defer func() {
		// Close reports the source and database errors separately; neither
		// changes the outcome of the migration, so they are only logged.
		sourceErr, databaseErr := migration.Close()
		if sourceErr != nil {
			log.Error("failed to close migration source", "error", sourceErr)
		}
		if databaseErr != nil {
			log.Error("failed to close migration database", "error", databaseErr)
		}
	}()

	if up {
		// Compare against the sentinel rather than the error text.
		if err := migration.Up(); err != nil && err != migrate.ErrNoChange {
			return errors.Wrap(err, "unable to migrate up")
		}
		return nil
	}

	if err := migration.Down(); err != nil && err != migrate.ErrNoChange {
		return errors.Wrap(err, "unable to migrate down")
	}
	return nil
}

// PingRoutine pings the database every interval and, when a ping fails,
// replaces d.Conn with a freshly opened handle for d.URI. The previous
// handle is kept whenever a replacement cannot be opened, so d.Conn is
// never set to nil by this routine.
//
// The loop has no exit condition: the call blocks for the lifetime of the
// process and is meant to be started in its own goroutine. interval must be
// positive (time.NewTicker panics otherwise).
//
// NOTE(review): d.Conn is reassigned without synchronization; code that
// reads d.Conn from other goroutines may race with a reconnect.
func (d *DB) PingRoutine(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for t := range ticker.C {
		err := testConnection(d.Conn)
		if err == nil {
			continue
		}
		log.Error("failed to ping postrges db", "error", err, "ping_at", t)

		// Open the replacement first so a failure leaves d.Conn untouched
		// and the next tick can retry.
		conn, err := openDBConnection(d.URI, "pgx")
		if err != nil {
			log.Error("failed to reconnect", "error", err, "ping_at", t)
			continue
		}

		if err := closeConn(d.Conn); err != nil {
			log.Error("failed to close db connection", "error", err, "ping_at", t)
		}
		d.Conn = conn
	}
}