Skip to content

Add tablemover #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,10 @@ Examples:

```
tablerestorer -database 'mysql_user:mysql_password@tcp(127.0.0.1)/db_name' -dry-run -filter 'table(name LIKE "1%" OR id > 1000)'
```
```

## tablemover

Copies the contents of tables from one database to another.

// TODO
110 changes: 110 additions & 0 deletions cmd/tablemover/tablemover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"strings"
"sync"
"time"

"github.com/BrightLocal/MySQLBackup/mylogin_reader"
"github.com/BrightLocal/MySQLBackup/table_mover"
)

// hostConfig holds the connection settings for one side (source or
// destination) of a table move. Every field except DSN is bound to a
// command-line flag in main; DSN is computed there from the other fields.
type hostConfig struct {
	Hostname string // database server host name
	Port     int    // database server TCP port

	// Database is the schema name, Login a login path handed to
	// mylogin_reader, Username/Password the explicit credentials and DSN the
	// resulting data source name.
	Database, Login, Username, Password, DSN string
}

// moverConfig is the complete command-line configuration of tablemover.
type moverConfig struct {
	Src, Dst hostConfig // source and destination connection settings
	Tables   []string   // names of the tables to copy
}

func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
cfg := &moverConfig{}
flag.StringVar(&cfg.Src.Hostname, "src-host", "localhost", "Source host name")
flag.StringVar(&cfg.Dst.Hostname, "dst-host", "localhost", "Destination host name")
flag.IntVar(&cfg.Src.Port, "src-port", 3306, "Source port number")
flag.IntVar(&cfg.Dst.Port, "dst-port", 3306, "Destination port number")
flag.StringVar(&cfg.Src.Database, "src-db", "", "Source database name")
flag.StringVar(&cfg.Dst.Database, "dst-db", "", "Destination database name")
flag.StringVar(&cfg.Src.Login, "src-login", "", "Source db login path")
flag.StringVar(&cfg.Dst.Login, "dst-login", "", "Destination db login path")
flag.StringVar(&cfg.Src.Username, "src-user", "root", "Source db user name (incompatible with -src-login)")
flag.StringVar(&cfg.Dst.Username, "dst-user", "root", "Destination db user name (incompatible with -dst-login)")
flag.StringVar(&cfg.Src.Password, "src-password", "", "Source db password")
flag.StringVar(&cfg.Dst.Password, "dst-password", "", "Destination db password")
var tables string
flag.StringVar(&tables, "tables", "", "Comma separated list of tables to copy")
flag.Parse()
if tables == "" {
print("No tables specified\n")
flag.Usage()
os.Exit(1)
}
cfg.Tables = strings.Split(tables, ",")
if cfg.Src.Login == "" && cfg.Src.Username == "" {
print("Use either login or username/password for source database\n")
flag.Usage()
os.Exit(1)
}
if cfg.Dst.Login == "" && cfg.Dst.Username == "" {
print("Use either login or username/password for destination database\n")
flag.Usage()
os.Exit(1)
}
if cfg.Src.Login != "" {
reader := mylogin_reader.Read()
var err error
cfg.Src.DSN, err = reader.GetDSN(cfg.Src.Login)
if err != nil {
fmt.Printf("Error reading login: %s", err)
os.Exit(1)
}
} else {
if cfg.Src.Password == "" {
cfg.Src.DSN = fmt.Sprintf("%s@tcp(%s:%d)/%s", cfg.Src.Username, cfg.Src.Hostname, cfg.Src.Port, cfg.Src.Database)
} else {
cfg.Src.DSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", cfg.Src.Username, cfg.Src.Password, cfg.Src.Hostname, cfg.Src.Port, cfg.Src.Database)
}
}
if cfg.Dst.Login != "" {
reader := mylogin_reader.Read()
var err error
cfg.Dst.DSN, err = reader.GetDSN(cfg.Dst.Login)
if err != nil {
fmt.Printf("Error reading login: %s", err)
os.Exit(1)
}
} else {
if cfg.Dst.Password == "" {
cfg.Dst.DSN = fmt.Sprintf("%s@tcp(%s:%d)/%s", cfg.Dst.Username, cfg.Dst.Hostname, cfg.Dst.Port, cfg.Dst.Database)
} else {
cfg.Dst.DSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", cfg.Dst.Username, cfg.Dst.Password, cfg.Dst.Hostname, cfg.Dst.Port, cfg.Dst.Database)
}
}
wg := sync.WaitGroup{}
for _, table := range cfg.Tables {
wg.Add(1)
go func(t string) {
start := time.Now()
if err := table_mover.New(cfg.Src.DSN, cfg.Dst.DSN).Move(t); err != nil {
log.Printf("Error moving table %q: %s", t, err)
} else {
log.Printf("Table %q moved successfully in %s", t, time.Now().Sub(start))
}
wg.Done()
}(table)
}
wg.Wait()
}
231 changes: 231 additions & 0 deletions table_mover/mover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package table_mover

import (
"fmt"
"github.com/kr/pretty"
"log"
"os"
"strings"
"sync"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
)

// Mover copies the rows of a single table from a source database to a
// destination database.
type Mover struct {
	srcDSN, dstDSN string      // data source names passed to New
	src, dst       *sqlx.DB    // connections opened by connect
	log            *log.Logger // per-table logger created in Move

	// Statements prepared in Move: stmtExists is used by pkExists,
	// stmtSelect and stmtInsert by migratePK.
	stmtExists, stmtSelect, stmtInsert *sqlx.Stmt
}

// New returns a Mover configured with the source DSN src and the destination
// DSN dst. It only stores the two strings; no connection is opened here.
func New(src, dst string) *Mover {
	m := new(Mover)
	m.srcDSN = src
	m.dstDSN = dst
	return m
}

// pageSize is the LIMIT used by readPKs for each paged primary-key query.
const pageSize = 1000

// Move copies the rows of table that are missing in the destination database
// from the source database.
//
// Progress of the implementation:
//
//	[*] Check if src table exists
//	[*] Check if dst table exists
//	[ ] Create dst table if it does not exist
//	[*] Compare src and dst schema
//	[*] Detect PK column
//	[*] Select all PKs from src table
//	[*] Check whether the PK is present in dst table
//	[*] Copy missing rows from src to dst
//
// Copying starts after the largest primary key already present in the
// destination ("0" when the destination table is empty). Every source key
// above that value is checked against the destination and the row is
// inserted when it is absent.
//
// Move returns the first error encountered: a schema read failure, a schema
// mismatch, a statement preparation failure, a row migration failure or a
// failure while reading the source keys. Note that connect and pkExists end
// the process through the logger's Fatalf instead of returning an error.
func (m *Mover) Move(table string) error {
	m.log = log.New(os.Stdout, fmt.Sprintf("[%s] ", table), log.Ltime|log.Lmicroseconds|log.Lshortfile)
	m.connect()
	srcTable, err := m.getTable(table, m.src)
	if err != nil {
		return errors.Wrapf(err, "error reading source table %q", table)
	}
	m.log.Printf("Detected primary key %q", srcTable.Primary)
	dstTable, err := m.getTable(table, m.dst)
	if err != nil {
		return errors.Wrapf(err, "error reading destination table %q", table)
	}
	if ok, err := srcTable.Identical(dstTable); !ok {
		if err == nil {
			// errors.Wrapf returns nil for a nil error, which would report
			// a schema mismatch as success.
			return errors.Errorf("tables %s are not identical", table)
		}
		return errors.Wrapf(err, "tables %s are not identical", table)
	}
	// Ready to copy
	if m.stmtExists, err = m.dst.Preparex(
		fmt.Sprintf( //language=MySQL
			"SELECT COUNT(*) FROM `%s` WHERE `%s` = ?",
			dstTable.Name, srcTable.PK(),
		),
	); err != nil {
		return err
	}
	if m.stmtSelect, err = m.src.Preparex(
		fmt.Sprintf( //language=MySQL
			"SELECT * FROM `%s` WHERE `%s` = ? LIMIT 1",
			srcTable.Name, srcTable.PK(),
		),
	); err != nil {
		return err
	}
	if m.stmtInsert, err = m.dst.Preparex(dstTable.insert()); err != nil {
		return err
	}
	// MAX() yields NULL for an empty table, hence the pointer.
	var startIndex *string
	if err := m.dst.QueryRow(
		fmt.Sprintf( //language=MySQL
			"SELECT MAX(`%s`) FROM `%s`",
			srcTable.PK(), dstTable.Name),
	).Scan(&startIndex); err != nil {
		return err
	}
	start := "0"
	if startIndex != nil {
		start = *startIndex
	}
	m.log.Printf("Starting with pk %s", start)

	pks := make(chan string)
	var (
		wg      sync.WaitGroup
		readErr error // written by the reader goroutine, read after wg.Wait
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(pks)
		readErr = m.readPKs(srcTable, start, pks)
	}()

	n := 0
	t := 0
	// A closure is required here: arguments of a deferred call are evaluated
	// when the defer statement runs, which would always log 0.
	defer func() { m.log.Printf("Moved %d rows", n) }()
	tick := time.Now().Add(time.Minute)
	for pk := range pks {
		if !m.pkExists(pk) {
			if err := m.migratePK(pk); err != nil {
				// Keep receiving in the background so the reader goroutine
				// is not left blocked forever on its channel send.
				go func() {
					for range pks {
					}
				}()
				return errors.Wrapf(err, "error migrating pk %s", pk)
			}
			n++
		}
		t++
		if time.Now().After(tick) {
			m.log.Printf("Moved %d rows, skipped %d, lastest key: %s", n, t-n, pk)
			tick = time.Now().Add(time.Minute)
		}
	}
	wg.Wait()
	if readErr != nil {
		// A failed key scan means the copy is incomplete; report it instead
		// of returning success.
		return errors.Wrap(readErr, "error reading primary keys")
	}
	return nil
}

// connect opens and pings the source connection, then the destination
// connection, storing them in m.src and m.dst. Any failure is reported through
// m.log.Fatalf, so m.log must be set before connect is called.
func (m *Mover) connect() {
	source, srcErr := sqlx.Connect("mysql", m.srcDSN)
	if srcErr != nil {
		m.log.Fatalf("Error connecting to source database: %s", srcErr)
	}
	m.src = source
	if pingErr := m.src.Ping(); pingErr != nil {
		m.log.Fatalf("Error pinging source database: %s", pingErr)
	}

	destination, dstErr := sqlx.Connect("mysql", m.dstDSN)
	if dstErr != nil {
		m.log.Fatalf("Error connecting to destination database: %s", dstErr)
	}
	m.dst = destination
	if pingErr := m.dst.Ping(); pingErr != nil {
		m.log.Fatalf("Error pinging destination database: %s", pingErr)
	}
}

// getTable reads the column definitions of tableName from db by running
// EXPLAIN on it and returns them as a table. Each result row is expected to
// have six columns, scanned as field, type, null, key, default and extra.
// The column whose key is "PRI" is recorded as the table's Primary; if
// several columns are marked "PRI" the last one wins.
//
// An error is returned when the query fails, when a row cannot be scanned or
// when iteration over the result set ends with an error.
func (Mover) getTable(tableName string, db *sqlx.DB) (*table, error) {
	rows, err := db.Query(fmt.Sprintf("EXPLAIN `%s`", tableName))
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()
	var t table
	t.Name = tableName
	for rows.Next() {
		var (
			field string
			kind  string
			null  string
			key   string
			def   *string // the default value may be NULL
			extra string
		)
		if err := rows.Scan(&field, &kind, &null, &key, &def, &extra); err != nil {
			return nil, err
		}
		t.Columns = append(t.Columns, column{
			Field:   field,
			Type:    kind,
			Null:    null == "YES",
			Key:     key,
			Default: def,
			Extra:   extra,
		})
		if key == "PRI" {
			t.Primary = field
		}
	}
	// rows.Next also returns false when iteration fails; without this check a
	// partially read schema would be returned as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return &t, nil
}

// readPKs sends every primary-key value of t in the source database that is
// greater than start to emit, in ascending key order. It does not close emit.
//
// Keys are fetched in pages of pageSize using keyset pagination: each page
// selects the keys greater than the last key of the previous page. This
// avoids OFFSET scans and does not depend on the numeric value of the largest
// key, so empty tables simply produce no keys.
//
// NOTE(review): this relies on t.PK() naming a single column with unique
// values, as the other per-key statements in this file appear to — confirm.
//
// The first query, scan or iteration error is returned.
func (m *Mover) readPKs(t *table, start interface{}, emit chan string) error {
	pagedQuery := fmt.Sprintf( //language=MySQL
		"SELECT `%s` FROM `%s` WHERE `%s` > ? ORDER BY `%s` LIMIT ?",
		t.PK(), t.Name, t.PK(), t.PK(),
	)
	after := start
	for {
		count, last, err := m.emitPKPage(pagedQuery, after, emit)
		if err != nil {
			return err
		}
		if count < pageSize {
			// A short page means there are no further keys.
			return nil
		}
		after = last
	}
}

// emitPKPage runs pagedQuery for the keys greater than after, sends each key
// to emit and returns the number of keys sent together with the last one.
func (m *Mover) emitPKPage(pagedQuery string, after interface{}, emit chan string) (int, string, error) {
	rows, err := m.src.Query(pagedQuery, after, pageSize)
	if err != nil {
		return 0, "", err
	}
	// Deferred so the result set is released on every exit path, including
	// a scan error.
	defer func() { _ = rows.Close() }()
	var (
		count int
		last  string
	)
	for rows.Next() {
		var pk string
		if err := rows.Scan(&pk); err != nil {
			return count, last, err
		}
		emit <- pk
		last = pk
		count++
	}
	return count, last, rows.Err()
}

// pkExists reports whether the prepared stmtExists count query returns a
// value greater than zero for pk. A query or scan failure is reported through
// m.log.Fatalf.
func (m *Mover) pkExists(pk string) bool {
	var matches int
	scanErr := m.stmtExists.QueryRowx(pk).Scan(&matches)
	if scanErr != nil {
		m.log.Fatalf("Error scanning: %s", scanErr)
	}
	return matches > 0
}

// migratePK fetches the row selected by stmtSelect for pk and passes its
// values to stmtInsert. When the insert fails with an error whose text
// contains "truncated", the row values are logged to help diagnosis. The
// select or insert error, if any, is returned.
func (m *Mover) migratePK(pk string) (err error) {
	var row []interface{}
	if row, err = m.stmtSelect.QueryRowx(pk).SliceScan(); err != nil {
		return err
	}
	if _, err = m.stmtInsert.Exec(row...); err != nil && strings.Contains(err.Error(), "truncated") {
		m.log.Printf("Failed row: %# v", pretty.Formatter(row))
	}
	return err
}
34 changes: 34 additions & 0 deletions table_mover/mover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package table_mover

import (
	"os"
	"testing"

	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"github.com/kr/pretty"
)

// TestTable is an integration test for getTable, PK, Identical and insert.
// It needs a live MySQL database containing the tables lsrc_rankings_shard_1
// and lsrc_rankings_shard_2. The DSN is read from the TABLE_MOVER_TEST_DSN
// environment variable so that no credentials live in the repository; the
// test is skipped when the variable is not set.
func TestTable(t *testing.T) {
	dsn := os.Getenv("TABLE_MOVER_TEST_DSN")
	if dsn == "" {
		t.Skip("TABLE_MOVER_TEST_DSN is not set; skipping integration test")
	}
	m := New("", "")
	db, err := sqlx.Connect("mysql", dsn)
	if err != nil {
		// Fatal, not Error: continuing with a nil db would panic.
		t.Fatal(err)
	}
	defer func() { _ = db.Close() }()
	table1, err := m.getTable("lsrc_rankings_shard_1", db)
	if err != nil {
		t.Fatal(err)
	}
	table2, err := m.getTable("lsrc_rankings_shard_2", db)
	if err != nil {
		t.Fatal(err)
	}
	if pk := table1.PK(); pk != "ranking_id" {
		t.Errorf("Expected 'ranking_id', got %q", pk)
	}
	if ok, err := table1.Identical(table2); !ok {
		t.Errorf("Not identical: %s", err)
		t.Logf("%# v", pretty.Formatter(table1))
		t.Logf("%# v", pretty.Formatter(table2))
	}
	t.Logf("%s", table1.insert())
}
Loading