From 57ebd9138644710b74c075d81f6a32f8d96b4048 Mon Sep 17 00:00:00 2001
From: Mohamed Awnallah
Date: Thu, 13 Feb 2025 22:33:25 +0000
Subject: [PATCH 1/3] sdks/go: require type registration in BigQueryIO

mustInferSchema now calls registerTypeIfNeeded before inferring the
BigQuery schema. The helper strips pointer indirection with
reflectx.SkipPtr, panics when runtime.TypeKey yields no key for the
type, and calls runtime.RegisterType when runtime.LookupType does not
find that key.
---
 sdks/go/pkg/beam/io/bigqueryio/bigquery.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
index 1a5e8650052..90390fe5628 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
@@ -27,6 +27,7 @@ import (
 
 	"cloud.google.com/go/bigquery"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
@@ -190,6 +191,9 @@ func mustInferSchema(t reflect.Type) bigquery.Schema {
 	if t.Kind() != reflect.Struct {
 		panic(fmt.Sprintf("schema type must be struct: %v", t))
 	}
+
+	registerTypeIfNeeded(t)
+
 	schema, err := bigquery.InferSchema(reflect.Zero(t).Interface())
 	if err != nil {
 		panic(errors.Wrapf(err, "invalid schema type: %v", t))
@@ -197,6 +201,17 @@ func mustInferSchema(t reflect.Type) bigquery.Schema {
 	return schema
 }
 
+func registerTypeIfNeeded(t reflect.Type) {
+	t = reflectx.SkipPtr(t)
+	key, ok := runtime.TypeKey(t)
+	if !ok {
+		panic(fmt.Sprintf("type %v must be a named type (not anonymous) for registration", t))
+	}
+	if _, registered := runtime.LookupType(key); !registered {
+		runtime.RegisterType(t)
+	}
+}
+
 func mustParseTable(table string) QualifiedTableName {
 	qn, err := NewQualifiedTableName(table)
 	if err != nil {

From 4afab5e84a97804dba58e77d45da8f1b64ca240a Mon Sep 17 00:00:00 2001
From: Mohamed Awnallah
Date: Thu, 13 Feb 2025 22:33:57 +0000
Subject: [PATCH 2/3] sdks/go: test type register is required BigQueryIO

Adds Test_mustInferSchema, a table-driven test that calls
mustInferSchema with a named struct type, with the same type a second
time, and with an anonymous struct, which is expected to panic.
---
 .../pkg/beam/io/bigqueryio/bigquery_test.go | 71 +++++++++++++++++++
1 file changed, 71 insertions(+)

diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go
index 3ee5df5f3bd..eea53cf10b8 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go
@@ -16,8 +16,12 @@
 package bigqueryio
 
 import (
+	"errors"
 	"reflect"
 	"testing"
+
+	"cloud.google.com/go/bigquery"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 )
 
 func TestNewQualifiedTableName(t *testing.T) {
@@ -76,3 +80,70 @@ func Test_constructSelectStatementPanic(t *testing.T) {
 		constructSelectStatement(typ, tagKey, table)
 	})
 }
+
+func Test_mustInferSchema(t *testing.T) {
+	type TestSchema struct {
+		Name   bigquery.NullString   `bigquery:"name"`
+		Active bigquery.NullBool     `bigquery:"active"`
+		Score  bigquery.NullFloat64  `bigquery:"score"`
+		Time   bigquery.NullDateTime `bigquery:"time"`
+	}
+
+	tests := []struct {
+		name    string
+		input   interface{}
+		wantErr bool
+		verify  func(reflect.Type) error
+	}{
+		{
+			name:    "NewType_ShouldRegisterSuccessfully",
+			input:   TestSchema{},
+			wantErr: false,
+			verify: func(t reflect.Type) error {
+				// Verify successful type registration in runtime registry.
+				if key, ok := runtime.TypeKey(t); ok {
+					if _, registered := runtime.LookupType(key); !registered {
+						return errors.New("Type was not properly registered")
+					}
+				}
+				return nil
+			},
+		},
+		{
+			name:    "AlreadyRegisteredType_ShouldNotPanic",
+			input:   TestSchema{},
+			wantErr: false,
+			verify: func(t reflect.Type) error {
+				// Verify re-registration of existing type is handled correctly.
+				mustInferSchema(t)
+				return nil
+			},
+		},
+		{
+			name:    "AnonymousStruct_ShouldPanic",
+			input:   struct{}{},
+			wantErr: true,
+			verify:  func(t reflect.Type) error { return nil },
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			defer func() {
+				r := recover()
+				if (r != nil) != tt.wantErr {
+					t.Errorf("mustInferSchema() panic = %v, wantErr %v", r, tt.wantErr)
+				}
+			}()
+
+			typ := reflect.TypeOf(tt.input)
+			mustInferSchema(typ)
+			if tt.wantErr {
+				t.Fatal("Expected panic did not occur")
+			}
+			if err := tt.verify(typ); err != nil {
+				t.Fatal(err)
+			}
+		})
+	}
+}

From e511c10379dac51966aea4b441011552c4b50a90 Mon Sep 17 00:00:00 2001
From: Mohamed Awnallah
Date: Thu, 20 Feb 2025 22:01:16 +0200
Subject: [PATCH 3/3] sdks/go: panic with clear err msg if already init

Panic with an explicit message when registerTypeIfNeeded would have to
register a type after beam.Init() has run. The beam.Initialized() check
sits inside the not-yet-registered branch, so a type that is already in
the registry is accepted both before and after initialization.
---
Review note: the initialization check was moved inside the
"!registered" branch; the post-image blob id on the index line below is
carried over from the earlier revision of this patch and no longer
matches the revised hunk.

 sdks/go/pkg/beam/io/bigqueryio/bigquery.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
index 90390fe5628..70a66bd4e0d 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
@@ -207,6 +207,14 @@ func registerTypeIfNeeded(t reflect.Type) {
 	if !ok {
 		panic(fmt.Sprintf("type %v must be a named type (not anonymous) for registration", t))
 	}
+
+	// Register the type if not already registered.
 	if _, registered := runtime.LookupType(key); !registered {
+		// Fail with an actionable message when registration is attempted
+		// after beam.Init(); an already-registered type never gets here.
+		if beam.Initialized() {
+			panic(fmt.Sprintf("Type %v must be registered before beam.Init() is called. "+
+				"Use beam.RegisterType(%v) in your main setup.", t, t))
+		}
 		runtime.RegisterType(t)
 	}