@@ -7,8 +7,11 @@ import (
7
7
8
8
"github.com/stretchr/testify/require"
9
9
10
+ "github.com/PeerDB-io/peerdb/flow/connectors"
10
11
"github.com/PeerDB-io/peerdb/flow/e2e"
11
12
"github.com/PeerDB-io/peerdb/flow/generated/protos"
13
+ "github.com/PeerDB-io/peerdb/flow/shared"
14
+ "github.com/PeerDB-io/peerdb/flow/shared/types"
12
15
peerflow "github.com/PeerDB-io/peerdb/flow/workflows"
13
16
)
14
17
@@ -547,3 +550,120 @@ func (s ClickHouseSuite) Test_MySQL_Specific_Geometric_Types() {
547
550
env .Cancel (s .t .Context ())
548
551
e2e .RequireEnvCanceled (s .t , env )
549
552
}
553
+
554
+ func (s ClickHouseSuite ) Test_MySQL_Schema_Changes () {
555
+ if _ , ok := s .source .(* e2e.MySqlSource ); ! ok {
556
+ s .t .Skip ("only applies to mysql" )
557
+ }
558
+
559
+ t := s .T ()
560
+ destinationSchemaConnector , ok := s .DestinationConnector ().(connectors.GetTableSchemaConnector )
561
+ if ! ok {
562
+ t .Skip ("skipping test because destination connector does not implement GetTableSchemaConnector" )
563
+ }
564
+
565
+ srcTable := "test_mysql_schema_changes"
566
+ dstTable := "test_mysql_schema_changes_dst"
567
+ srcTableName := e2e .AttachSchema (s , srcTable )
568
+ dstTableName := s .DestinationTable (dstTable )
569
+ secondSrcTable := "test_mysql_schema_changes_second"
570
+ secondDstTable := "test_mysql_schema_changes_second_dst"
571
+ secondSrcTableName := e2e .AttachSchema (s , secondSrcTable )
572
+
573
+ require .NoError (t , s .Source ().Exec (t .Context (), fmt .Sprintf (`
574
+ CREATE TABLE IF NOT EXISTS %s (
575
+ id SERIAL PRIMARY KEY,
576
+ c1 BIGINT
577
+ );
578
+ ` , srcTableName )))
579
+ require .NoError (t , s .Source ().Exec (t .Context (), fmt .Sprintf (`
580
+ CREATE TABLE IF NOT EXISTS %s (
581
+ id SERIAL PRIMARY KEY
582
+ );
583
+ ` , secondSrcTableName )))
584
+
585
+ connectionGen := e2e.FlowConnectionGenerationConfig {
586
+ FlowJobName : e2e .AddSuffix (s , srcTable ),
587
+ TableMappings : e2e .TableMappings (s , srcTable , dstTable , secondSrcTable , secondDstTable ),
588
+ Destination : s .Peer ().Name ,
589
+ }
590
+ flowConnConfig := connectionGen .GenerateFlowConnectionConfigs (s )
591
+
592
+ // wait for PeerFlowStatusQuery to finish setup
593
+ // and then insert and mutate schema repeatedly.
594
+ tc := e2e .NewTemporalClient (t )
595
+ env := e2e .ExecutePeerflow (t .Context (), tc , peerflow .CDCFlowWorkflow , flowConnConfig , nil )
596
+ e2e .SetupCDCFlowStatusQuery (t , env , flowConnConfig )
597
+ e2e .EnvNoError (t , env , s .Source ().Exec (t .Context (), fmt .Sprintf (`INSERT INTO %s(c1) VALUES(1)` , srcTableName )))
598
+
599
+ e2e .EnvWaitForEqualTablesWithNames (env , s , "normalize reinsert" , srcTable , dstTable , "id,c1" )
600
+
601
+ expectedTableSchema := & protos.TableSchema {
602
+ TableIdentifier : e2e .ExpectedDestinationTableName (s , dstTable ),
603
+ Columns : []* protos.FieldDescription {
604
+ {
605
+ Name : e2e .ExpectedDestinationIdentifier (s , "id" ),
606
+ Type : string (types .QValueKindNumeric ),
607
+ TypeModifier : - 1 ,
608
+ },
609
+ {
610
+ Name : e2e .ExpectedDestinationIdentifier (s , "c1" ),
611
+ Type : string (types .QValueKindNumeric ),
612
+ TypeModifier : - 1 ,
613
+ },
614
+ {
615
+ Name : "_PEERDB_IS_DELETED" ,
616
+ Type : string (types .QValueKindBoolean ),
617
+ TypeModifier : - 1 ,
618
+ },
619
+ {
620
+ Name : "_PEERDB_SYNCED_AT" ,
621
+ Type : string (types .QValueKindTimestamp ),
622
+ TypeModifier : - 1 ,
623
+ },
624
+ },
625
+ }
626
+ output , err := destinationSchemaConnector .GetTableSchema (t .Context (), nil , shared .InternalVersion_Latest , protos .TypeSystem_Q ,
627
+ []* protos.TableMapping {{SourceTableIdentifier : dstTableName }})
628
+ e2e .EnvNoError (t , env , err )
629
+ e2e .EnvTrue (t , env , e2e .CompareTableSchemas (expectedTableSchema , output [dstTableName ]))
630
+
631
+ // alter source table, add column addedColumn and insert another row.
632
+ e2e .EnvNoError (t , env , s .Source ().Exec (t .Context (), fmt .Sprintf ("ALTER TABLE %s ADD COLUMN `addedColumn` BIGINT" , srcTableName )))
633
+ // so that the batch finishes, insert a row into the second source table.
634
+ e2e .EnvNoError (t , env , s .Source ().Exec (t .Context (), fmt .Sprintf (`INSERT INTO %s VALUES(DEFAULT)` , secondSrcTableName )))
635
+ e2e .EnvWaitForEqualTablesWithNames (env , s , "normalize altered row" , srcTable , dstTable , "id,c1,coalesce(`addedColumn`,0) `addedColumn`" )
636
+ expectedTableSchema = & protos.TableSchema {
637
+ TableIdentifier : e2e .ExpectedDestinationTableName (s , dstTable ),
638
+ Columns : []* protos.FieldDescription {
639
+ {
640
+ Name : e2e .ExpectedDestinationIdentifier (s , "id" ),
641
+ Type : string (types .QValueKindNumeric ),
642
+ TypeModifier : - 1 ,
643
+ },
644
+ {
645
+ Name : e2e .ExpectedDestinationIdentifier (s , "c1" ),
646
+ Type : string (types .QValueKindNumeric ),
647
+ TypeModifier : - 1 ,
648
+ },
649
+ {
650
+ Name : "_PEERDB_SYNCED_AT" ,
651
+ Type : string (types .QValueKindTimestamp ),
652
+ TypeModifier : - 1 ,
653
+ },
654
+ {
655
+ Name : e2e .ExpectedDestinationIdentifier (s , "addedColumn" ),
656
+ Type : string (types .QValueKindNumeric ),
657
+ TypeModifier : - 1 ,
658
+ },
659
+ },
660
+ }
661
+ output , err = destinationSchemaConnector .GetTableSchema (t .Context (), nil , shared .InternalVersion_Latest , protos .TypeSystem_Q ,
662
+ []* protos.TableMapping {{SourceTableIdentifier : dstTableName }})
663
+ e2e .EnvNoError (t , env , err )
664
+ e2e .EnvTrue (t , env , e2e .CompareTableSchemas (expectedTableSchema , output [dstTableName ]))
665
+ e2e .EnvEqualTablesWithNames (env , s , srcTable , dstTable , "id,c1,coalesce(`addedColumn`,0) `addedColumn`" )
666
+
667
+ env .Cancel (t .Context ())
668
+ e2e .RequireEnvCanceled (t , env )
669
+ }
0 commit comments