Skip to content

Commit

Permalink
Merge pull request #122 from planetscale/add-log
Browse files Browse the repository at this point in the history
Convert MySQL datetime string to ISO8601
  • Loading branch information
notfelineit authored Feb 18, 2025
2 parents af3d135 + 3612a67 commit 0c0cb75
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 6 deletions.
9 changes: 8 additions & 1 deletion cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,16 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
tc.Position = ""
}

isFullSync := syncMode == "full"
vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position, tc.LastKnownPk)
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting VStream with %+v", preamble, vtgateReq))

if isFullSync {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once COPY COMPLETED event is seen.", preamble))
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once stop position [%+v] is found.", preamble, stopPosition))
}

c, err := vtgateClient.VStream(ctx, vtgateReq)

if err != nil {
Expand All @@ -287,7 +295,6 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
keyspaceOrDatabase = ps.Database
}

isFullSync := syncMode == "full"
copyCompletedSeen := false
// Can finish sync once we've synced to the stop position, or finished the VStream COPY phase
canFinishSync := false
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2598,7 +2598,7 @@ func TestRead_FullSync_MaxRetries(t *testing.T) {
assert.Equal(t, 4, vsc.vstreamFnInvokedCount)

logLines := tal.logMessages[LOGLEVEL_INFO]
assert.Equal(t, strings.TrimSpace(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30)), strings.TrimSpace(logLines[len(logLines)-1]))
assert.Equal(t, strings.ReplaceAll(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30), " ", ""), strings.ReplaceAll(logLines[len(logLines)-1], " ", ""))
records := tal.records["connect-test.customers"]
assert.Equal(t, 30, len(records))
}
35 changes: 31 additions & 4 deletions cmd/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"github.com/planetscale/psdb/core/codec"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/query"
)

const (
Expand Down Expand Up @@ -142,7 +144,7 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} {
record := make(map[string]interface{})
for idx, val := range row {
if idx < len(columns) {
record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType())
record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType(), qr.Fields[idx].GetType())
}
}
data = append(data, record)
Expand All @@ -153,11 +155,14 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} {

// After the initial COPY phase, enum and set values may appear as an index instead of a value.
// For example, a value might look like a "1" instead of "apple" in an enum('apple','banana','orange') column)
func parseValue(val sqltypes.Value, columnType string) sqltypes.Value {
if strings.HasPrefix(columnType, "enum") {
func parseValue(val sqltypes.Value, columnType string, queryColumnType query.Type) sqltypes.Value {
switch queryColumnType {
case query.Type_DATETIME, query.Type_DATE, query.Type_TIME:
return formatISO8601(queryColumnType, val)
case query.Type_ENUM:
values := parseEnumOrSetValues(columnType)
return mapEnumValue(val, values)
} else if strings.HasPrefix(columnType, "set") {
case query.Type_SET:
values := parseEnumOrSetValues(columnType)
return mapSetValue(val, values)
}
Expand All @@ -181,6 +186,28 @@ func parseEnumOrSetValues(columnType string) []string {
return values
}

func formatISO8601(mysqlType query.Type, value sqltypes.Value) sqltypes.Value {
parsedDatetime := value.ToString()

var formatString string
var layout string
if mysqlType == query.Type_DATE {
formatString = "2006-01-02"
layout = time.DateOnly
} else {
formatString = "2006-01-02 15:04:05"
layout = time.RFC3339
}
mysqlTime, err := time.Parse(formatString, parsedDatetime)
if err != nil {
// fallback to default value if datetime is not parseable
return value
}
iso8601Datetime := mysqlTime.Format(layout)
formattedValue, _ := sqltypes.NewValue(value.Type(), []byte(iso8601Datetime))
return formattedValue
}

func mapSetValue(value sqltypes.Value, values []string) sqltypes.Value {
parsedValue := value.ToString()
parsedInt, err := strconv.ParseInt(parsedValue, 10, 64)
Expand Down
26 changes: 26 additions & 0 deletions cmd/internal/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,29 @@ func TestCanMapEnumAndSetValues(t *testing.T) {
assert.Equal(t, "active", secondRow["status"].(sqltypes.Value).ToString())
assert.Equal(t, "San Francisco,Oakland", secondRow["locations"].(sqltypes.Value).ToString())
}

func TestCanFormatISO8601Values(t *testing.T) {
datetimeValue, err := sqltypes.NewValue(query.Type_DATETIME, []byte("2025-02-14 08:08:08"))
assert.NoError(t, err)
dateValue, err := sqltypes.NewValue(query.Type_DATE, []byte("2025-02-14"))
assert.NoError(t, err)
timestampValue, err := sqltypes.NewValue(query.Type_TIMESTAMP, []byte("2025-02-14 08:08:08"))
assert.NoError(t, err)
input := sqltypes.Result{
Fields: []*query.Field{
{Name: "datetime_created_at", Type: sqltypes.Datetime, ColumnType: "datetime"},
{Name: "date_created_at", Type: sqltypes.Date, ColumnType: "date"},
{Name: "timestamp_created_at", Type: sqltypes.Time, ColumnType: "timestamp"},
},
Rows: [][]sqltypes.Value{
{datetimeValue, dateValue, timestampValue},
},
}

output := QueryResultToRecords(&input)
assert.Equal(t, 1, len(output))
row := output[0]
assert.Equal(t, "2025-02-14T08:08:08Z", row["datetime_created_at"].(sqltypes.Value).ToString())
assert.Equal(t, "2025-02-14", row["date_created_at"].(sqltypes.Value).ToString())
assert.Equal(t, "2025-02-14T08:08:08Z", row["timestamp_created_at"].(sqltypes.Value).ToString())
}

0 comments on commit 0c0cb75

Please sign in to comment.