diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index b2634e38..bdb22942 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -23,7 +23,7 @@ import ( ) type CiPipelineMaterial struct { - tableName struct{} `sql:"ci_pipeline_material"` + tableName struct{} `sql:"ci_pipeline_material" pg:",discard_unknown_columns"` Id int `sql:"id"` GitMaterialId int `sql:"git_material_id"` //id stored in db GitMaterial( foreign key) Type SourceType `sql:"type"` @@ -32,20 +32,24 @@ type CiPipelineMaterial struct { LastSeenHash string `sql:"last_seen_hash,notnull"` CommitAuthor string `sql:"commit_author"` CommitDate time.Time `sql:"commit_date"` - - CommitHistory string `sql:"commit_history"` //last five commit for caching purpose1 - Errored bool `sql:"errored,notnull"` - ErrorMsg string `sql:"error_msg,notnull"` -} + CommitHistory string `sql:"commit_history"` //last five commit for caching purpose1 + Errored bool `sql:"errored,notnull"` + ErrorMsg string `sql:"error_msg,notnull"` +} type CiPipelineMaterialRepository interface { FindByGitMaterialId(gitMaterialId int) ([]*CiPipelineMaterial, error) Update(material []*CiPipelineMaterial) error + UpdateWithTransaction(materials []*CiPipelineMaterial, tx *pg.Tx) error FindByIds(ids []int) ([]*CiPipelineMaterial, error) FindById(id int) (*CiPipelineMaterial, error) Exists(id int) (bool, error) Save(material []*CiPipelineMaterial) ([]*CiPipelineMaterial, error) + FindSimilarCiPipelineMaterials(repoUrl string, branch string) ([]*CiPipelineMaterial, error) + FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error) + UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error + UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error } type CiPipelineMaterialRepositoryImpl struct { @@ -90,6 +94,33 @@ func (impl CiPipelineMaterialRepositoryImpl) Update(materials []*CiPipelineMater return err } +func (impl CiPipelineMaterialRepositoryImpl) UpdateWithTransaction(materials []*CiPipelineMaterial, tx *pg.Tx) error { + + for _, material := range materials { + _, err := tx.Model(material). + WherePK(). + UpdateNotNull() + + if err != nil { + return err + } + } + return nil +} + +func (impl CiPipelineMaterialRepositoryImpl) FindSimilarCiPipelineMaterials(repoUrl string, branch string) ([]*CiPipelineMaterial, error) { + materials := make([]*CiPipelineMaterial, 0) + err := impl.dbConnection.Model(&materials). + Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + Where("gm.url = ?", repoUrl). + Where("gm.deleted = ?", false). + Where("ci_pipeline_material.value = ?", branch). + Where("ci_pipeline_material.active = ?", true). + Select() + + return materials, err +} + func (impl CiPipelineMaterialRepositoryImpl) FindByIds(ids []int) (materials []*CiPipelineMaterial, err error) { err = impl.dbConnection.Model(&materials). Where("id in (?)", pg.In(ids)). @@ -104,3 +135,72 @@ func (impl CiPipelineMaterialRepositoryImpl) FindById(id int) (*CiPipelineMateri Where("active = ?", true).Select() return materials, err } + +func (impl CiPipelineMaterialRepositoryImpl) FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error) { + ciPipelineMaterials := make([]*CiPipelineMaterial, 0) + //err := impl.dbConnection.Model(&ciPipelineMaterials). + // ColumnExpr("DISTINCT ci_pipeline_material.value"). + // Column("ci_pipeline_material.*"). + // Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + // Where("gm.ref_git_material_id = ?", gitMaterialId). + // Where("gm.deleted = ?", false). + // Where("ci_pipeline_material.active = ?", true). + // Where("ci_pipeline_material.type = ?", "SOURCE_TYPE_BRANCH_FIXED"). + // Select() + + query := "select * from (" + + "select *, row_number() over (partition by value order by commit_date desc) as row_num from ci_pipeline_material cpm " + + "inner join git_material gm on gm.id = cpm.git_material_id " + + "where (gm.ref_git_material_id = ?) and (gm.deleted = false) and (cpm.active = true) and (cpm.\"type\" = 'SOURCE_TYPE_BRANCH_FIXED')" + + ") materials where row_num <= 1" + + _, err := impl.dbConnection.Query(&ciPipelineMaterials, query, gitMaterialId) + return ciPipelineMaterials, err +} + +func (impl CiPipelineMaterialRepositoryImpl) UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error { + + //_, err := impl.dbConnection.Model(material). + // Column("ci_pipeline_material.errored", "ci_pipeline_material.error_msg"). + // Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + // Where("gm.ref_git_material_id = ?", gitMaterialId). + // Where("ci_pipeline_material.value = ?", branch). + // Update() + + query := "UPDATE ci_pipeline_material cpm " + + "SET errored = ?, " + + "error_msg = ? " + + "FROM git_material gm WHERE (cpm.git_material_id = gm.id) " + + "AND (gm.ref_git_material_id = ?) " + + "AND (cpm.value = ?)" + + _, err := impl.dbConnection.Query(material, query, material.Errored, material.ErrorMsg, gitMaterialId, branch) + return err +} + +func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error { + + //_, err := impl.dbConnection.Model(material). + // Column("ci_pipeline_material.last_seen_hash", "ci_pipeline_material.commit_author", "ci_pipeline_material.commit_date", + // "ci_pipeline_material.commit_history", "ci_pipeline_material.errored", "ci_pipeline_material.error_msg"). + // Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + // Where("gm.ref_git_material_id = ?", gitMaterialId). + // Where("ci_pipeline_material.value = ?", branch). + // Update() + + query := "UPDATE ci_pipeline_material cpm " + + "SET errored = ?, " + + "error_msg = ?, " + + "last_seen_hash = ?, " + + "commit_author = ?, " + + "commit_date = ?, " + + "commit_history = ? " + + "FROM git_material gm WHERE (cpm.git_material_id = gm.id) " + + "AND (gm.ref_git_material_id = ?) " + + "AND (cpm.value = ?)" + + _, err := impl.dbConnection.Query(material, query, material.Errored, material.ErrorMsg, material.LastSeenHash, + material.CommitAuthor, material.CommitDate, material.CommitHistory, gitMaterialId, branch) + + return err +} diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index b2d1a355..eb1194a4 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -31,9 +31,9 @@ const ( SOURCE_TYPE_WEBHOOK SourceType = "WEBHOOK" ) -//TODO: add support for submodule +// TODO: add support for submodule type GitMaterial struct { - tableName struct{} `sql:"git_material"` + tableName struct{} `sql:"git_material" pg:",discard_unknown_columns"` Id int `sql:"id,pk"` GitProviderId int `sql:"git_provider_id,notnull"` Url string `sql:"url,omitempty"` @@ -48,17 +48,28 @@ type GitMaterial struct { FetchStatus bool `json:"fetch_status"` LastFetchErrorCount int `json:"last_fetch_error_count"` //continues fetch error FetchErrorMessage string `json:"fetch_error_message"` + RefGitMaterialId int `sql:"ref_git_material_id"` GitProvider *GitProvider CiPipelineMaterials []*CiPipelineMaterial } type MaterialRepository interface { + GetConnection() *pg.DB FindById(id int) (*GitMaterial, error) + FindByIdWithCiMaterials(id int) (*GitMaterial, error) Update(material *GitMaterial) error Save(material *GitMaterial) error + SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error FindActive() ([]*GitMaterial, error) FindAll() ([]*GitMaterial, error) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) + FindAllReferencedGitMaterials() ([]*GitMaterial, error) + FindReferencedGitMaterial(materialId int) (*GitMaterial, error) + GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error) + UpdateRefMaterialId(material *GitMaterial, tx *pg.Tx) error + FindByReferenceId(refGitMaterialId int, limit int, excludingIds []int) (*GitMaterial, error) + UpdateWithTransaction(material *GitMaterial, tx *pg.Tx) error + UpdateRefMaterialIdForAllWithSameUrl(url string, refGitMaterialId int, tx *pg.Tx) error } type MaterialRepositoryImpl struct { dbConnection *pg.DB @@ -68,20 +79,37 @@ func NewMaterialRepositoryImpl(dbConnection *pg.DB) *MaterialRepositoryImpl { return &MaterialRepositoryImpl{dbConnection: dbConnection} } +func (repo MaterialRepositoryImpl) GetConnection() *pg.DB { + return repo.dbConnection +} + func (repo MaterialRepositoryImpl) Save(material *GitMaterial) error { err := repo.dbConnection.Insert(material) return err } +func (repo MaterialRepositoryImpl) SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error { + err := tx.Insert(material) + return err +} + func (repo MaterialRepositoryImpl) Update(material *GitMaterial) error { _, err := repo.dbConnection.Model(material).WherePK().Update() return err } +func (repo MaterialRepositoryImpl) UpdateWithTransaction(material *GitMaterial, tx *pg.Tx) error { + _, err := tx.Model(material). + WherePK(). + Update() + + return err +} + func (repo MaterialRepositoryImpl) FindActive() ([]*GitMaterial, error) { var materials []*GitMaterial err := repo.dbConnection.Model(&materials). - Column("git_material.*", "GitProvider", ). + Column("git_material.*", "GitProvider"). Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) { return q.Where("active IS TRUE"), nil }). @@ -111,7 +139,20 @@ func (repo MaterialRepositoryImpl) FindById(id int) (*GitMaterial, error) { return &material, err } -func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls[] string) ([]*GitMaterial, error) { +func (repo MaterialRepositoryImpl) FindByIdWithCiMaterials(id int) (*GitMaterial, error) { + var material GitMaterial + err := repo.dbConnection.Model(&material). + Column("git_material.*", "GitProvider"). + Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) { + return q.Where("active IS TRUE"), nil + }). + Where("git_material.id =? ", id). + Where("git_material.deleted =? ", false). + Select() + return &material, err +} + +func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) { var materials []*GitMaterial err := repo.dbConnection.Model(&materials). Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) { @@ -122,3 +163,81 @@ func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls[] string) ([]*GitMat Select() return materials, err } + +func (repo MaterialRepositoryImpl) FindAllReferencedGitMaterials() ([]*GitMaterial, error) { + + var materials []*GitMaterial + + err := repo.dbConnection.Model(&materials). + ColumnExpr("DISTINCT(git_material.id)"). + Column("git_material.*", "GitProvider"). + Join("RIGHT JOIN git_material gm ON git_material.id = gm.ref_git_material_id"). + Where("gm.deleted = ? ", false). + Where("gm.checkout_status = ? ", true). + Order("git_material.id ASC"). + Select() + + return materials, err +} + +func (repo MaterialRepositoryImpl) FindReferencedGitMaterial(materialId int) (*GitMaterial, error) { + + material := &GitMaterial{} + err := repo.dbConnection.Model(material). + Column("git_material.*", "GitProvider"). + Join("RIGHT JOIN git_material gm ON git_material.id = gm.ref_git_material_id"). + Where("gm.id = ? ", materialId). + Select() + + return material, err +} + +func (repo MaterialRepositoryImpl) GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error) { + + material := &GitMaterial{} + err := repo.dbConnection.Model(material). + Where("url = ?", repoUrl). + Where("deleted = false"). + Limit(limit). + Select() + + return material, err +} + +func (repo MaterialRepositoryImpl) UpdateRefMaterialId(material *GitMaterial, tx *pg.Tx) error { + + _, err := tx.Model(material). + Column("ref_git_material_id"). + WherePK(). + Update() + + return err +} + +func (repo MaterialRepositoryImpl) UpdateRefMaterialIdForAllWithSameUrl(url string, refGitMaterialId int, tx *pg.Tx) error { + + material := &GitMaterial{} + _, err := tx.Model(material). + Set("ref_git_material_id = ?", refGitMaterialId). + Where("url = ?", url). + Where("deleted = ?", false). + Update() + + return err +} + +func (repo MaterialRepositoryImpl) FindByReferenceId(refGitMaterialId int, limit int, excludingIds []int) (*GitMaterial, error) { + var material GitMaterial + qry := repo.dbConnection.Model(&material). + Column("git_material.*", "GitProvider"). + Where("git_material.ref_git_material_id = ? ", refGitMaterialId). + Where("git_material.deleted =? ", false). + Limit(limit) + + if len(excludingIds) > 0 { + qry = qry.Where("git_material.id not in (?)", pg.In(excludingIds)) + } + + err := qry.Select() + return &material, err +} diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index f625c248..5d518361 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -24,8 +24,10 @@ import ( "github.com/devtron-labs/git-sensor/internal/sql" "github.com/devtron-labs/git-sensor/internal/util" "github.com/devtron-labs/git-sensor/pkg/git" + "github.com/go-pg/pg" _ "github.com/robfig/cron/v3" "go.uber.org/zap" + "time" ) type RepoManager interface { @@ -50,6 +52,8 @@ type RepoManager interface { GetWebhookEventConfig(eventId int) (*git.WebhookEventConfig, error) GetWebhookPayloadDataForPipelineMaterialId(request *git.WebhookPayloadDataRequest) (*git.WebhookPayloadDataResponse, error) GetWebhookPayloadFilterDataForPipelineMaterialId(request *git.WebhookPayloadFilterDataRequest) (*git.WebhookPayloadFilterDataResponse, error) + + CheckoutMaterialWithTransaction(material *sql.GitMaterial, tx *pg.Tx) (*sql.GitMaterial, error) } type RepoManagerImpl struct { @@ -170,6 +174,44 @@ func (impl RepoManagerImpl) InactivateWebhookDataMappingForPipelineMaterials(old return nil } +func (impl RepoManagerImpl) UpdatePipelineMaterialCommitWithTransaction(material *sql.GitMaterial, tx *pg.Tx) error { + newCiMaterials := make([]*sql.CiPipelineMaterial, 0, len(material.CiPipelineMaterials)) + + for _, ciMaterial := range material.CiPipelineMaterials { + + commits, err := impl.repositoryManager.ChangesSince(material.CheckoutLocation, ciMaterial.Value, "", "", 0) + if err != nil { + ciMaterial.Errored = true + ciMaterial.ErrorMsg = err.Error() + ciMaterial.LastSeenHash = "" + + } else { + + // commits found + b, err := json.Marshal(commits) + if err != nil { + ciMaterial.Errored = true + ciMaterial.ErrorMsg = err.Error() + ciMaterial.LastSeenHash = "" + + } else { + ciMaterial.CommitHistory = string(b) + if len(commits) > 0 { + latestCommit := commits[0] + ciMaterial.LastSeenHash = latestCommit.Commit + ciMaterial.CommitAuthor = latestCommit.Author + ciMaterial.CommitDate = latestCommit.Date + } + ciMaterial.Errored = false + ciMaterial.ErrorMsg = "" + } + } + newCiMaterials = append(newCiMaterials, ciMaterial) + } + err := impl.ciPipelineMaterialRepository.UpdateWithTransaction(newCiMaterials, tx) + return err +} + func (impl RepoManagerImpl) updatePipelineMaterialCommit(materials []*sql.CiPipelineMaterial) error { var materialCommits []*sql.CiPipelineMaterial for _, pipelineMaterial := range materials { @@ -238,7 +280,7 @@ func (impl RepoManagerImpl) SaveGitProvider(provider *sql.GitProvider) (*sql.Git return provider, err } -//handle update +// handle update func (impl RepoManagerImpl) AddRepo(materials []*sql.GitMaterial) ([]*sql.GitMaterial, error) { for _, material := range materials { _, err := impl.addRepo(material) @@ -250,12 +292,141 @@ func (impl RepoManagerImpl) AddRepo(materials []*sql.GitMaterial) ([]*sql.GitMat return materials, nil } +func (impl RepoManagerImpl) ReconfigureRefGitMaterialId(currentRef int, tx *pg.Tx) (*sql.GitMaterial, error) { + + excludingIds := []int{currentRef} + nxtMaterial, err := impl.materialRepository.FindByReferenceId(currentRef, 1, excludingIds) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error while getting next material with same repo url", + "err", err) + + return nil, err + + } else if err == pg.ErrNoRows { + // If no other materials found with same url + impl.logger.Infow("no other git materials found referencing given material id", + "materialId", currentRef) + return nil, nil + } + + // If other materials found + // update ref for all materials with same repo url + err = impl.materialRepository.UpdateRefMaterialIdForAllWithSameUrl(nxtMaterial.Url, nxtMaterial.Id, tx) + if err != nil { + impl.logger.Errorw("error while updating ref git material id", + "err", err) + return nil, err + } else { + impl.logger.Infow("ref updated for other git materials", + "new ref git material id", nxtMaterial.Id) + } + + // refresh git material + nxtMaterial.RefGitMaterialId = nxtMaterial.Id + + // poll and update git material with transaction + repoLock := impl.locker.LeaseLocker(nxtMaterial.Id) + repoLock.Mutex.Lock() + defer func() { + repoLock.Mutex.Unlock() + impl.locker.ReturnLocker(nxtMaterial.Id) + }() + + err = impl.gitWatcher.PollGitMaterialAndNotify(nxtMaterial) + + // update last fetch status, counters and time + nxtMaterial.LastFetchTime = time.Now() + nxtMaterial.FetchStatus = err == nil + if err != nil { + nxtMaterial.LastFetchErrorCount = nxtMaterial.LastFetchErrorCount + 1 + nxtMaterial.FetchErrorMessage = err.Error() + } else { + nxtMaterial.LastFetchErrorCount = 0 + nxtMaterial.FetchErrorMessage = "" + } + + err = impl.materialRepository.UpdateWithTransaction(nxtMaterial, tx) + if err != nil { + impl.logger.Errorw("error in updating fetch status", + "material", nxtMaterial, + "err", err) + + return nil, err + } + return nxtMaterial, nil +} + func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) { - existingMaterial, err := impl.materialRepository.FindById(material.Id) + existingMaterial, err := impl.materialRepository.FindByIdWithCiMaterials(material.Id) if err != nil { impl.logger.Errorw("error in fetching material", err) return nil, err } + + isSelfReferenced := existingMaterial.Id == existingMaterial.RefGitMaterialId + tx, err := impl.materialRepository.GetConnection(). + Begin() + + defer tx.Rollback() + + if err != nil { + impl.logger.Errorw("error while starting transaction", + "err", err) + return nil, err + } + + // if url is updated, ref_git_material_id needs to be updated as well + if material.Url != existingMaterial.Url { + // url is updated + impl.logger.Infow("repo url is updated", + "materialId", material.Id, + "oldUrl", existingMaterial.Url, + "newUrl", material.Url) + + if isSelfReferenced { + // as it is self referenced, need to update ref of other git materials + impl.logger.Infow("git material is referenced by other git materials, changing ref for other git materials", + "materialId", material.Id) + + _, err := impl.ReconfigureRefGitMaterialId(existingMaterial.RefGitMaterialId, tx) + if err != nil { + impl.logger.Errorw("error reconfiguring ref for other materials", + "err", err) + return nil, err + } + } + + materialWithSameUrl, err := impl.materialRepository.GetMaterialWithSameRepoUrl(material.Url, 1) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error fetching existing git material with same url", + "material", material, + "err", err) + + return nil, err + } + + // if no existing material found + if err == pg.ErrNoRows { + existingMaterial.RefGitMaterialId = existingMaterial.Id + + } else { + // else use existing ones ref id + existingMaterial.RefGitMaterialId = materialWithSameUrl.RefGitMaterialId + } + } + + if material.Deleted && isSelfReferenced { + // as it is self referenced, need to update ref of other git materials + impl.logger.Info("git material delete request. updating ref of other git materials") + + _, err := impl.ReconfigureRefGitMaterialId(existingMaterial.RefGitMaterialId, tx) + if err != nil { + impl.logger.Errorw("error reconfiguring ref for other materials", + "err", err) + return nil, err + } + } + existingMaterial.Name = material.Name existingMaterial.Url = material.Url existingMaterial.GitProviderId = material.GitProviderId @@ -263,7 +434,7 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater existingMaterial.CheckoutStatus = false existingMaterial.FetchSubmodules = material.FetchSubmodules - err = impl.materialRepository.Update(existingMaterial) + err = impl.materialRepository.UpdateWithTransaction(existingMaterial, tx) if err != nil { impl.logger.Errorw("error in updating material ", "material", material, "err", err) return nil, err @@ -283,12 +454,20 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater } if !existingMaterial.Deleted { - err = impl.checkoutUpdatedRepo(material.Id) + _, err = impl.CheckoutMaterialWithTransaction(existingMaterial, tx) if err != nil { impl.logger.Errorw("error in checking out updated repo", "err", err) return nil, err } } + + err = tx.Commit() + if err != nil { + impl.logger.Errorw("error while committing transaction", + "err", err) + return nil, err + } + return existingMaterial, nil } @@ -307,14 +486,123 @@ func (impl RepoManagerImpl) checkoutUpdatedRepo(materialId int) error { } func (impl RepoManagerImpl) addRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) { - err := impl.materialRepository.Save(material) + + updateRefWithOwnId := false + + // Get the ref_material_id of the material (same url) that already exists + existingMaterial, err := impl.materialRepository.GetMaterialWithSameRepoUrl(material.Url, 1) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error fetching existing git material with same url", + "material", material, + "err", err) + + return material, err + } + + // if no existing material found + if err == pg.ErrNoRows { + // Use own id as ref + updateRefWithOwnId = true + } else { + // else use existing ones ref id + material.RefGitMaterialId = existingMaterial.RefGitMaterialId + } + + // perform in transaction + tx, err := impl.materialRepository. + GetConnection(). + Begin() + + defer tx.Rollback() + + err = impl.materialRepository.SaveWithTransaction(material, tx) if err != nil { - impl.logger.Errorw("error in saving material ", "material", material, "err", err) + impl.logger.Errorw("error in saving material ", + "material", material, + "err", err) + return material, err } + + // if updateRefWithOwnId is true, update ref column + if !updateRefWithOwnId { + err := tx.Commit() + if err != nil { + impl.logger.Errorw("error while committing changes", + "operation", "save git material", + "err", err) + + return material, err + } + } else { + + material.RefGitMaterialId = material.Id + err := impl.materialRepository.UpdateRefMaterialId(material, tx) + if err != nil { + impl.logger.Errorw("error updating ref material id with self id. performing rollback", + "material", material, + "err", err) + + return material, err + } + + err = tx.Commit() + if err != nil { + impl.logger.Errorw("error while committing changes", + "operation", "updating ref material id", + "err", err) + + return material, err + } + } + return impl.checkoutRepo(material) } +func (impl RepoManagerImpl) CheckoutMaterialWithTransaction(material *sql.GitMaterial, tx *pg.Tx) (*sql.GitMaterial, error) { + + username, password, err := git.GetUserNamePassword(material.GitProvider) + location, err := git.GetLocationForMaterial(material) + if err != nil { + impl.logger.Errorw("error in getting credentials/location", + "materialId", material.Id, + "authMode", material.GitProvider.AuthMode, + "url", material.Url, + "err", err) + + return nil, err + } + + err = impl.repositoryManager.Add(material.GitProviderId, location, material.Url, username, password, + material.GitProvider.AuthMode, material.GitProvider.SshPrivateKey) + + if err == nil { + material.CheckoutLocation = location + material.CheckoutStatus = true + } else { + material.CheckoutStatus = false + material.CheckoutMsgAny = err.Error() + material.FetchErrorMessage = err.Error() + } + + err = impl.materialRepository.UpdateWithTransaction(material, tx) + if err != nil { + impl.logger.Errorw("error in updating material", + "err", err, + "material", material) + return nil, err + } + + err = impl.UpdatePipelineMaterialCommitWithTransaction(material, tx) + if err != nil { + impl.logger.Errorw("error while updating pipeline material commit", + "err", err) + return nil, err + } + + return material, nil +} + func (impl RepoManagerImpl) checkoutRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) { repoLock := impl.locker.LeaseLocker(material.Id) repoLock.Mutex.Lock() @@ -553,20 +841,20 @@ func (impl RepoManagerImpl) GetCommitMetadata(pipelineMaterialId int, gitHash st if err != nil { return nil, err } - gitMaterial, err := impl.materialRepository.FindById(pipelineMaterial.GitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(pipelineMaterial.GitMaterialId) if err != nil { return nil, err } - if !gitMaterial.CheckoutStatus { - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + if !referencedGitMaterial.CheckoutStatus { + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - commit, err := impl.repositoryManager.GetCommitMetadata(gitMaterial.CheckoutLocation, gitHash) + commit, err := impl.repositoryManager.GetCommitMetadata(referencedGitMaterial.CheckoutLocation, gitHash) return commit, err } @@ -578,25 +866,25 @@ func (impl RepoManagerImpl) GetLatestCommitForBranch(pipelineMaterialId int, bra return nil, err } - gitMaterial, err := impl.materialRepository.FindById(pipelineMaterial.GitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(pipelineMaterial.GitMaterialId) if err != nil { impl.logger.Errorw("error in getting material ", "gitMaterialId", pipelineMaterial.GitMaterialId, "err", err) return nil, err } - if !gitMaterial.CheckoutStatus { - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + if !referencedGitMaterial.CheckoutStatus { + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - userName, password, err := git.GetUserNamePassword(gitMaterial.GitProvider) - updated, repo, err := impl.repositoryManager.Fetch(userName, password, gitMaterial.Url, gitMaterial.CheckoutLocation) + userName, password, err := git.GetUserNamePassword(referencedGitMaterial.GitProvider) + updated, repo, err := impl.repositoryManager.Fetch(userName, password, referencedGitMaterial.Url, referencedGitMaterial.CheckoutLocation) if err != nil { impl.logger.Errorw("error in fetching the repository ", "err", err) @@ -639,27 +927,27 @@ func (impl RepoManagerImpl) GetCommitMetadataForPipelineMaterial(pipelineMateria // fetch gitMaterial gitMaterialId := pipelineMaterial.GitMaterialId - gitMaterial, err := impl.materialRepository.FindById(gitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(gitMaterialId) if err != nil { impl.logger.Errorw("error while fetching gitMaterial", "gitMaterialId", gitMaterialId, "err", err) return nil, err } // validate checkout status of gitMaterial - if !gitMaterial.CheckoutStatus { + if !referencedGitMaterial.CheckoutStatus { impl.logger.Errorw("checkout not success", "gitMaterialId", gitMaterialId) - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } // lock-unlock - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - commits, err := impl.repositoryManager.ChangesSince(gitMaterial.CheckoutLocation, branchName, "", gitHash, 1) + commits, err := impl.repositoryManager.ChangesSince(referencedGitMaterial.CheckoutLocation, branchName, "", gitHash, 1) if err != nil { impl.logger.Errorw("error while fetching commit info", "pipelineMaterialId", pipelineMaterialId, "gitHash", gitHash, "err", err) return nil, err @@ -683,20 +971,20 @@ func (impl RepoManagerImpl) GetReleaseChanges(request *ReleaseChangesRequest) (* if err != nil { return nil, err } - gitMaterial, err := impl.materialRepository.FindById(pipelineMaterial.GitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(pipelineMaterial.GitMaterialId) if err != nil { return nil, err } - if !gitMaterial.CheckoutStatus { - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + if !referencedGitMaterial.CheckoutStatus { + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - gitChanges, err := impl.repositoryManager.ChangesSinceByRepositoryForAnalytics(gitMaterial.CheckoutLocation, pipelineMaterial.Value, request.OldCommit, request.NewCommit) + gitChanges, err := impl.repositoryManager.ChangesSinceByRepositoryForAnalytics(referencedGitMaterial.CheckoutLocation, pipelineMaterial.Value, request.OldCommit, request.NewCommit) if err != nil { impl.logger.Errorw("error in computing changes", "req", request, "err", err) } else { @@ -713,11 +1001,21 @@ type ReleaseChangesRequest struct { } func (impl RepoManagerImpl) RefreshGitMaterial(req *git.RefreshGitMaterialRequest) (*git.RefreshGitMaterialResponse, error) { - material := &sql.GitMaterial{Id: req.GitMaterialId} res := &git.RefreshGitMaterialResponse{} + + referencedMaterial, err := impl.materialRepository.FindReferencedGitMaterial(req.GitMaterialId) + if err != nil { + impl.logger.Errorw("error while getting referenced git material", + "materialId", req.GitMaterialId, + "err", err) + + res.ErrorMsg = err.Error() + return res, err + } + //refresh repo. and notify all pipeline for changes //lock inside watcher itself - material, err := impl.gitWatcher.PollAndUpdateGitMaterial(material) + material, err := impl.gitWatcher.PollAndUpdateGitMaterial(referencedMaterial) if err != nil { res.ErrorMsg = err.Error() } else if material.LastFetchErrorCount > 0 { diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index 78062f2f..1a77db18 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -44,6 +44,7 @@ type GitWatcherImpl struct { type GitWatcher interface { PollAndUpdateGitMaterial(material *sql.GitMaterial) (*sql.GitMaterial, error) + PollGitMaterialAndNotify(material *sql.GitMaterial) error } type PollConfig struct { @@ -96,31 +97,80 @@ func NewGitWatcherImpl(repositoryManager RepositoryManager, func (impl GitWatcherImpl) StopCron() { impl.cron.Stop() } + +// Case -> 1. Unique repos (needs regular fetching) +// Case -> 2. Repo with same repo url, and has linked-ci pipeline referencing repo (does not need fetching) +// Case -> 3. Repo with same repo url but no linked-ci (needs fetching only in one repo with same url) + +// Proposed approach +// 1. Cron job added for polling the git repos +// 2. Get the list of unique referenced checkout locations and their git providers +// 3. Perform fetch +// 3.1 Fetch +// 3.1.1 Get credentials +// 3.1.2 Get ref checkout location +// 3.1.3 CLI util fetch +// 3.1.4 Update ci pipeline material for all pipelines having same ref checkout location +// 3.2 Update fetch time and fetch status for material which has same checkout location + +// (Earlier) Things getting updated in DB (For each git material) +// 1. Ci pipeline material -> Error +// 2. Ci pipeline material -> Error message +// 3. Ci pipeline material -> Last seen hash +// 4. Ci pipeline material -> Author +// 5. Ci pipeline material -> Date +// 6. Ci pipeline material -> Commit history +// 7. Git material -> Last fetch time +// 8. Git material -> Last Fetch status + +// Notify +// 1. Topic -> NEW-CI-MATERIAL, Payload -> ci pipeline material bean + +// New implementation func (impl GitWatcherImpl) Watch() { - impl.logger.Infow("starting git watch thread") - materials, err := impl.materialRepo.FindActive() + impl.logger.Infow("Starting git watcher thread") + + // Get the list of git materials which are referenced by other materials + refGitMaterials, err := impl.materialRepo.FindAllReferencedGitMaterials() if err != nil { - impl.logger.Error("error in fetching watchlist", "err", err) + impl.logger.Errorw("Error getting list of referenced git materials", + "err", err) return } - //impl.Publish(materials) - middleware.ActiveGitRepoCount.WithLabelValues().Set(float64(len(materials))) - impl.RunOnWorker(materials) - impl.logger.Infow("stop git watch thread") + impl.RunOnWorker(refGitMaterials) } -func (impl *GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { +func (impl GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { + + // initialized worker pool wp := workerpool.New(impl.pollConfig.PollWorker) + for _, material := range materials { - if len(material.CiPipelineMaterials) == 0 { - impl.logger.Infow("no ci pipeline, skipping", "id", material.Id, "url", material.Url) - continue + + nMaterial := &sql.GitMaterial{ + Id: material.Id, + GitProviderId: material.GitProviderId, + GitProvider: material.GitProvider, + Url: material.Url, + FetchSubmodules: material.FetchSubmodules, + Name: material.Name, + CheckoutLocation: material.CheckoutLocation, + CheckoutStatus: material.CheckoutStatus, + CheckoutMsgAny: material.CheckoutMsgAny, + Deleted: material.Deleted, + LastFetchTime: material.LastFetchTime, + LastFetchErrorCount: material.LastFetchErrorCount, + FetchErrorMessage: material.FetchErrorMessage, + RefGitMaterialId: material.RefGitMaterialId, + CiPipelineMaterials: material.CiPipelineMaterials, } - materialMsg := &sql.GitMaterial{Id: material.Id, Url: material.Url} + wp.Submit(func() { - _, err := impl.pollAndUpdateGitMaterial(materialMsg) + _, err := impl.PollAndUpdateGitMaterial(nMaterial) if err != nil { - impl.logger.Errorw("error in polling git material", "material", materialMsg, "err", err) + impl.logger.Errorw("error in polling git material", + "material", material, + "err", err) } }) } @@ -128,23 +178,17 @@ func (impl *GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { } func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) (*sql.GitMaterial, error) { - //tmp expose remove in future - return impl.pollAndUpdateGitMaterial(material) -} - -func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial) (*sql.GitMaterial, error) { - repoLock := impl.locker.LeaseLocker(materialReq.Id) + repoLock := impl.locker.LeaseLocker(material.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(materialReq.Id) + impl.locker.ReturnLocker(material.Id) }() - material, err := impl.materialRepo.FindById(materialReq.Id) - if err != nil { - impl.logger.Errorw("error in fetching material ", "material", materialReq, "err", err) - return nil, err - } - err = impl.pollGitMaterialAndNotify(material) + + // poll and notify + err := impl.PollGitMaterialAndNotify(material) + + // update last fetch status, counters and time material.LastFetchTime = time.Now() material.FetchStatus = err == nil if err != nil { @@ -154,6 +198,7 @@ func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial material.LastFetchErrorCount = 0 material.FetchErrorMessage = "" } + err = impl.materialRepo.Update(material) if err != nil { impl.logger.Errorw("error in updating fetch status", "material", material, "err", err) @@ -161,120 +206,348 @@ func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial return material, err } -func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) error { - gitProvider := material.GitProvider - userName, password, err := GetUserNamePassword(gitProvider) +func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) error { + + // get credentials and location + username, password, err := GetUserNamePassword(material.GitProvider) location, err := GetLocationForMaterial(material) if err != nil { - impl.logger.Errorw("error in determining location", "url", material.Url, "err", err) + impl.logger.Errorw("error in getting credentials/location", + "materialId", material.Id, + "authMode", material.GitProvider.AuthMode, + "url", material.Url, + "err", err) + return err } - updated, repo, err := impl.repositoryManager.Fetch(userName, password, material.Url, location) + + // fetch changes + updated, repo, err := impl.repositoryManager.Fetch(username, password, material.Url, location) if err != nil { - impl.logger.Errorw("error in fetching material details ", "repo", material.Url, "err", err) + impl.logger.Errorw("error in while fetching changes", + "repo", material.Url, + "err", err) + // there might be the case if ssh private key gets flush from disk, so creating and single retrying in this case - if gitProvider.AuthMode == sql.AUTH_MODE_SSH { - err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(location, gitProvider.Id, gitProvider.SshPrivateKey) + if material.GitProvider.AuthMode != sql.AUTH_MODE_SSH { + return err + + } else { + err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(location, + material.GitProvider.Id, + material.GitProvider.SshPrivateKey) + if err != nil { - impl.logger.Errorw("error in creating/configuring ssh private key on disk ", "repo", material.Url, "gitProviderId", gitProvider.Id, "err", err) + impl.logger.Errorw("error in creating/configuring ssh private key on disk ", + "repo", material.Url, + "gitProviderId", material.GitProvider.Id, + "err", err) + return err + + } + + impl.logger.Info("Retrying fetching", "repo", material.Url) + updated, repo, err = impl.repositoryManager.Fetch(username, password, material.Url, location) + if err != nil { + impl.logger.Errorw("error in fetching material details in retry", + "repo", material.Url, + "err", err) return err - } else { - impl.logger.Info("Retrying fetching for", "repo", material.Url) - updated, repo, err = impl.repositoryManager.Fetch(userName, password, material.Url, location) - if err != nil { - impl.logger.Errorw("error in fetching material details in retry", "repo", material.Url, "err", err) - return err - } } - } else { - return err } } + if !updated { + impl.logger.Infow("no new updates found", + "materialId", material.Id, + "refMaterialId", material.RefGitMaterialId, + "url", material.Url) return nil } - materials, err := impl.ciPipelineMaterialRepository.FindByGitMaterialId(material.Id) + + // get all ci pipeline ciMaterials (unique branch) for all git ciMaterials which reference this material + ciMaterials, err := impl.ciPipelineMaterialRepository.FindAllCiPipelineMaterialsReferencingGivenMaterial(material.Id) if err != nil { - impl.logger.Errorw("error in calculating head", "err", err, "url", material.Url) + impl.logger.Errorw("error while fetching ci materials", + "err", err, + "gitMaterialId", material.Id) return err } - var updatedMaterials []*CiPipelineMaterialBean - var updatedMaterialsModel []*sql.CiPipelineMaterial - var erroredMaterialsModels []*sql.CiPipelineMaterial - for _, material := range materials { - if material.Type != sql.SOURCE_TYPE_BRANCH_FIXED { - continue - } - commits, err := impl.repositoryManager.ChangesSinceByRepository(repo, material.Value, "", "", 15) + + for _, ciMaterial := range ciMaterials { + + // Get recent changes + commits, err := impl.repositoryManager.ChangesSinceByRepository(repo, ciMaterial.Value, "", "", 15) + if err != nil { - material.Errored = true - material.ErrorMsg = err.Error() - erroredMaterialsModels = append(erroredMaterialsModels, material) + erroredCiMaterial := &sql.CiPipelineMaterial{ + Errored: true, + ErrorMsg: err.Error(), + } + // Update errored ci material + err = impl.ciPipelineMaterialRepository.UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(material.Id, + ciMaterial.Value, erroredCiMaterial) + + if err != nil { + impl.logger.Errorw("error while updating errored ci pipeline materials", + "err", err) + } + } else if len(commits) > 0 { latestCommit := commits[0] - if latestCommit.Commit != material.LastSeenHash { + if latestCommit.Commit != ciMaterial.LastSeenHash { //new commit found - mb := &CiPipelineMaterialBean{ - Id: material.Id, - Value: material.Value, - GitMaterialId: material.GitMaterialId, - Type: material.Type, - Active: material.Active, - GitCommit: latestCommit, + commitJson, _ := json.Marshal(commits) + toBeUpdatedCiMaterial := &sql.CiPipelineMaterial{ + LastSeenHash: latestCommit.Commit, + CommitAuthor: latestCommit.Author, + CommitDate: latestCommit.Date, + CommitHistory: string(commitJson), + Errored: false, + ErrorMsg: "", } - updatedMaterials = append(updatedMaterials, mb) + // Update in db + err := impl.ciPipelineMaterialRepository.UpdateCiPipelineMaterialsReferencingGivenGitMaterial(material.Id, + ciMaterial.Value, toBeUpdatedCiMaterial) - material.LastSeenHash = latestCommit.Commit - material.CommitAuthor = latestCommit.Author - material.CommitDate = latestCommit.Date - commitJson, _ := json.Marshal(commits) - material.CommitHistory = string(commitJson) - material.Errored = false - material.ErrorMsg = "" - updatedMaterialsModel = append(updatedMaterialsModel, material) + if err != nil { + impl.logger.Errorw("error while updating ci pipeline materials", + "err", err) + } + + // Notify only if new changes are detected + _ = impl.NotifyForMaterialUpdate(material.Url, ciMaterial.Value, latestCommit) } middleware.GitMaterialUpdateCounter.WithLabelValues().Inc() } } - if len(updatedMaterialsModel) > 0 { - err = impl.NotifyForMaterialUpdate(updatedMaterials) - if err != nil { - impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) - } - err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) - if err != nil { - impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) - impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) - } - } - if len(erroredMaterialsModels) > 0 { - err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) - if err != nil { - impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) - impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) - } - } return nil } -func (impl GitWatcherImpl) NotifyForMaterialUpdate(materials []*CiPipelineMaterialBean) error { +func (impl GitWatcherImpl) NotifyForMaterialUpdate(repoUrl string, branch string, commit *GitCommit) error { - impl.logger.Warnw("material notification", "materials", materials) - for _, material := range materials { - mb, err := json.Marshal(material) - if err != nil { - impl.logger.Error("err in json marshaling", "err", err) - continue + ciMaterials, err := impl.ciPipelineMaterialRepository.FindSimilarCiPipelineMaterials(repoUrl, branch) + if err != nil { + impl.logger.Errorw("error while fetching similar ci pipeline materials", + "repoUrl", repoUrl, + "branch", branch, + "err", err) + return err + } + + for _, ciMaterial := range ciMaterials { + + event := &CiPipelineMaterialBean{ + Id: ciMaterial.Id, + GitMaterialId: ciMaterial.GitMaterialId, + Active: ciMaterial.Active, + GitCommit: commit, + Type: ciMaterial.Type, + Value: ciMaterial.Value, } - err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(mb)) + + payload, err := json.Marshal(event) if err != nil { - impl.logger.Errorw("error in publishing material modification msg ", "material", material) + impl.logger.Error("err in json marshaling", + "event", event, + "err", err) + + return err } + err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(payload)) + if err != nil { + impl.logger.Errorw("error in publishing material modification msg ", + "payload", payload, + "err", err) + + return err + } } return nil } +// Old implementations + +//func (impl GitWatcherImpl) Watch() { +// impl.logger.Infow("starting git watch thread") +// materials, err := impl.materialRepo.FindActive() +// if err != nil { +// impl.logger.Error("error in fetching watchlist", "err", err) +// return +// } +// //impl.Publish(materials) +// middleware.ActiveGitRepoCount.WithLabelValues().Set(float64(len(materials))) +// impl.RunOnWorker(materials) +// impl.logger.Infow("stop git watch thread") +//} + +//func (impl *GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { +// wp := workerpool.New(impl.pollConfig.PollWorker) +// for _, material := range materials { +// if len(material.CiPipelineMaterials) == 0 { +// impl.logger.Infow("no ci pipeline, skipping", "id", material.Id, "url", material.Url) +// continue +// } +// materialMsg := &sql.GitMaterial{Id: material.Id, Url: material.Url} +// wp.Submit(func() { +// _, err := impl.pollAndUpdateGitMaterial(materialMsg) +// if err != nil { +// impl.logger.Errorw("error in polling git material", "material", materialMsg, "err", err) +// } +// }) +// } +// wp.StopWait() +//} + +//func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) (*sql.GitMaterial, error) { +// //tmp expose remove in future +// return impl.pollAndUpdateGitMaterial(material) +//} + +//func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial) (*sql.GitMaterial, error) { +// repoLock := impl.locker.LeaseLocker(materialReq.Id) +// repoLock.Mutex.Lock() +// defer func() { +// repoLock.Mutex.Unlock() +// impl.locker.ReturnLocker(materialReq.Id) +// }() +// material, err := impl.materialRepo.FindById(materialReq.Id) +// if err != nil { +// impl.logger.Errorw("error in fetching material ", "material", materialReq, "err", err) +// return nil, err +// } +// err = impl.pollGitMaterialAndNotify(material) +// material.LastFetchTime = time.Now() +// material.FetchStatus = err == nil +// if err != nil { +// material.LastFetchErrorCount = material.LastFetchErrorCount + 1 +// material.FetchErrorMessage = err.Error() +// } else { +// material.LastFetchErrorCount = 0 +// material.FetchErrorMessage = "" +// } +// err = impl.materialRepo.Update(material) +// if err != nil { +// impl.logger.Errorw("error in updating fetch status", "material", material, "err", err) +// } +// return material, err +//} +// +//func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) error { +// gitProvider := material.GitProvider +// userName, password, err := GetUserNamePassword(gitProvider) +// location, err := GetLocationForMaterial(material) +// if err != nil { +// impl.logger.Errorw("error in determining location", "url", material.Url, "err", err) +// return err +// } +// updated, repo, err := impl.repositoryManager.Fetch(userName, password, material.Url, location) +// if err != nil { +// impl.logger.Errorw("error in fetching material details ", "repo", material.Url, "err", err) +// // there might be the case if ssh private key gets flush from disk, so creating and single retrying in this case +// if gitProvider.AuthMode == sql.AUTH_MODE_SSH { +// err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(location, gitProvider.Id, gitProvider.SshPrivateKey) +// if err != nil { +// impl.logger.Errorw("error in creating/configuring ssh private key on disk ", "repo", material.Url, "gitProviderId", gitProvider.Id, "err", err) +// return err +// } else { +// impl.logger.Info("Retrying fetching for", "repo", material.Url) +// updated, repo, err = impl.repositoryManager.Fetch(userName, password, material.Url, location) +// if err != nil { +// impl.logger.Errorw("error in fetching material details in retry", "repo", material.Url, "err", err) +// return err +// } +// } +// } else { +// return err +// } +// } +// if !updated { +// return nil +// } +// materials, err := impl.ciPipelineMaterialRepository.FindByGitMaterialId(material.Id) +// if err != nil { +// impl.logger.Errorw("error in calculating head", "err", err, "url", material.Url) +// return err +// } +// var updatedMaterials []*CiPipelineMaterialBean +// var updatedMaterialsModel []*sql.CiPipelineMaterial +// var erroredMaterialsModels []*sql.CiPipelineMaterial +// for _, material := range materials { +// if material.Type != sql.SOURCE_TYPE_BRANCH_FIXED { +// continue +// } +// commits, err := impl.repositoryManager.ChangesSinceByRepository(repo, material.Value, "", "", 15) +// if err != nil { +// material.Errored = true +// material.ErrorMsg = err.Error() +// erroredMaterialsModels = append(erroredMaterialsModels, material) +// } else if len(commits) > 0 { +// latestCommit := commits[0] +// if latestCommit.Commit != material.LastSeenHash { +// //new commit found +// mb := &CiPipelineMaterialBean{ +// Id: material.Id, +// Value: material.Value, +// GitMaterialId: material.GitMaterialId, +// Type: material.Type, +// Active: material.Active, +// GitCommit: latestCommit, +// } +// updatedMaterials = append(updatedMaterials, mb) +// +// material.LastSeenHash = latestCommit.Commit +// material.CommitAuthor = latestCommit.Author +// material.CommitDate = latestCommit.Date +// commitJson, _ := json.Marshal(commits) +// material.CommitHistory = string(commitJson) +// material.Errored = false +// material.ErrorMsg = "" +// updatedMaterialsModel = append(updatedMaterialsModel, material) +// } +// middleware.GitMaterialUpdateCounter.WithLabelValues().Inc() +// } +// } +// if len(updatedMaterialsModel) > 0 { +// err = impl.NotifyForMaterialUpdate(updatedMaterials) +// if err != nil { +// impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) +// } +// err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) +// if err != nil { +// impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) +// impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) +// } +// } +// if len(erroredMaterialsModels) > 0 { +// err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) +// if err != nil { +// impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) +// impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) +// } +// } +// return nil +//} + +//func (impl GitWatcherImpl) NotifyForMaterialUpdate(materials []*CiPipelineMaterialBean) error { +// +// impl.logger.Warnw("material notification", "materials", materials) +// for _, material := range materials { +// mb, err := json.Marshal(material) +// if err != nil { +// impl.logger.Error("err in json marshaling", "err", err) +// continue +// } +// err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(mb)) +// if err != nil { +// impl.logger.Errorw("error in publishing material modification msg ", "material", material) +// } +// +// } +// return nil +//} + func (impl GitWatcherImpl) SubscribeWebhookEvent() error { callback := func(msg *pubsub.PubSubMsg) { impl.logger.Debugw("received msg", "msg", msg) diff --git a/scripts/sql/10_git_material_ref.down.sql b/scripts/sql/10_git_material_ref.down.sql new file mode 100644 index 00000000..63da70e5 --- /dev/null +++ b/scripts/sql/10_git_material_ref.down.sql @@ -0,0 +1,3 @@ +---- drop ref_git_material_id column +ALTER TABLE git_material DROP COLUMN ref_git_material_id; + diff --git a/scripts/sql/10_git_material_ref.up.sql b/scripts/sql/10_git_material_ref.up.sql new file mode 100644 index 00000000..7923b931 --- /dev/null +++ b/scripts/sql/10_git_material_ref.up.sql @@ -0,0 +1,17 @@ +---- add ref_git_material_id column +ALTER TABLE git_material ADD COLUMN IF NOT EXISTS ref_git_material_id INTEGER; + +DO $$ +DECLARE + material_id integer; + repo_url varchar; + xnum integer; +BEGIN + FOR material_id, repo_url, xnum IN SELECT * FROM ( + SELECT id, url, ROW_NUMBER() OVER (PARTITION BY url ORDER BY id ASC) AS num FROM git_material WHERE deleted = false + ) materials where num <= 1 + LOOP + UPDATE git_material SET ref_git_material_id = material_id WHERE url = repo_url; + RAISE NOTICE 'updated ref_git_material_id for url: % to id: %', repo_url, material_id; + END LOOP; +END$$;