Skip to content

Commit 45025cd

Browse files
feat(spanner): Add support for MYSQLDUMP in import command: (GoogleCloudPlatform#1089)
1. This enables importing data and schema from mysql dump. 2. It updates the provided database with the schema in the Mysql Dump. 3. Unit test covering the entire flow of Import command is added.
1 parent 5936a21 commit 45025cd

23 files changed

+1203
-119
lines changed

accessors/spanner/mocks.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package spanneraccessor
1515

1616
import (
1717
"context"
18+
spannerclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/client"
1819

1920
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
2021
)
@@ -38,6 +39,10 @@ type SpannerAccessorMock struct {
3839
DropDatabaseMock func(ctx context.Context, dbURI string) error
3940
ValidateDMLMock func(ctx context.Context, query string) (bool, error)
4041
TableExistsMock func(ctx context.Context, tableName string) (bool, error)
42+
GetDatabaseNameMock func() string
43+
RefreshMock func(ctx context.Context, dbURI string)
44+
SetSpannerClientMock func(spannerClient spannerclient.SpannerClient)
45+
GetSpannerClientMock func() spannerclient.SpannerClient
4146
}
4247

4348
func (sam *SpannerAccessorMock) GetDatabaseDialect(ctx context.Context, dbURI string) (string, error) {
@@ -99,3 +104,18 @@ func (sam *SpannerAccessorMock) ValidateDML(ctx context.Context, query string) (
99104
func (sam *SpannerAccessorMock) TableExists(ctx context.Context, tableName string) (bool, error) {
100105
return sam.TableExistsMock(ctx, tableName)
101106
}
107+
func (sam *SpannerAccessorMock) GetDatabaseName() string {
108+
return sam.GetDatabaseNameMock()
109+
}
110+
111+
func (sam *SpannerAccessorMock) Refresh(ctx context.Context, dbURI string) {
112+
sam.RefreshMock(ctx, dbURI)
113+
}
114+
115+
func (sam *SpannerAccessorMock) SetSpannerClient(spannerClient spannerclient.SpannerClient) {
116+
sam.SetSpannerClientMock(spannerClient)
117+
}
118+
119+
func (sam *SpannerAccessorMock) GetSpannerClient() spannerclient.SpannerClient {
120+
return sam.GetSpannerClientMock()
121+
}

accessors/spanner/spanner_accessor.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ type SpannerAccessor interface {
8080
ValidateDML(ctx context.Context, query string) (bool, error)
8181

8282
TableExists(ctx context.Context, tableName string) (bool, error)
83+
84+
GetDatabaseName() string
85+
86+
Refresh(ctx context.Context, dbURI string)
87+
88+
SetSpannerClient(spannerClient spannerclient.SpannerClient)
89+
90+
GetSpannerClient() spannerclient.SpannerClient
8391
}
8492

8593
// This implements the SpannerAccessor interface. This is the primary implementation that should be used in all places other than tests.
@@ -508,6 +516,18 @@ func (sp *SpannerAccessorImpl) ValidateDML(ctx context.Context, query string) (b
508516
}
509517
}
510518

519+
func (sp *SpannerAccessorImpl) GetDatabaseName() string {
520+
return sp.SpannerClient.DatabaseName()
521+
}
522+
511523
func (sp *SpannerAccessorImpl) Refresh(ctx context.Context, dbURI string) {
512524
sp.SpannerClient.Refresh(ctx, dbURI)
513525
}
526+
527+
func (sp *SpannerAccessorImpl) SetSpannerClient(spannerClient spannerclient.SpannerClient) {
528+
sp.SpannerClient = spannerClient
529+
}
530+
531+
func (sp *SpannerAccessorImpl) GetSpannerClient() spannerclient.SpannerClient {
532+
return sp.SpannerClient
533+
}
Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ import (
1818
"context"
1919
"flag"
2020
"fmt"
21+
"github.com/GoogleCloudPlatform/spanner-migration-tool/import_file"
22+
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/spanner"
2123
"time"
2224

2325
spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner"
2426
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
25-
"github.com/GoogleCloudPlatform/spanner-migration-tool/import_data"
2627
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
27-
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/spanner"
2828
"github.com/google/subcommands"
2929
"go.uber.org/zap"
3030
)
@@ -46,7 +46,7 @@ func (cmd *ImportDataCmd) SetFlags(set *flag.FlagSet) {
4646
set.StringVar(&cmd.databaseName, "database-name", "", "Spanner database name")
4747
set.StringVar(&cmd.tableName, "table-name", "", "Spanner table name")
4848
set.StringVar(&cmd.sourceUri, "source-uri", "", "URI of the file to import")
49-
set.StringVar(&cmd.sourceFormat, "source-format", "", "Format of the file to import. Valid values {csv}")
49+
set.StringVar(&cmd.sourceFormat, "source-format", "", "Format of the file to import. Valid values {csv, mysqldump}")
5050
set.StringVar(&cmd.schemaUri, "schema-uri", "", "URI of the file with schema for the csv to import. Only used for csv format.")
5151
set.StringVar(&cmd.csvLineDelimiter, "csv-line-delimiter", "", "Token to be used as line delimiter for csv format. Defaults to '\\n'. Only used for csv format.")
5252
set.StringVar(&cmd.csvFieldDelimiter, "csv-field-delimiter", "", "Token to be used as field delimiter for csv format. Defaults to ','. Only used for csv format.")
@@ -57,60 +57,97 @@ func (cmd *ImportDataCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...
5757
logger.Log.Debug(fmt.Sprintf("instanceId %s, dbName %s, schemaUri %s\n", cmd.instanceId, cmd.databaseName, cmd.schemaUri))
5858

5959
dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", cmd.project, cmd.instanceId, cmd.databaseName)
60-
infoSchema, err := spanner.NewInfoSchemaImplWithSpannerClient(ctx, dbURI, constants.DIALECT_GOOGLESQL)
61-
if err != nil {
62-
logger.Log.Error(fmt.Sprintf("Unable to read Spanner schema %v", err))
63-
return subcommands.ExitFailure
64-
}
6560

6661
switch cmd.sourceFormat {
6762
case constants.CSV:
68-
err := cmd.handleCsv(ctx, infoSchema)
63+
//TODO: handle POSTGRESQL
64+
dialect := constants.DIALECT_GOOGLESQL
65+
err := cmd.handleCsv(ctx, dbURI, dialect)
6966
if err != nil {
7067
logger.Log.Error(fmt.Sprintf("Unable to handle Csv %v", err))
7168
return subcommands.ExitFailure
7269
}
7370
return subcommands.ExitSuccess
71+
case constants.MYSQLDUMP:
72+
err := cmd.handleDatabaseDumpFile(ctx, dbURI, constants.MYSQLDUMP, constants.DIALECT_GOOGLESQL)
73+
if err != nil {
74+
logger.Log.Error(fmt.Sprintf("Unable to handle MYSQL Dump %v. Please reachout to the support team.", err))
75+
return subcommands.ExitFailure
76+
}
77+
return subcommands.ExitSuccess
7478
default:
7579
logger.Log.Warn(fmt.Sprintf("format %s not supported yet", cmd.sourceFormat))
7680
}
7781
return subcommands.ExitFailure
7882
}
7983

80-
func (cmd *ImportDataCmd) handleCsv(ctx context.Context, infoSchema *spanner.InfoSchemaImpl) error {
81-
//TODO: handle POSTGRESQL
82-
dialect := constants.DIALECT_GOOGLESQL
83-
84-
dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", cmd.project, cmd.instanceId, cmd.databaseName)
84+
func (cmd *ImportDataCmd) handleCsv(ctx context.Context, dbURI string, dialect string) error {
85+
// TODO: move this to dependent classes
8586
sp, err := spanneraccessor.NewSpannerAccessorClientImplWithSpannerClient(ctx, dbURI)
8687
if err != nil {
87-
logger.Log.Error(fmt.Sprintf("Unable to instantiate spanner client %v", err))
88+
logger.Log.Error(fmt.Sprintf("Unable to read Spanner schema %v", err))
89+
return fmt.Errorf("unable to read Spanner schema %v", err)
90+
}
91+
infoSchema, err := spanner.NewInfoSchemaImplWithSpannerClient(ctx, dbURI, dialect)
92+
if err != nil {
93+
logger.Log.Error(fmt.Sprintf("Unable to read Spanner schema %v", err))
8894
return err
8995
}
9096

9197
startTime := time.Now()
92-
csvSchema := import_data.CsvSchemaImpl{ProjectId: cmd.project, InstanceId: cmd.instanceId,
98+
csvSchema := import_file.CsvSchemaImpl{ProjectId: cmd.project, InstanceId: cmd.instanceId,
9399
TableName: cmd.tableName, DbName: cmd.databaseName, SchemaUri: cmd.schemaUri, CsvFieldDelimiter: cmd.csvFieldDelimiter}
94100
err = csvSchema.CreateSchema(ctx, dialect, sp)
95101

96102
endTime1 := time.Now()
97103
elapsedTime := endTime1.Sub(startTime)
98-
fmt.Println("Schema creation took ", elapsedTime.Seconds(), " secs")
104+
logger.Log.Info(fmt.Sprintf("Schema creation took %f secs", elapsedTime.Seconds()))
99105
if err != nil {
100106
return err
101107
}
102108

103-
csvData := import_data.CsvDataImpl{ProjectId: cmd.project, InstanceId: cmd.instanceId,
109+
csvData := import_file.CsvDataImpl{ProjectId: cmd.project, InstanceId: cmd.instanceId,
104110
TableName: cmd.tableName, DbName: cmd.databaseName, SourceUri: cmd.sourceUri, CsvFieldDelimiter: cmd.csvFieldDelimiter}
105111
err = csvData.ImportData(ctx, infoSchema, dialect)
106112

107113
endTime2 := time.Now()
108114
elapsedTime = endTime2.Sub(endTime1)
109-
fmt.Println("Data import took ", elapsedTime.Seconds(), " secs")
115+
logger.Log.Info(fmt.Sprintf("Data import took %f secs", elapsedTime.Seconds()))
110116
return err
111117

112118
}
113119

120+
func (cmd *ImportDataCmd) handleDatabaseDumpFile(ctx context.Context, dbUri, sourceFormat string, dialect string) error {
121+
122+
importDump, err := import_file.NewImportFromDump(ctx, cmd.project, cmd.instanceId, cmd.databaseName, cmd.sourceUri, sourceFormat, dbUri)
123+
if err != nil {
124+
return fmt.Errorf("can't open dump file or create spanner client: %v", err)
125+
}
126+
127+
defer importDump.Close()
128+
129+
schemaStartTime := time.Now()
130+
conv, err := importDump.CreateSchema(ctx, dialect)
131+
if err != nil {
132+
return fmt.Errorf("can't create schema: %v", err)
133+
}
134+
135+
schemaEndTime := time.Now()
136+
elapsedTime := schemaEndTime.Sub(schemaStartTime)
137+
logger.Log.Info(fmt.Sprintf("Schema creation took %f secs", elapsedTime.Seconds()))
138+
139+
err = importDump.ImportData(ctx, conv)
140+
141+
dataEndTime := time.Now()
142+
elapsedTime = dataEndTime.Sub(schemaEndTime)
143+
logger.Log.Info(fmt.Sprintf("Data import took %f secs", elapsedTime.Seconds()))
144+
if err != nil {
145+
return fmt.Errorf("can't import data: %v", err)
146+
}
147+
return nil
148+
149+
}
150+
114151
func init() {
115152
logger.Log, _ = zap.NewDevelopment()
116153
}

cmd/import_data_test.go

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)