From 5253251d285d308d46482fa6d6ee65a1641883ff Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 18 Jan 2024 17:26:23 +0100 Subject: [PATCH 1/5] Introduce `QueryBuilder` type --- database/query_builder.go | 244 ++++++++++++++++++++++++++++++++ database/query_builder_test.go | 247 +++++++++++++++++++++++++++++++++ 2 files changed, 491 insertions(+) create mode 100644 database/query_builder.go create mode 100644 database/query_builder_test.go diff --git a/database/query_builder.go b/database/query_builder.go new file mode 100644 index 00000000..b87ccd1f --- /dev/null +++ b/database/query_builder.go @@ -0,0 +1,244 @@ +package database + +import ( + "fmt" + "golang.org/x/exp/slices" + "reflect" + "sort" + "strings" +) + +// QueryBuilder is an addon for the [DB] type that takes care of all the database statement building shenanigans. +// The recommended use of QueryBuilder is to only use it to generate a single query at a time and not two different +// ones. If for instance you want to generate `INSERT` and `SELECT` queries, it is best to use two different +// QueryBuilder instances. You can use the DB#QueryBuilder() method to get fully initialised instances each time. +type QueryBuilder struct { + subject any + columns []string + excludedColumns []string + + // Indicates whether the generated columns should be sorted in ascending order before generating the + // actual statements. This is intended for unit tests only and shouldn't be necessary for production code. + sort bool +} + +// SetColumns sets the DB columns to be used when building the statements. +// When you do not want the columns to be extracted dynamically, you can use this method to specify them manually. +// Returns the current *[QueryBuilder] receiver and allows you to chain some method calls. +func (qb *QueryBuilder) SetColumns(columns ...string) *QueryBuilder { + qb.columns = columns + return qb +} + +// SetExcludedColumns excludes the given columns from all the database statements. +// Returns the current *[QueryBuilder] receiver and allows you to chain some method calls. +func (qb *QueryBuilder) SetExcludedColumns(columns ...string) *QueryBuilder { + qb.excludedColumns = columns + return qb +} + +// Delete returns a DELETE statement for the query builders subject filtered by ID. +func (qb *QueryBuilder) Delete() string { + return qb.DeleteBy("id") +} + +// DeleteBy returns a DELETE statement for the query builders subject filtered by the given column. +func (qb *QueryBuilder) DeleteBy(column string) string { + return fmt.Sprintf(`DELETE FROM "%s" WHERE "%s" IN (?)`, TableName(qb.subject), column) +} + +// Insert returns an INSERT INTO statement for the query builders subject. +func (qb *QueryBuilder) Insert(db *DB) (string, int) { + columns := qb.BuildColumns(db) + + return fmt.Sprintf( + `INSERT INTO "%s" ("%s") VALUES (:%s)`, + TableName(qb.subject), + strings.Join(columns, `", "`), + strings.Join(columns, ", :"), + ), len(columns) +} + +// InsertIgnore returns an INSERT statement for the query builders subject for +// which the database ignores rows that have already been inserted. +func (qb *QueryBuilder) InsertIgnore(db *DB) (string, int) { + columns := qb.BuildColumns(db) + + var clause string + switch db.DriverName() { + case MySQL: + // MySQL treats UPDATE id = id as a no-op. + clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%[1]s" = "%[1]s"`, columns[0]) + case PostgreSQL: + clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT %s DO NOTHING", qb.getPgsqlOnConflictConstraint()) + default: + panic("Driver unsupported: " + db.DriverName()) + } + + return fmt.Sprintf( + `INSERT INTO "%s" ("%s") VALUES (:%s) %s`, + TableName(qb.subject), + strings.Join(columns, `", "`), + strings.Join(columns, ", :"), + clause, + ), len(columns) +} + +// Select returns a SELECT statement from the query builders subject and the already set columns. +// If no columns are set, they will be extracted from the query builders subject. +// When the query builders subject is of type Scoper, a WHERE clause is appended to the statement. +func (qb *QueryBuilder) Select(db *DB) string { + var scoper Scoper + if sc, ok := qb.subject.(Scoper); ok { + scoper = sc + } + + return qb.SelectScoped(db, scoper) +} + +// SelectScoped returns a SELECT statement from the query builders subject and the already set columns filtered +// by the given scoper/column. When no columns are set, they will be extracted from the query builders subject. +// The argument scoper must either be of type Scoper, string or nil to get SELECT statements without a WHERE clause. +func (qb *QueryBuilder) SelectScoped(db *DB, scoper any) string { + query := fmt.Sprintf(`SELECT "%s" FROM "%s"`, strings.Join(qb.BuildColumns(db), `", "`), TableName(qb.subject)) + where, placeholders := qb.Where(db, scoper) + if placeholders > 0 { + query += ` WHERE ` + where + } + + return query +} + +// Update returns an UPDATE statement for the query builders subject filter by ID column. +func (qb *QueryBuilder) Update(db *DB) (string, int) { + return qb.UpdateScoped(db, "id") +} + +// UpdateScoped returns an UPDATE statement for the query builders subject filtered by the given column/scoper. +// The argument scoper must either be of type Scoper, string or nil to get UPDATE statements without a WHERE clause. +func (qb *QueryBuilder) UpdateScoped(db *DB, scoper any) (string, int) { + columns := qb.BuildColumns(db) + set := make([]string, 0, len(columns)) + + for _, col := range columns { + set = append(set, fmt.Sprintf(`"%[1]s" = :%[1]s`, col)) + } + + placeholders := len(columns) + query := `UPDATE "%s" SET %s` + if where, count := qb.Where(db, scoper); count > 0 { + placeholders += count + query += ` WHERE ` + where + } + + return fmt.Sprintf(query, TableName(qb.subject), strings.Join(set, ", ")), placeholders +} + +// Upsert returns an upsert statement for the query builders subject. +func (qb *QueryBuilder) Upsert(db *DB) (string, int) { + var updateColumns []string + if upserter, ok := qb.subject.(Upserter); ok { + updateColumns = db.columnMap.Columns(upserter.Upsert()) + } else { + updateColumns = qb.BuildColumns(db) + } + + return qb.UpsertColumns(db, updateColumns...) +} + +// UpsertColumns returns an upsert statement for the query builders subject and the specified update columns. +func (qb *QueryBuilder) UpsertColumns(db *DB, updateColumns ...string) (string, int) { + var clause, setFormat string + switch db.DriverName() { + case MySQL: + clause = "ON DUPLICATE KEY UPDATE" + setFormat = `"%[1]s" = VALUES("%[1]s")` + case PostgreSQL: + clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT %s DO UPDATE SET", qb.getPgsqlOnConflictConstraint()) + setFormat = `"%[1]s" = EXCLUDED."%[1]s"` + default: + panic("Driver unsupported: " + db.DriverName()) + } + + set := make([]string, 0, len(updateColumns)) + for _, col := range updateColumns { + set = append(set, fmt.Sprintf(setFormat, col)) + } + + insertColumns := qb.BuildColumns(db) + + return fmt.Sprintf( + `INSERT INTO "%s" ("%s") VALUES (:%s) %s %s`, + TableName(qb.subject), + strings.Join(insertColumns, `", "`), + strings.Join(insertColumns, ", :"), + clause, + strings.Join(set, ", "), + ), len(insertColumns) +} + +// Where returns a WHERE clause with named placeholder conditions built from the +// specified scoper/column combined with the AND operator. +func (qb *QueryBuilder) Where(db *DB, subject any) (string, int) { + t := reflect.TypeOf(subject) + if t == nil { // Subject is a nil interface value. + return "", 0 + } + + var columns []string + if t.Kind() == reflect.String { + columns = []string{subject.(string)} + } else if t.Kind() == reflect.Struct || t.Kind() == reflect.Pointer { + if scoper, ok := subject.(Scoper); ok { + return qb.Where(db, scoper.Scope()) + } + + columns = db.columnMap.Columns(subject) + } else { // This should never happen unless someone wants to do some silly things. + panic(fmt.Sprintf("qb.Where: unknown subject type provided: %q", t.Kind().String())) + } + + where := make([]string, 0, len(columns)) + for _, col := range columns { + where = append(where, fmt.Sprintf(`"%[1]s" = :%[1]s`, col)) + } + + return strings.Join(where, ` AND `), len(columns) +} + +// BuildColumns returns all the Query Builder columns (if specified), otherwise they are +// determined dynamically using its subject. Additionally, it checks whether columns need +// to be excluded and proceeds accordingly. +func (qb *QueryBuilder) BuildColumns(db *DB) []string { + var columns []string + if len(qb.columns) > 0 { + columns = qb.columns + } else { + columns = db.columnMap.Columns(qb.subject) + } + + if len(qb.excludedColumns) > 0 { + columns = slices.DeleteFunc(slices.Clone(columns), func(column string) bool { + return slices.Contains(qb.excludedColumns, column) + }) + } + + if qb.sort { + // The order in which the columns appear is not guaranteed as we extract the columns dynamically + // from the struct. So, we've to sort them here to be able to test the generated statements. + sort.Strings(columns) + } + + return slices.Clip(columns) +} + +// getPgsqlOnConflictConstraint returns the constraint name of the current [QueryBuilder]'s subject. +// If the subject does not implement the PgsqlOnConflictConstrainter interface, it will simply return +// the table name prefixed with `pk_`. +func (qb *QueryBuilder) getPgsqlOnConflictConstraint() string { + if constrainter, ok := qb.subject.(PgsqlOnConflictConstrainter); ok { + return constrainter.PgsqlOnConflictConstraint() + } + + return "pk_" + TableName(qb.subject) +} diff --git a/database/query_builder_test.go b/database/query_builder_test.go new file mode 100644 index 00000000..bcedee9c --- /dev/null +++ b/database/query_builder_test.go @@ -0,0 +1,247 @@ +package database + +import ( + "github.com/creasty/defaults" + "github.com/icinga/icinga-go-library/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "testing" + "time" +) + +func TestQueryBuilder(t *testing.T) { + t.Parallel() + + t.Run("MySQL", func(t *testing.T) { + t.Parallel() + + c := &Config{} + require.NoError(t, defaults.Set(c), "applying config default should not fail") + + db, err := NewDbFromConfig(c, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour), RetryConnectorCallbacks{}) + require.NoError(t, err) + require.Equal(t, MySQL, db.DriverName()) + runTests(t, db) + }) + + t.Run("PostgreSQL", func(t *testing.T) { + t.Parallel() + + c := &Config{Type: "pgsql"} + require.NoError(t, defaults.Set(c), "applying config default should not fail") + + db, err := NewDbFromConfig(c, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour), RetryConnectorCallbacks{}) + require.NoError(t, err) + require.Equal(t, PostgreSQL, db.DriverName()) + runTests(t, db) + + t.Run("OnConflictConstrainter", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: &pgsqlConstraintName{}} + qb.SetColumns("a") + + stmt, columns := qb.Upsert(db) + assert.Equal(t, 1, columns) + assert.Equal(t, `INSERT INTO "test" ("a") VALUES (:a) ON CONFLICT ON CONSTRAINT idx_custom_constraint DO UPDATE SET "a" = EXCLUDED."a"`, stmt) + + stmt, columns = qb.InsertIgnore(db) + assert.Equal(t, 1, columns) + assert.Equal(t, `INSERT INTO "test" ("a") VALUES (:a) ON CONFLICT ON CONSTRAINT idx_custom_constraint DO NOTHING`, stmt) + }) + }) +} + +func runTests(t *testing.T, db *DB) { + t.Run("SetColumns", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: "test"} + qb.SetColumns("a", "b") + assert.Equal(t, []string{"a", "b"}, qb.columns) + }) + + t.Run("ExcludeColumns", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: &test{}} + qb.SetExcludedColumns("a", "b") + assert.Equal(t, []string{"a", "b"}, qb.excludedColumns) + }) + + t.Run("DeleteStatements", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: &test{}} + assert.Equal(t, `DELETE FROM "test" WHERE "id" IN (?)`, qb.Delete()) + assert.Equal(t, `DELETE FROM "test" WHERE "foo" IN (?)`, qb.DeleteBy("foo")) + }) + + t.Run("WhereClause", func(t *testing.T) { + t.Parallel() + + // Is invalid column (1) + qb := &QueryBuilder{subject: 1} + assert.PanicsWithValue(t, "qb.Where: unknown subject type provided: \"int\"", func() { _, _ = qb.Where(db, 1) }) + + var nilPtr Scoper // Interface nil value + qb = &QueryBuilder{subject: nilPtr} + clause, placeholder := qb.Where(db, nilPtr) + assert.Equal(t, 0, placeholder) + assert.Empty(t, clause) + + clause, placeholder = qb.Where(db, "id") + assert.Equal(t, 1, placeholder) + assert.Equal(t, "\"id\" = :id", clause) + + assertScoperID := func(clause string, placeholder int) { + assert.Equal(t, 1, placeholder) + assert.Equal(t, "\"scoper_id\" = :scoper_id", clause) + } + + var reference test + qb = &QueryBuilder{subject: &reference} + clause, placeholder = qb.Where(db, &reference) + assertScoperID(clause, placeholder) + + nonNilPtr := new(test) + qb = &QueryBuilder{subject: nilPtr} + clause, placeholder = qb.Where(db, nonNilPtr) + assertScoperID(clause, placeholder) + }) + + t.Run("InsertStatements", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: &test{}} + qb.sort = true + qb.SetExcludedColumns("random") + + stmt, columns := qb.Insert(db) + assert.Equal(t, 2, columns) + assert.Equal(t, `INSERT INTO "test" ("name", "value") VALUES (:name, :value)`, stmt) + + qb.SetExcludedColumns("a", "b") + qb.SetColumns("a", "b", "c", "d") + + stmt, columns = qb.Insert(db) + assert.Equal(t, 2, columns) + assert.Equal(t, `INSERT INTO "test" ("c", "d") VALUES (:c, :d)`, stmt) + + stmt, columns = qb.InsertIgnore(db) + assert.Equal(t, 2, columns) + if db.DriverName() == MySQL { + assert.Equal(t, `INSERT INTO "test" ("c", "d") VALUES (:c, :d) ON DUPLICATE KEY UPDATE "c" = "c"`, stmt) + } else { + assert.Equal(t, `INSERT INTO "test" ("c", "d") VALUES (:c, :d) ON CONFLICT ON CONSTRAINT pk_test DO NOTHING`, stmt) + } + }) + + t.Run("SelectStatements", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: &test{}} + qb.sort = true + + stmt := qb.Select(db) + expected := `SELECT "name", "random", "value" FROM "test" WHERE "scoper_id" = :scoper_id` + assert.Equal(t, expected, stmt) + + qb.SetColumns("name", "random", "value") + + stmt = qb.SelectScoped(db, "name") + assert.Equal(t, `SELECT "name", "random", "value" FROM "test" WHERE "name" = :name`, stmt) + }) + + t.Run("UpdateStatements", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: &test{}} + qb.sort = true + qb.SetExcludedColumns("random") + + stmt, placeholders := qb.Update(db) + assert.Equal(t, 3, placeholders) + + expected := `UPDATE "test" SET "name" = :name, "value" = :value WHERE "id" = :id` + assert.Equal(t, expected, stmt) + + stmt, placeholders = qb.UpdateScoped(db, (&test{}).Scope()) + assert.Equal(t, 3, placeholders) + assert.Equal(t, `UPDATE "test" SET "name" = :name, "value" = :value WHERE "scoper_id" = :scoper_id`, stmt) + + qb.SetExcludedColumns("a", "b") + qb.SetColumns("a", "b", "c", "d") + + stmt, placeholders = qb.UpdateScoped(db, "c") + assert.Equal(t, 3, placeholders) + assert.Equal(t, 3, placeholders) + assert.Equal(t, `UPDATE "test" SET "c" = :c, "d" = :d WHERE "c" = :c`, stmt) + }) + + t.Run("UpsertStatements", func(t *testing.T) { + t.Parallel() + + qb := &QueryBuilder{subject: &test{}} + qb.sort = true + qb.SetExcludedColumns("random") + + stmt, columns := qb.Upsert(db) + assert.Equal(t, 2, columns) + + expected := `INSERT INTO "test" ("name", "value") VALUES (:name, :value)` + if db.DriverName() == MySQL { + assert.Equal(t, expected+` ON DUPLICATE KEY UPDATE "name" = VALUES("name"), "value" = VALUES("value")`, stmt) + } else { + assert.Equal(t, expected+` ON CONFLICT ON CONSTRAINT pk_test DO UPDATE SET "name" = EXCLUDED."name", "value" = EXCLUDED."value"`, stmt) + } + + qb.SetExcludedColumns("a", "b") + qb.SetColumns("a", "b", "c", "d") + + expected = `INSERT INTO "test" ("c", "d") VALUES (:c, :d)` + stmt, columns = qb.Upsert(db) + assert.Equal(t, 2, columns) + if db.DriverName() == MySQL { + assert.Equal(t, expected+` ON DUPLICATE KEY UPDATE "c" = VALUES("c"), "d" = VALUES("d")`, stmt) + } else { + assert.Equal(t, expected+` ON CONFLICT ON CONSTRAINT pk_test DO UPDATE SET "c" = EXCLUDED."c", "d" = EXCLUDED."d"`, stmt) + } + + qb.SetExcludedColumns("a") + + expected = `INSERT INTO "test" ("b", "c", "d") VALUES (:b, :c, :d)` + stmt, columns = qb.UpsertColumns(db, "b", "c") + assert.Equal(t, 3, columns) + if db.DriverName() == MySQL { + assert.Equal(t, expected+` ON DUPLICATE KEY UPDATE "b" = VALUES("b"), "c" = VALUES("c")`, stmt) + } else { + assert.Equal(t, expected+` ON CONFLICT ON CONSTRAINT pk_test DO UPDATE SET "b" = EXCLUDED."b", "c" = EXCLUDED."c"`, stmt) + } + }) +} + +type test struct { + Name string + Value string + Random string +} + +func (t *test) Scope() any { + return struct { + ScoperID string + }{} +} + +type pgsqlConstraintName struct { + *test +} + +func (p *pgsqlConstraintName) PgsqlOnConflictConstraint() string { + return "idx_custom_constraint" +} + +func (p *pgsqlConstraintName) TableName() string { + return "test" +} From c7dfddd3ecc2769281841ab7dd6bd4852897aadd Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 18 Jan 2024 17:26:39 +0100 Subject: [PATCH 2/5] DB: Make use of the `QueryBuilder` --- database/db.go | 126 +++++-------------------------------------------- 1 file changed, 11 insertions(+), 115 deletions(-) diff --git a/database/db.go b/database/db.go index a33cd56a..1edabaea 100644 --- a/database/db.go +++ b/database/db.go @@ -258,6 +258,11 @@ func (db *DB) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } +// QueryBuilder returns a fully initialised and ready to use *[QueryBuilder] instance for the given subject/struct. +func (db *DB) QueryBuilder(subject any) *QueryBuilder { + return &QueryBuilder{subject: subject} +} + // BuildColumns returns all columns of the given struct. func (db *DB) BuildColumns(subject interface{}) []string { return slices.Clone(db.columnMap.Columns(subject)) @@ -265,143 +270,34 @@ func (db *DB) BuildColumns(subject interface{}) []string { // BuildDeleteStmt returns a DELETE statement for the given struct. func (db *DB) BuildDeleteStmt(from interface{}) string { - return fmt.Sprintf( - `DELETE FROM "%s" WHERE id IN (?)`, - TableName(from), - ) + return db.QueryBuilder(from).Delete() } // BuildInsertStmt returns an INSERT INTO statement for the given struct. func (db *DB) BuildInsertStmt(into interface{}) (string, int) { - columns := db.columnMap.Columns(into) - - return fmt.Sprintf( - `INSERT INTO "%s" ("%s") VALUES (%s)`, - TableName(into), - strings.Join(columns, `", "`), - fmt.Sprintf(":%s", strings.Join(columns, ", :")), - ), len(columns) + return db.QueryBuilder(into).Insert(db) } // BuildInsertIgnoreStmt returns an INSERT statement for the specified struct for // which the database ignores rows that have already been inserted. func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) { - table := TableName(into) - columns := db.columnMap.Columns(into) - var clause string - - switch db.DriverName() { - case MySQL: - // MySQL treats UPDATE id = id as a no-op. - clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%s" = "%s"`, columns[0], columns[0]) - case PostgreSQL: - var constraint string - if constrainter, ok := into.(PgsqlOnConflictConstrainter); ok { - constraint = constrainter.PgsqlOnConflictConstraint() - } else { - constraint = "pk_" + table - } - - clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT %s DO NOTHING", constraint) - } - - return fmt.Sprintf( - `INSERT INTO "%s" ("%s") VALUES (%s) %s`, - table, - strings.Join(columns, `", "`), - fmt.Sprintf(":%s", strings.Join(columns, ", :")), - clause, - ), len(columns) + return db.QueryBuilder(into).InsertIgnore(db) } // BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct // and the column list from the specified columns struct. func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string { - q := fmt.Sprintf( - `SELECT "%s" FROM "%s"`, - strings.Join(db.columnMap.Columns(columns), `", "`), - TableName(table), - ) - - if scoper, ok := table.(Scoper); ok { - where, _ := db.BuildWhere(scoper.Scope()) - q += ` WHERE ` + where - } - - return q + return db.QueryBuilder(table).SetColumns(db.columnMap.Columns(columns)...).Select(db) } // BuildUpdateStmt returns an UPDATE statement for the given struct. func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { - columns := db.columnMap.Columns(update) - set := make([]string, 0, len(columns)) - - for _, col := range columns { - set = append(set, fmt.Sprintf(`"%s" = :%s`, col, col)) - } - - return fmt.Sprintf( - `UPDATE "%s" SET %s WHERE id = :id`, - TableName(update), - strings.Join(set, ", "), - ), len(columns) + 1 // +1 because of WHERE id = :id + return db.QueryBuilder(update).Update(db) } // BuildUpsertStmt returns an upsert statement for the given struct. func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { - insertColumns := db.columnMap.Columns(subject) - table := TableName(subject) - var updateColumns []string - - if upserter, ok := subject.(Upserter); ok { - updateColumns = db.columnMap.Columns(upserter.Upsert()) - } else { - updateColumns = insertColumns - } - - var clause, setFormat string - switch db.DriverName() { - case MySQL: - clause = "ON DUPLICATE KEY UPDATE" - setFormat = `"%[1]s" = VALUES("%[1]s")` - case PostgreSQL: - var constraint string - if constrainter, ok := subject.(PgsqlOnConflictConstrainter); ok { - constraint = constrainter.PgsqlOnConflictConstraint() - } else { - constraint = "pk_" + table - } - - clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT %s DO UPDATE SET", constraint) - setFormat = `"%[1]s" = EXCLUDED."%[1]s"` - } - - set := make([]string, 0, len(updateColumns)) - - for _, col := range updateColumns { - set = append(set, fmt.Sprintf(setFormat, col)) - } - - return fmt.Sprintf( - `INSERT INTO "%s" ("%s") VALUES (%s) %s %s`, - table, - strings.Join(insertColumns, `", "`), - fmt.Sprintf(":%s", strings.Join(insertColumns, ",:")), - clause, - strings.Join(set, ","), - ), len(insertColumns) -} - -// BuildWhere returns a WHERE clause with named placeholder conditions built from the specified struct -// combined with the AND operator. -func (db *DB) BuildWhere(subject interface{}) (string, int) { - columns := db.columnMap.Columns(subject) - where := make([]string, 0, len(columns)) - for _, col := range columns { - where = append(where, fmt.Sprintf(`"%s" = :%s`, col, col)) - } - - return strings.Join(where, ` AND `), len(columns) + return db.QueryBuilder(subject).Upsert(db) } // OnSuccess is a callback for successful (bulk) DML operations. From 2a133da2c8edd1ee59368de2dce415b29dd70c75 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Mon, 27 Nov 2023 16:18:58 +0100 Subject: [PATCH 3/5] Structs for upsert and delete --- database/optionally.go | 135 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 database/optionally.go diff --git a/database/optionally.go b/database/optionally.go new file mode 100644 index 00000000..73f6a847 --- /dev/null +++ b/database/optionally.go @@ -0,0 +1,135 @@ +package database + +import ( + "context" + "fmt" + "github.com/icinga/icinga-go-library/com" + "github.com/pkg/errors" +) + +// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists. +type Upsert interface { + // Stream bulk upserts the specified entities via NamedBulkExec. + // If not explicitly specified, the upsert statement is created using + // BuildUpsertStmt with the first entity from the entities stream. + Stream(ctx context.Context, entities <-chan Entity) error +} + +// UpsertOption is a functional option for NewUpsert. +type UpsertOption func(u *upsert) + +// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the +// operation was performed successfully are passed to the callbacks. +func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption { + return func(u *upsert) { + u.onUpsert = onUpsert + } +} + +// WithStatement uses the specified statement for bulk upserts instead of automatically creating one. +func WithStatement(stmt string, placeholders int) UpsertOption { + return func(u *upsert) { + u.stmt = stmt + u.placeholders = placeholders + } +} + +// NewUpsert creates a new Upsert initalized with a database. +func NewUpsert(db *DB, options ...UpsertOption) Upsert { + u := &upsert{db: db} + + for _, option := range options { + option(u) + } + + return u +} + +type upsert struct { + db *DB + onUpsert []OnSuccess[Entity] + stmt string + placeholders int +} + +func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error { + first, forward, err := com.CopyFirst(ctx, entities) + if err != nil { + return errors.Wrap(err, "can't copy first entity") + } + + sem := u.db.GetSemaphoreForTable(TableName(first)) + var stmt string + var placeholders int + + if u.stmt != "" { + stmt = u.stmt + placeholders = u.placeholders + } else { + stmt, placeholders = u.db.BuildUpsertStmt(first) + } + + return u.db.NamedBulkExec( + ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem, + forward, SplitOnDupId[Entity], u.onUpsert..., + ) +} + +// Delete deletes rows of a table. +type Delete interface { + // Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec. + // Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt. + Stream(ctx context.Context, from any, args <-chan any) error +} + +// DeleteOption is a functional option for NewDelete. +type DeleteOption func(options *delete) + +// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the +// operation was performed successfully are passed to the callbacks. +func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption { + return func(d *delete) { + d.onDelete = onDelete + } +} + +// ByColumn uses the given column for the WHERE clause that the rows must +// satisfy in order to be deleted, instead of automatically using ID. +func ByColumn(column string) DeleteOption { + return func(d *delete) { + d.column = column + } +} + +// NewDelete creates a new Delete initalized with a database. +func NewDelete(db *DB, options ...DeleteOption) Delete { + d := &delete{db: db} + + for _, option := range options { + option(d) + } + + return d +} + +type delete struct { + db *DB + column string + onDelete []OnSuccess[any] +} + +func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error { + var stmt string + + if d.column != "" { + stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column) + } else { + stmt = d.db.BuildDeleteStmt(from) + } + + sem := d.db.GetSemaphoreForTable(TableName(from)) + + return d.db.BulkExec( + ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete..., + ) +} From 279942dbdf3b5c18e4d5536f3b95b16c6c54b4fa Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 19 Sep 2024 17:29:31 +0200 Subject: [PATCH 4/5] Extend functional query builders --- database/optionally.go | 293 ++++++++++++++++++++++++++++------------- 1 file changed, 201 insertions(+), 92 deletions(-) diff --git a/database/optionally.go b/database/optionally.go index 73f6a847..3a667d2e 100644 --- a/database/optionally.go +++ b/database/optionally.go @@ -2,134 +2,243 @@ package database import ( "context" - "fmt" "github.com/icinga/icinga-go-library/com" "github.com/pkg/errors" ) -// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists. -type Upsert interface { - // Stream bulk upserts the specified entities via NamedBulkExec. - // If not explicitly specified, the upsert statement is created using - // BuildUpsertStmt with the first entity from the entities stream. - Stream(ctx context.Context, entities <-chan Entity) error -} +// QueryType represents the type of database query, expressed as an enum-like integer value. +type QueryType int -// UpsertOption is a functional option for NewUpsert. -type UpsertOption func(u *upsert) +const ( + // SelectQuery represents a SQL SELECT query type, used for retrieving data from a database. + SelectQuery QueryType = iota -// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the -// operation was performed successfully are passed to the callbacks. -func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption { - return func(u *upsert) { - u.onUpsert = onUpsert - } + // InsertQuery represents the constant value for an INSERT database query. + InsertQuery + + // UpsertQuery represents the constant value used for an UPSERT (INSERT or UPDATE) database query. + UpsertQuery + + // UpdateQuery represents the constant value for an UPDATE database query. + UpdateQuery + + // DeleteQuery represents the constant value for a DELETE query. + DeleteQuery +) + +// Queryable defines methods for bulk executing database entities such as upsert, insert, and update. +type Queryable interface { + // Stream bulk executes database Entity(ies) for the following three database query types. + // * Upsert - Stream consumes from the provided entities channel and bulk upserts them via DB.NamedBulkExec. + // If not explicitly specified via WithStatement, the upsert statement is generated dynamically via the + // QueryBuilder. The bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency + // via the Options.MaxConnectionsPerTable. + // * Insert(Ignore) - Stream does likewise for insert statement and bulk inserts the entities via DB.NamedBulkExec. + // If not explicitly specified via WithStatement, the insert statement is generated dynamically via the + // QueryBuilder. The bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency + // via the Options.MaxConnectionsPerTable. + // * Update - Stream bulk updates the entities via DB.NamedBulkExecTx. If not explicitly specified via + // WithStatement, the update statement is generated dynamically via the QueryBuilder. The bulk size is + // controlled via Options.MaxRowsPerTransaction and concurrency via the Options.MaxConnectionsPerTable. + // Entities for which the query ran successfully will be passed to the onSuccess handlers (if provided). + Stream(ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity]) error + + // StreamAny bulk executes the streamed items of type any using the [DB.BulkExec] method. + StreamAny(ctx context.Context, args <-chan any, onSuccess ...OnSuccess[any]) error } -// WithStatement uses the specified statement for bulk upserts instead of automatically creating one. -func WithStatement(stmt string, placeholders int) UpsertOption { - return func(u *upsert) { - u.stmt = stmt - u.placeholders = placeholders - } +// NewSelect initializes a new Queryable object of type SelectQuery for a given [DB], subject. +func NewSelect(db *DB, subject any, options ...QueryableOption) Queryable { + return newQuery(db, subject, append([]QueryableOption{withSetQueryType(SelectQuery)}, options...)...) } -// NewUpsert creates a new Upsert initalized with a database. -func NewUpsert(db *DB, options ...UpsertOption) Upsert { - u := &upsert{db: db} +// NewInsert initializes a new Queryable object of type InsertQuery for a given [DB], subject. +func NewInsert(db *DB, subject any, options ...QueryableOption) Queryable { + return newQuery(db, subject, append([]QueryableOption{withSetQueryType(InsertQuery)}, options...)...) +} - for _, option := range options { - option(u) - } +// NewUpsert initializes a new Queryable object of type UpsertQuery for a given [DB], subject. +func NewUpsert(db *DB, subject any, options ...QueryableOption) Queryable { + return newQuery(db, subject, append([]QueryableOption{withSetQueryType(UpsertQuery)}, options...)...) +} - return u +// NewUpdate initializes a new Queryable object of type UpdateQuery for a given [DB], subject. +func NewUpdate(db *DB, subject any, options ...QueryableOption) Queryable { + return newQuery(db, subject, append([]QueryableOption{withSetQueryType(UpdateQuery)}, options...)...) } -type upsert struct { - db *DB - onUpsert []OnSuccess[Entity] - stmt string - placeholders int +// NewDelete initializes a new Queryable object of type DeleteQuery for a given [DB], subject. +func NewDelete(db *DB, subject any, options ...QueryableOption) Queryable { + return newQuery(db, subject, append([]QueryableOption{withSetQueryType(DeleteQuery)}, options...)...) } -func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error { - first, forward, err := com.CopyFirst(ctx, entities) - if err != nil { - return errors.Wrap(err, "can't copy first entity") - } +// queryable represents a database query type with customizable behavior for dynamic and static SQL statements. +type queryable struct { + db *DB - sem := u.db.GetSemaphoreForTable(TableName(first)) - var stmt string - var placeholders int + // qb is the query builder used to construct SQL statements for various database + // statements if, and only if stmt is not set. + qb *QueryBuilder - if u.stmt != "" { - stmt = u.stmt - placeholders = u.placeholders - } else { - stmt, placeholders = u.db.BuildUpsertStmt(first) - } + // qtype defines the type of database query (e.g., SELECT, INSERT) to perform, influencing query construction behavior. + qtype QueryType + + // scoper is used to dynamically generate scoped database queries if, and only if stmt is not set. + scoper any + + // stmt is used to cache statically provided database statements. + stmt string - return u.db.NamedBulkExec( - ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem, - forward, SplitOnDupId[Entity], u.onUpsert..., - ) + // placeholders is used to determine the entities bulk/chunk size for statically provided statements. + placeholders int + + // ignoreOnError is only used to generate special insert statements that silently suppress duplicate key errors. + ignoreOnError bool } -// Delete deletes rows of a table. -type Delete interface { - // Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec. - // Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt. - Stream(ctx context.Context, from any, args <-chan any) error +// Assert that *queryable type satisfies the Queryable interface. +var _ Queryable = (*queryable)(nil) + +// Stream implements the [Queryable.Stream] method. +func (q *queryable) Stream(ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity]) error { + sem := q.db.GetSemaphoreForTable(TableName(q.qb.subject)) + stmt, placeholders := q.buildStmt() + batchSize := q.db.BatchSizeByPlaceholders(placeholders) + + switch q.qtype { + case SelectQuery: // TODO: support select statements? + case InsertQuery: + return q.db.NamedBulkExec(ctx, stmt, batchSize, sem, entities, com.NeverSplit[Entity], onSuccess...) + case UpsertQuery: + return q.db.NamedBulkExec(ctx, stmt, batchSize, sem, entities, SplitOnDupId[Entity], onSuccess...) + case UpdateQuery: + return q.db.NamedBulkExecTx(ctx, stmt, q.db.Options.MaxRowsPerTransaction, sem, entities) + case DeleteQuery: + return errors.Errorf("can't stream entities for 'DELETE' query") + } + + return errors.Errorf("unsupported query type: %v", q.qtype) } -// DeleteOption is a functional option for NewDelete. -type DeleteOption func(options *delete) +// StreamAny implements the [Queryable.StreamAny] method. +func (q *queryable) StreamAny(ctx context.Context, args <-chan any, onSuccess ...OnSuccess[any]) error { + stmt, _ := q.buildStmt() + sem := q.db.GetSemaphoreForTable(TableName(q.qb.subject)) -// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the -// operation was performed successfully are passed to the callbacks. -func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption { - return func(d *delete) { - d.onDelete = onDelete - } + return q.db.BulkExec(ctx, stmt, q.db.Options.MaxPlaceholdersPerStatement, sem, args, onSuccess...) } -// ByColumn uses the given column for the WHERE clause that the rows must -// satisfy in order to be deleted, instead of automatically using ID. -func ByColumn(column string) DeleteOption { - return func(d *delete) { - d.column = column +// buildStmt constructs the SQL statement based on the type of query (Select, Insert, Upsert, Update, Delete). +// It also determines the number of placeholders to be used in the statement. +func (q *queryable) buildStmt() (string, int) { + if q.stmt != "" { + return q.stmt, q.placeholders } -} -// NewDelete creates a new Delete initalized with a database. -func NewDelete(db *DB, options ...DeleteOption) Delete { - d := &delete{db: db} + var stmt string + var placeholders int - for _, option := range options { - option(d) + switch q.qtype { + case SelectQuery: // TODO: support select statements? + case InsertQuery: + if q.ignoreOnError { + stmt, placeholders = q.qb.InsertIgnore(q.db) + } else { + stmt, placeholders = q.qb.Insert(q.db) + } + case UpsertQuery: + if q.stmt != "" { + stmt, placeholders = q.stmt, q.placeholders + } else { + stmt, placeholders = q.qb.Upsert(q.db) + } + case UpdateQuery: + stmt = q.stmt + if stmt == "" { + if q.scoper != nil && q.scoper.(string) != "" { + stmt, _ = q.qb.UpdateScoped(q.db, q.scoper) + } else { + stmt, _ = q.qb.Update(q.db) + } + } + case DeleteQuery: + if q.stmt != "" { + stmt, placeholders = q.stmt, q.placeholders + } else if q.scoper != "" { + stmt = q.qb.DeleteBy(q.scoper.(string)) + } else { + stmt = q.qb.Delete() + } } - return d + return stmt, placeholders } -type delete struct { - db *DB - column string - onDelete []OnSuccess[any] -} +// newQuery initializes a new Queryable object for a given [DB], subject, and query type. +// It also applies optional query options to the just created queryable object. +// +// Note: If the query type is not explicitly set using WithSetQueryType, it will default to SELECT queries. +func newQuery(db *DB, subject any, options ...QueryableOption) Queryable { + q := &queryable{db: db, qb: &QueryBuilder{subject: subject}} + for _, option := range options { + option(q) + } -func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error { - var stmt string + return q +} - if d.column != "" { - stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column) - } else { - stmt = d.db.BuildDeleteStmt(from) +// QueryableOption describes the base functional specification for all the queryable types. +type QueryableOption func(*queryable) + +// withSetQueryType sets the type of database query to be executed/generated. +func withSetQueryType(qtype QueryType) QueryableOption { return func(q *queryable) { q.qtype = qtype } } + +// WithStatement configures a static SQL statement and its associated placeholders for a queryable entity. +// +// Note that using WithStatement always suppresses all other available queryable options and unlike +// some other options, this can be used to explicitly provide a custom query for all kinds of DB stmts. +// +// Returns a function that lazily modifies a given queryable type by setting its stmt and placeholders fields. +func WithStatement(stmt string, placeholders int) QueryableOption { + return func(q *queryable) { + q.stmt = stmt + q.placeholders = placeholders } +} + +// WithColumns statically configures the DB columns to be used for building the database statements. +// +// Setting the queryable columns while using WithStatement has no behavioural effects, thus these columns are never +// used. Additionally, for upsert statements, WithColumns not only defines the columns to be actually inserted but +// the columns to be updated when a duplicate key error occurs as well. However, to maintain the compatibility with +// legacy implementations, a query subject that implements the Upserter interface takes a higher precedence over +// those explicitly set columns for the "update on duplicate key error" part. +// +// Note that using this option for Delete statements has no effect as well, hence its usage is discouraged. +// +// Returns a function that lazily modifies a given queryable type by setting its columns. +func WithColumns(columns ...string) QueryableOption { + return func(q *queryable) { q.qb.SetColumns(columns...) } +} - sem := d.db.GetSemaphoreForTable(TableName(from)) +// WithoutColumns returns a QueryableOption callback that excludes the DB columns from the generated DB statements. +// +// Setting the excludable columns while using WithStatement has no behavioural effects, so these columns may or may +// not be excluded depending on the explicitly set statement. Also, note that using this option for Delete statements +// has no effect as well, hence its usage is prohibited. +func WithoutColumns(columns ...string) QueryableOption { + return func(q *queryable) { q.qb.SetExcludedColumns(columns...) } +} - return d.db.BulkExec( - ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete..., - ) +// WithIgnoreOnError returns a InsertOption callback that sets the ignoreOnError flag DB insert statements. +// +// When this flag is set, the dynamically generated insert statement will cause to suppress all duplicate key errors. +// +// Setting this flag while using WithStatement has no behavioural effects, so the final database statement +// may or may not silently suppress "duplicate key errors" depending on the explicitly set statement. +func WithIgnoreOnError() QueryableOption { return func(q *queryable) { q.ignoreOnError = true } } + +// WithByColumn returns a functional option for DeleteOption or UpdateOption, setting the scoper to the provided column. +func WithByColumn(column string) QueryableOption { + return func(q *queryable) { q.scoper = column } } From 295535fb0e88ef7557f94ac9bd328f764bf88602 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 19 Sep 2024 17:24:15 +0200 Subject: [PATCH 5/5] Test functional query builders --- database/optionally_test.go | 163 ++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 database/optionally_test.go diff --git a/database/optionally_test.go b/database/optionally_test.go new file mode 100644 index 00000000..d0567f5f --- /dev/null +++ b/database/optionally_test.go @@ -0,0 +1,163 @@ +package database + +import ( + "github.com/creasty/defaults" + "github.com/icinga/icinga-go-library/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "testing" + "time" +) + +// testEntity represents a mock structure for Entity. +type testEntity struct { + Id ID `db:"id"` + Name string `db:"name"` +} + +func (t *testEntity) TableName() string { return "test_subject" } +func (t *testEntity) PgsqlOnConflictConstraint() string { return "pgsql_constrainter" } + +func TestFunctionalQueries(t *testing.T) { + t.Parallel() + + t.Run("MySQL/MariaDB", func(t *testing.T) { + t.Parallel() + + c := &Config{} + require.NoError(t, defaults.Set(c), "applying config default should not fail") + + db, err := NewDbFromConfig(c, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Minute), RetryConnectorCallbacks{}) + require.NoError(t, err) + require.Equal(t, MySQL, db.DriverName()) + + runFunctionalTests(t, db) + }) + t.Run("PostgreSQL", func(t *testing.T) { + t.Parallel() + + c := &Config{Type: "pgsql"} + require.NoError(t, defaults.Set(c), "applying config default should not fail") + + db, err := NewDbFromConfig(c, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Minute), RetryConnectorCallbacks{}) + require.NoError(t, err) + require.Equal(t, PostgreSQL, db.DriverName()) + + runFunctionalTests(t, db) + }) +} + +func runFunctionalTests(t *testing.T, db *DB) { + t.Run("WithStatement", func(t *testing.T) { + t.Parallel() + + subject := &testEntity{} + + q := NewInsert(db, subject, WithStatement("INSERT INTO test_subject (id, name) VALUES (:id, :name)", 2)).(*queryable) + stmt, placeholders := q.buildStmt() + assert.Equal(t, 2, placeholders) + assert.Equal(t, "INSERT INTO test_subject (id, name) VALUES (:id, :name)", stmt) + + var upsert string + if db.DriverName() == PostgreSQL { + upsert = "INSERT INTO test_subject (id, name) VALUES (:id, :name) ON CONFLICT ON CONSTRAINT pgsql_constrainter DO UPDATE SET name = EXCLUDED.name" + } else { + upsert = "INSERT INTO test_subject (id, name) VALUES (:id, :name) ON DUPLICATE KEY UPDATE name = VALUES(name)" + } + q = NewUpsert(db, subject, WithStatement(upsert, 2)).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 2, placeholders) + assert.Equal(t, upsert, stmt) + + q = NewUpdate(db, subject, WithStatement("UPDATE test_subject SET name = :name WHERE id = :id", 0)).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 0, placeholders) + assert.Equal(t, "UPDATE test_subject SET name = :name WHERE id = :id", stmt) + + q = NewDelete(db, subject, WithStatement("DELETE FROM test_subject WHERE id = :id", 1)).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 1, placeholders) + assert.Equal(t, "DELETE FROM test_subject WHERE id = :id", stmt) + }) + + t.Run("WithColumns", func(t *testing.T) { + t.Parallel() + + subject := &testEntity{} + + q := NewInsert(db, subject, WithColumns("name")).(*queryable) + stmt, placeholders := q.buildStmt() + assert.Equal(t, 1, placeholders) + assert.Equal(t, "INSERT INTO \"test_subject\" (\"name\") VALUES (:name)", stmt) + + q = NewUpsert(db, subject, WithColumns("name")).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 1, placeholders) + if db.DriverName() == PostgreSQL { + assert.Equal(t, "INSERT INTO \"test_subject\" (\"name\") VALUES (:name) ON CONFLICT ON CONSTRAINT pgsql_constrainter DO UPDATE SET \"name\" = EXCLUDED.\"name\"", stmt) + } else { + assert.Equal(t, "INSERT INTO \"test_subject\" (\"name\") VALUES (:name) ON DUPLICATE KEY UPDATE \"name\" = VALUES(\"name\")", stmt) + } + + q = NewUpdate(db, subject, WithColumns("name")).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 0, placeholders) + assert.Equal(t, "UPDATE \"test_subject\" SET \"name\" = :name WHERE \"id\" = :id", stmt) + }) + + t.Run("WithoutColumns", func(t *testing.T) { + t.Parallel() + + subject := &testEntity{} + + q := NewInsert(db, subject, WithoutColumns("id")).(*queryable) + stmt, placeholders := q.buildStmt() + assert.Equal(t, 1, placeholders) + assert.Equal(t, "INSERT INTO \"test_subject\" (\"name\") VALUES (:name)", stmt) + + q = NewUpsert(db, subject, WithoutColumns("id")).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 1, placeholders) + if db.DriverName() == PostgreSQL { + assert.Equal(t, "INSERT INTO \"test_subject\" (\"name\") VALUES (:name) ON CONFLICT ON CONSTRAINT pgsql_constrainter DO UPDATE SET \"name\" = EXCLUDED.\"name\"", stmt) + } else { + assert.Equal(t, "INSERT INTO \"test_subject\" (\"name\") VALUES (:name) ON DUPLICATE KEY UPDATE \"name\" = VALUES(\"name\")", stmt) + } + + q = NewUpdate(db, subject, WithoutColumns("id")).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 0, placeholders) + assert.Equal(t, "UPDATE \"test_subject\" SET \"name\" = :name WHERE \"id\" = :id", stmt) + }) + + t.Run("WithByColumns", func(t *testing.T) { + t.Parallel() + + subject := &testEntity{} + + q := NewUpdate(db, subject, WithoutColumns("id"), WithByColumn("name")).(*queryable) + stmt, placeholders := q.buildStmt() + assert.Equal(t, 0, placeholders) + assert.Equal(t, "UPDATE \"test_subject\" SET \"name\" = :name WHERE \"name\" = :name", stmt) + + q = NewDelete(db, subject, WithByColumn("name")).(*queryable) + stmt, placeholders = q.buildStmt() + assert.Equal(t, 0, placeholders) + assert.Equal(t, "DELETE FROM \"test_subject\" WHERE \"name\" IN (?)", stmt) + }) + + t.Run("WithIgnoreOnError", func(t *testing.T) { + t.Parallel() + + if db.DriverName() != PostgreSQL { + t.Skipf("Skipping IgnoreOnError test case for %q driver", db.DriverName()) + } + + subject := &testEntity{} + q := NewInsert(db, subject, WithColumns("id", "name"), WithIgnoreOnError()).(*queryable) + stmt, placeholders := q.buildStmt() + assert.Equal(t, 2, placeholders) + assert.Equal(t, "INSERT INTO \"test_subject\" (\"id\", \"name\") VALUES (:id, :name) ON CONFLICT ON CONSTRAINT pgsql_constrainter DO NOTHING", stmt) + }) +}