From 8f2341d886f0a03286cd3bace1cc52606fae78c3 Mon Sep 17 00:00:00 2001 From: Dmytro Vovk Date: Tue, 11 Dec 2018 19:16:17 +0200 Subject: [PATCH 1/3] Add tablemover --- README.md | 8 +++++++- cmd/tablemover/tablemover.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 cmd/tablemover/tablemover.go diff --git a/README.md b/README.md index 4da6a1b..99ff90e 100644 --- a/README.md +++ b/README.md @@ -84,4 +84,10 @@ Examples: ``` tablerestorer -database 'mysql_user:mysql_password@tcp(127.0.0.1)/db_name' -dry-run -filter 'table(name LIKE "1%" OR id > 1000)' -``` \ No newline at end of file +``` + +## tablemover + +Copies tables content from one database to another. + +// TODO diff --git a/cmd/tablemover/tablemover.go b/cmd/tablemover/tablemover.go new file mode 100644 index 0000000..cfb1e92 --- /dev/null +++ b/cmd/tablemover/tablemover.go @@ -0,0 +1,34 @@ +package main + +import ( + "flag" + "log" +) + +type hostConfig struct { + Hostname string + Port int + Database string + Login string + Username string + Password string + Tables []string +} + +type moverConfig struct { + Src hostConfig + Dst hostConfig +} + +func main() { + log.SetFlags(log.LstdFlags | log.Lshortfile) + cfg := &moverConfig{} + flag.StringVar(&cfg.Src.Hostname, "src-host", "localhost", "Source host name") + flag.StringVar(&cfg.Dst.Hostname, "dst-host", "localhost", "Destination host name") + flag.IntVar(&cfg.Src.Port, "src-port", 3306, "Source port number") + flag.IntVar(&cfg.Dst.Port, "dst-port", 3306, "Destination port number") + flag.StringVar(&cfg.Src.Database, "src-db", "", "Source database name") + flag.StringVar(&cfg.Dst.Database, "dst-db", "", "Destination database name") + flag.Parse() + // TODO +} From 7791c64eb2476239821718d82890aa741cc12bfc Mon Sep 17 00:00:00 2001 From: Dmytro Vovk Date: Wed, 12 Dec 2018 18:42:07 +0200 Subject: [PATCH 2/3] Working basic table mover --- cmd/tablemover/tablemover.go | 84 +++++++++++++- table_mover/mover.go | 211 +++++++++++++++++++++++++++++++++++ table_mover/mover_test.go | 34 ++++++ table_mover/table.go | 60 ++++++++++ 4 files changed, 385 insertions(+), 4 deletions(-) create mode 100644 table_mover/mover.go create mode 100644 table_mover/mover_test.go create mode 100644 table_mover/table.go diff --git a/cmd/tablemover/tablemover.go b/cmd/tablemover/tablemover.go index cfb1e92..b3e56a4 100644 --- a/cmd/tablemover/tablemover.go +++ b/cmd/tablemover/tablemover.go @@ -2,7 +2,15 @@ package main import ( "flag" + "fmt" "log" + "os" + "strings" + "sync" + "time" + + "github.com/BrightLocal/MySQLBackup/mylogin_reader" + "github.com/BrightLocal/MySQLBackup/table_mover" ) type hostConfig struct { @@ -12,12 +20,13 @@ type hostConfig struct { Login string Username string Password string - Tables []string + DSN string } type moverConfig struct { - Src hostConfig - Dst hostConfig + Src hostConfig + Dst hostConfig + Tables []string } func main() { @@ -29,6 +38,73 @@ func main() { flag.IntVar(&cfg.Dst.Port, "dst-port", 3306, "Destination port number") flag.StringVar(&cfg.Src.Database, "src-db", "", "Source database name") flag.StringVar(&cfg.Dst.Database, "dst-db", "", "Destination database name") + flag.StringVar(&cfg.Src.Login, "src-login", "", "Source db login path") + flag.StringVar(&cfg.Dst.Login, "dst-login", "", "Destination db login path") + flag.StringVar(&cfg.Src.Username, "src-user", "root", "Source db user name (incompatible with -src-login)") + flag.StringVar(&cfg.Dst.Username, "dst-user", "root", "Destination db user name (incompatible with -dst-login)") + flag.StringVar(&cfg.Src.Password, "src-password", "", "Source db password") + flag.StringVar(&cfg.Dst.Password, "dst-password", "", "Destination db password") + var tables string + flag.StringVar(&tables, "tables", "", "Comma separated list of tables to copy") flag.Parse() - // TODO + if tables == "" { + print("No tables specified\n") + flag.Usage() + os.Exit(1) + } + cfg.Tables = strings.Split(tables, ",") + if cfg.Src.Login == "" && cfg.Src.Username == "" { + print("Use either login or username/password for source database\n") + flag.Usage() + os.Exit(1) + } + if cfg.Dst.Login == "" && cfg.Dst.Username == "" { + print("Use either login or username/password for destination database\n") + flag.Usage() + os.Exit(1) + } + if cfg.Src.Login != "" { + reader := mylogin_reader.Read() + var err error + cfg.Src.DSN, err = reader.GetDSN(cfg.Src.Login) + if err != nil { + fmt.Printf("Error reading login: %s", err) + os.Exit(1) + } + } else { + if cfg.Src.Password == "" { + cfg.Src.DSN = fmt.Sprintf("%s@tcp(%s:%d)/%s", cfg.Src.Username, cfg.Src.Hostname, cfg.Src.Port, cfg.Src.Database) + } else { + cfg.Src.DSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", cfg.Src.Username, cfg.Src.Password, cfg.Src.Hostname, cfg.Src.Port, cfg.Src.Database) + } + } + if cfg.Dst.Login != "" { + reader := mylogin_reader.Read() + var err error + cfg.Dst.DSN, err = reader.GetDSN(cfg.Dst.Login) + if err != nil { + fmt.Printf("Error reading login: %s", err) + os.Exit(1) + } + } else { + if cfg.Dst.Password == "" { + cfg.Dst.DSN = fmt.Sprintf("%s@tcp(%s:%d)/%s", cfg.Dst.Username, cfg.Dst.Hostname, cfg.Dst.Port, cfg.Dst.Database) + } else { + cfg.Dst.DSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", cfg.Dst.Username, cfg.Dst.Password, cfg.Dst.Hostname, cfg.Dst.Port, cfg.Dst.Database) + } + } + wg := sync.WaitGroup{} + for _, table := range cfg.Tables { + wg.Add(1) + go func(t string) { + start := time.Now() + if err := table_mover.New(cfg.Src.DSN, cfg.Dst.DSN).Move(t); err != nil { + log.Printf("Error moving table %q: %s", t, err) + } else { + log.Printf("Table %q moved successfully in %s", t, time.Now().Sub(start)) + } + wg.Done() + }(table) + } + wg.Wait() } diff --git a/table_mover/mover.go b/table_mover/mover.go new file mode 100644 index 0000000..b939730 --- /dev/null +++ b/table_mover/mover.go @@ -0,0 +1,211 @@ +package table_mover + +import ( + "fmt" + "log" + "os" + "sync" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" +) + +type Mover struct { + srcDSN string + dstDSN string + src *sqlx.DB + dst *sqlx.DB + log *log.Logger + stmtExists *sqlx.Stmt + stmtSelect *sqlx.Stmt + stmtInsert *sqlx.Stmt +} + +func New(src, dst string) *Mover { + return &Mover{ + srcDSN: src, + dstDSN: dst, + } +} + +const pageSize = 100 + +/* +[*] Check if src table exists +[*] Check if dst table exists +[ ] Create dst table if does not exist +[*] Compare src and dst schema +[*] Detect PK column +[*] Select all PKs from src table +[*] Check is PK present in dst table +[*] Copy missing rows from src to dst +*/ +func (m *Mover) Move(table string) error { + m.log = log.New(os.Stdout, fmt.Sprintf("[%s] ", table), log.Ltime|log.Lmicroseconds|log.Lshortfile) + m.connect() + var err error + srcTable, err := m.getTable(table, m.src) + if err != nil { + return errors.Wrapf(err, "error reading source table %q", table) + } + m.log.Printf("Detected primary key %q", srcTable.Primary) + dstTable, err := m.getTable(table, m.dst) + if err != nil { + return errors.Wrapf(err, "error reading destination table %q", table) + } + if ok, err := srcTable.Identical(dstTable); !ok { + return errors.Wrapf(err, "tables %s are not identical", table) + } + // Ready to copy + if m.stmtExists, err = m.dst.Preparex( + fmt.Sprintf( //language=MySQL + "SELECT COUNT(*) FROM `%s` WHERE `%s` = ?", + dstTable.Name, srcTable.PK(), + ), + ); err != nil { + return err + } + if m.stmtSelect, err = m.src.Preparex( + fmt.Sprintf( //language=MySQL + "SELECT * FROM `%s` WHERE `%s` = ? LIMIT 1", + srcTable.Name, srcTable.PK(), + ), + ); err != nil { + return err + } + if m.stmtInsert, err = m.dst.Preparex(dstTable.insert()); err != nil { + return err + } + pks := make(chan string) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + if err := m.readPKs(srcTable, pks); err != nil { + m.log.Printf("Error reading PKs: %s", err) + } + close(pks) + wg.Done() + }() + n := 0 + t := 0 + defer m.log.Printf("Moved %d rows", n) + tick := time.Now().Add(time.Minute) + for pk := range pks { + if !m.pkExists(pk) { + if err := m.migratePK(pk); err != nil { + return errors.Wrapf(err, "error migrating pk %s", pk) + } + n++ + } + t++ + if time.Now().After(tick) { + m.log.Printf("Moved %d rows, skipped %d", n, t-n) + tick = time.Now().Add(time.Minute) + } + } + wg.Wait() + return nil +} + +func (m *Mover) connect() { + var err error + m.src, err = sqlx.Connect("mysql", m.srcDSN) + if err != nil { + m.log.Fatalf("Error connecting to source database: %s", err) + } + if err = m.src.Ping(); err != nil { + m.log.Fatalf("Error pinging source database: %s", err) + } + m.dst, err = sqlx.Connect("mysql", m.dstDSN) + if err != nil { + m.log.Fatalf("Error connecting to destination database: %s", err) + } + if err = m.dst.Ping(); err != nil { + m.log.Fatalf("Error pinging destination database: %s", err) + } +} + +func (Mover) getTable(tableName string, db *sqlx.DB) (*table, error) { + rows, err := db.Query(fmt.Sprintf("EXPLAIN `%s`", tableName)) + if err != nil { + return nil, err + } + defer rows.Close() + var t table + t.Name = tableName + for rows.Next() { + var ( + field string + kind string + null string + key string + def *string + extra string + ) + if err := rows.Scan(&field, &kind, &null, &key, &def, &extra); err != nil { + return nil, err + } + t.Columns = append(t.Columns, column{ + Field: field, + Type: kind, + Null: null == "YES", + Key: key, + Default: def, + Extra: extra, + }) + if key == "PRI" { + t.Primary = field + } + } + return &t, nil +} + +func (m *Mover) readPKs(t *table, emit chan string) error { + var max int + if err := m.src.QueryRow( + fmt.Sprintf( //language=MySQL + "SELECT MAX(`%s`) FROM `%s`", + t.PK(), t.Name, + ), + ).Scan(&max); err != nil { + return err + } + pagedQuery := fmt.Sprintf( //language=MySQL + "SELECT `%s` FROM `%s` LIMIT ? OFFSET ?", + t.PK(), t.Name, + ) + for offset := 0; offset < max; offset += pageSize { + rows, err := m.src.Query(pagedQuery, pageSize, offset) + if err != nil { + return err + } + for rows.Next() { + var col string + if err := rows.Scan(&col); err != nil { + return err + } + emit <- col + } + rows.Close() + } + return nil +} + +func (m *Mover) pkExists(pk string) bool { + var count int + if err := m.stmtExists.QueryRowx(pk).Scan(&count); err != nil { + m.log.Fatalf("Error scanning: %s", err) + } + return count > 0 +} + +func (m *Mover) migratePK(pk string) error { + cols, err := m.stmtSelect.QueryRowx(pk).SliceScan() + if err != nil { + return err + } + _, err = m.stmtInsert.Exec(cols...) + return err +} diff --git a/table_mover/mover_test.go b/table_mover/mover_test.go new file mode 100644 index 0000000..dc16684 --- /dev/null +++ b/table_mover/mover_test.go @@ -0,0 +1,34 @@ +package table_mover + +import ( + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "github.com/kr/pretty" +) + +func TestTable(t *testing.T) { + m := New("", "") + db, err := sqlx.Connect("mysql", "bl:TmkhcS4Ft7MkC2Hq@tcp(host01)/brightlocal") + if err != nil { + t.Error(err) + } + table1, err := m.getTable("lsrc_rankings_shard_1", db) + if err != nil { + t.Error(err) + } + table2, err := m.getTable("lsrc_rankings_shard_2", db) + if err != nil { + t.Error(err) + } + if pk := table1.PK(); pk != "ranking_id" { + t.Errorf("Expected 'ranking_id', got %q", pk) + } + if ok, err := table1.Identical(table2); !ok { + t.Errorf("Not identical: %s", err) + t.Logf("%# v", pretty.Formatter(table1)) + t.Logf("%# v", pretty.Formatter(table2)) + } + t.Logf("%s", table1.insert()) +} diff --git a/table_mover/table.go b/table_mover/table.go new file mode 100644 index 0000000..1d98af5 --- /dev/null +++ b/table_mover/table.go @@ -0,0 +1,60 @@ +package table_mover + +import ( + "errors" + "fmt" + "strings" +) + +type column struct { + Field string + Type string + Null bool + Key string + Default *string + Extra string +} + +// Same enough, not completely +func (c column) Same(col column) bool { + return c.Field == col.Field && + c.Type == col.Type && + c.Null == col.Null +} + +type table struct { + Name string + Columns []column + Primary string +} + +func (t table) PK() string { + for _, col := range t.Columns { + if col.Key == "PRI" { + return col.Field + } + } + return "" +} + +func (t table) Identical(other *table) (bool, error) { + if len(t.Columns) != len(other.Columns) { + return false, errors.New("different number or Columns") + } + for i, col := range t.Columns { + if same := col.Same(other.Columns[i]); !same { + return false, fmt.Errorf("column %s is different", col.Field) + } + } + return true, nil +} + +func (t table) insert() string { + q := "INSERT INTO `" + t.Name + "` (" + cols := make([]string, len(t.Columns)) + for i, col := range t.Columns { + cols[i] = "`" + col.Field + "`" + } + q += strings.Join(cols, ",") + return q + ")VALUES(" + strings.Trim(strings.Repeat("?,", len(cols)), ",") + ")" +} From bdeb878b856854bf94a255c38b7dd69f37b28d19 Mon Sep 17 00:00:00 2001 From: Dmytro Vovk Date: Tue, 18 Dec 2018 12:02:14 +0200 Subject: [PATCH 3/3] Sort by PK --- table_mover/mover.go | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/table_mover/mover.go b/table_mover/mover.go index b939730..6e5b0ce 100644 --- a/table_mover/mover.go +++ b/table_mover/mover.go @@ -2,8 +2,10 @@ package table_mover import ( "fmt" + "github.com/kr/pretty" "log" "os" + "strings" "sync" "time" @@ -30,7 +32,7 @@ func New(src, dst string) *Mover { } } -const pageSize = 100 +const pageSize = 1000 /* [*] Check if src table exists @@ -78,11 +80,24 @@ func (m *Mover) Move(table string) error { if m.stmtInsert, err = m.dst.Preparex(dstTable.insert()); err != nil { return err } + var startIndex *string + if err := m.dst.QueryRow( + fmt.Sprintf( //language=MySQL + "SELECT MAX(`%s`) FROM `%s`", + srcTable.PK(), dstTable.Name), + ).Scan(&startIndex); err != nil { + return err + } + if startIndex == nil { + zero := "0" + startIndex = &zero + } + m.log.Printf("Starting with pk %s", *startIndex) pks := make(chan string) wg := sync.WaitGroup{} wg.Add(1) go func() { - if err := m.readPKs(srcTable, pks); err != nil { + if err := m.readPKs(srcTable, *startIndex, pks); err != nil { m.log.Printf("Error reading PKs: %s", err) } close(pks) @@ -101,7 +116,7 @@ func (m *Mover) Move(table string) error { } t++ if time.Now().After(tick) { - m.log.Printf("Moved %d rows, skipped %d", n, t-n) + m.log.Printf("Moved %d rows, skipped %d, lastest key: %s", n, t-n, pk) tick = time.Now().Add(time.Minute) } } @@ -132,7 +147,7 @@ func (Mover) getTable(tableName string, db *sqlx.DB) (*table, error) { if err != nil { return nil, err } - defer rows.Close() + defer func() { _ = rows.Close() }() var t table t.Name = tableName for rows.Next() { @@ -162,7 +177,7 @@ func (Mover) getTable(tableName string, db *sqlx.DB) (*table, error) { return &t, nil } -func (m *Mover) readPKs(t *table, emit chan string) error { +func (m *Mover) readPKs(t *table, start interface{}, emit chan string) error { var max int if err := m.src.QueryRow( fmt.Sprintf( //language=MySQL @@ -173,11 +188,11 @@ func (m *Mover) readPKs(t *table, emit chan string) error { return err } pagedQuery := fmt.Sprintf( //language=MySQL - "SELECT `%s` FROM `%s` LIMIT ? OFFSET ?", - t.PK(), t.Name, + "SELECT `%s` FROM `%s` WHERE `%s` > ? ORDER BY `%s` LIMIT ? OFFSET ?", + t.PK(), t.Name, t.PK(), t.PK(), ) for offset := 0; offset < max; offset += pageSize { - rows, err := m.src.Query(pagedQuery, pageSize, offset) + rows, err := m.src.Query(pagedQuery, start, pageSize, offset) if err != nil { return err } @@ -188,7 +203,7 @@ func (m *Mover) readPKs(t *table, emit chan string) error { } emit <- col } - rows.Close() + _ = rows.Close() } return nil } @@ -201,11 +216,16 @@ func (m *Mover) pkExists(pk string) bool { return count > 0 } -func (m *Mover) migratePK(pk string) error { +func (m *Mover) migratePK(pk string) (err error) { cols, err := m.stmtSelect.QueryRowx(pk).SliceScan() if err != nil { return err } + defer func() { + if err != nil && strings.Contains(err.Error(), "truncated") { + m.log.Printf("Failed row: %# v", pretty.Formatter(cols)) + } + } () _, err = m.stmtInsert.Exec(cols...) return err }