From 7875d0f77bd83b1d2d9d426f3650ffb37bdb0e59 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Tue, 11 Apr 2023 18:47:44 +0530 Subject: [PATCH 01/24] add git material ref --- internal/sql/CiPipelineMaterial.go | 50 +++- internal/sql/GitMaterial.go | 23 +- pkg/git/Bean.go | 7 + pkg/git/Watcher.go | 449 ++++++++++++++++++++++------- 4 files changed, 413 insertions(+), 116 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index b2634e38..7e11314a 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -32,12 +32,11 @@ type CiPipelineMaterial struct { LastSeenHash string `sql:"last_seen_hash,notnull"` CommitAuthor string `sql:"commit_author"` CommitDate time.Time `sql:"commit_date"` - - CommitHistory string `sql:"commit_history"` //last five commit for caching purpose1 - Errored bool `sql:"errored,notnull"` - ErrorMsg string `sql:"error_msg,notnull"` -} + CommitHistory string `sql:"commit_history"` //last five commit for caching purpose1 + Errored bool `sql:"errored,notnull"` + ErrorMsg string `sql:"error_msg,notnull"` +} type CiPipelineMaterialRepository interface { FindByGitMaterialId(gitMaterialId int) ([]*CiPipelineMaterial, error) @@ -46,6 +45,9 @@ type CiPipelineMaterialRepository interface { FindById(id int) (*CiPipelineMaterial, error) Exists(id int) (bool, error) Save(material []*CiPipelineMaterial) ([]*CiPipelineMaterial, error) + FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error) + UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error + UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error } type CiPipelineMaterialRepositoryImpl struct { @@ -104,3 +106,41 @@ func (impl CiPipelineMaterialRepositoryImpl) FindById(id int) (*CiPipelineMateri Where("active = ?", true).Select() return materials, err } + +func (impl CiPipelineMaterialRepositoryImpl) FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error) { + ciPipelineMaterials := make([]*CiPipelineMaterial, 0) + err := impl.dbConnection.Model(&ciPipelineMaterials). + ColumnExpr("DISTINCT ci_pipeline_material.value"). + Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + Where("gm.ref_git_material_id = ?", gitMaterialId). + Where("gm.deleted = false"). + Where("ci_pipeline_material.active = true"). + Where("ci_pipeline_material.type = SOURCE_TYPE_BRANCH_FIXED"). + Select() + + return ciPipelineMaterials, err +} + +func (impl CiPipelineMaterialRepositoryImpl) UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error { + + _, err := impl.dbConnection.Model(material). + Column("errored", "error_msg"). + Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + Where("gm.ref_git_material_id = ?", gitMaterialId). + Where("ci_pipeline_material.value = ?", branch). + Update() + + return err +} + +func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error { + + _, err := impl.dbConnection.Model(material). + Column("last_seen_hash", "commit_author", "commit_date", "commit_history", "errored", "error_msg"). + Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + Where("gm.ref_git_material_id = ?", gitMaterialId). + Where("ci_pipeline_material.value = ?", branch). + Update() + + return err +} diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index b2d1a355..7fb68d03 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -31,7 +31,7 @@ const ( SOURCE_TYPE_WEBHOOK SourceType = "WEBHOOK" ) -//TODO: add support for submodule +// TODO: add support for submodule type GitMaterial struct { tableName struct{} `sql:"git_material"` Id int `sql:"id,pk"` @@ -48,6 +48,7 @@ type GitMaterial struct { FetchStatus bool `json:"fetch_status"` LastFetchErrorCount int `json:"last_fetch_error_count"` //continues fetch error FetchErrorMessage string `json:"fetch_error_message"` + RefGitMaterialId int `sql:"ref_git_material_id"` GitProvider *GitProvider CiPipelineMaterials []*CiPipelineMaterial } @@ -59,6 +60,7 @@ type MaterialRepository interface { FindActive() ([]*GitMaterial, error) FindAll() ([]*GitMaterial, error) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) + FindReferencedGitMaterial() ([]*GitMaterial, error) } type MaterialRepositoryImpl struct { dbConnection *pg.DB @@ -81,7 +83,7 @@ func (repo MaterialRepositoryImpl) Update(material *GitMaterial) error { func (repo MaterialRepositoryImpl) FindActive() ([]*GitMaterial, error) { var materials []*GitMaterial err := repo.dbConnection.Model(&materials). - Column("git_material.*", "GitProvider", ). + Column("git_material.*", "GitProvider"). Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) { return q.Where("active IS TRUE"), nil }). @@ -111,7 +113,7 @@ func (repo MaterialRepositoryImpl) FindById(id int) (*GitMaterial, error) { return &material, err } -func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls[] string) ([]*GitMaterial, error) { +func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) { var materials []*GitMaterial err := repo.dbConnection.Model(&materials). Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) { @@ -122,3 +124,18 @@ func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls[] string) ([]*GitMat Select() return materials, err } + +func (repo MaterialRepositoryImpl) FindReferencedGitMaterial() ([]*GitMaterial, error) { + + var materials []*GitMaterial + err := repo.dbConnection.Model(&materials). + Column("gm.*", "gm.GitProvider"). + ColumnExpr("DISTINCT gm.id"). + Join("INNER JOIN git_material gm ON git_material.ref_git_material_id = gm.id"). + Where("git_material.deleted = ? ", false). + Where("git_material.checkout_status = ? ", true). + Order("gm.id ASC"). + Select() + + return materials, err +} diff --git a/pkg/git/Bean.go b/pkg/git/Bean.go index 7fe32716..dd23cc09 100644 --- a/pkg/git/Bean.go +++ b/pkg/git/Bean.go @@ -43,6 +43,13 @@ type CiPipelineMaterialBean struct { ExtraEnvironmentVariables map[string]string // extra env variables which will be used for CI } +type CiPipelineMaterialUpdateEvent struct { + GitRepoUrl string `json:"gitRepoUrl"` + Value string `json:"value"` + GitCommit *GitCommit `json:"gitCommit"` + ExtraEnvironmentVariables map[string]string `json:"extraEnvironmentVariables,omitempty"` +} + type MaterialChangeResp struct { Commits []*GitCommit `json:"commits"` LastFetchTime time.Time `json:"lastFetchTime"` diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index 78062f2f..a241df24 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -96,55 +96,79 @@ func NewGitWatcherImpl(repositoryManager RepositoryManager, func (impl GitWatcherImpl) StopCron() { impl.cron.Stop() } + +// Case -> 1. Unique repos (needs regular fetching) +// Case -> 2. Repo with same repo url, and has linked-ci pipeline referencing repo (does not need fetching) +// Case -> 3. Repo with same repo url but no linked-ci (needs fetching only in one repo with same url) + +// Proposed approach +// 1. Cron job added for polling the git repos +// 2. Get the list of unique referenced checkout locations and their git providers +// 3. Perform fetch +// 3.1 Fetch +// 3.1.1 Get credentials +// 3.1.2 Get ref checkout location +// 3.1.3 CLI util fetch +// 3.1.4 Update ci pipeline material for all pipelines having same ref checkout location +// 3.2 Update fetch time and fetch status for material which has same checkout location + +// (Earlier) Things getting updated in DB (For each git material) +// 1. Ci pipeline material -> Error +// 2. Ci pipeline material -> Error message +// 3. Ci pipeline material -> Last seen hash +// 4. Ci pipeline material -> Author +// 5. Ci pipeline material -> Date +// 6. Ci pipeline material -> Commit history +// 7. Git material -> Last fetch time +// 8. Git material -> Last Fetch status + +// Notify +// 1. Topic -> NEW-CI-MATERIAL, Payload -> ci pipeline material bean + +// New implementation func (impl GitWatcherImpl) Watch() { - impl.logger.Infow("starting git watch thread") - materials, err := impl.materialRepo.FindActive() + impl.logger.Infow("Starting git watcher thread") + + // Get the list of git materials which are referenced by other materials + refGitMaterials, err := impl.materialRepo.FindReferencedGitMaterial() if err != nil { - impl.logger.Error("error in fetching watchlist", "err", err) + impl.logger.Errorw("Error getting list of referenced git materials", + "err", err) return } - //impl.Publish(materials) - middleware.ActiveGitRepoCount.WithLabelValues().Set(float64(len(materials))) - impl.RunOnWorker(materials) - impl.logger.Infow("stop git watch thread") + impl.RunOnWorker(refGitMaterials) } -func (impl *GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { +func (impl GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { + + // initialized worker pool wp := workerpool.New(impl.pollConfig.PollWorker) + for _, material := range materials { - if len(material.CiPipelineMaterials) == 0 { - impl.logger.Infow("no ci pipeline, skipping", "id", material.Id, "url", material.Url) - continue - } - materialMsg := &sql.GitMaterial{Id: material.Id, Url: material.Url} wp.Submit(func() { - _, err := impl.pollAndUpdateGitMaterial(materialMsg) + err := impl.PollAndUpdateGitMaterial(material) if err != nil { - impl.logger.Errorw("error in polling git material", "material", materialMsg, "err", err) + impl.logger.Errorw("error in polling git material", + "material", material, + "err", err) } }) } wp.StopWait() } -func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) (*sql.GitMaterial, error) { - //tmp expose remove in future - return impl.pollAndUpdateGitMaterial(material) -} - -func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial) (*sql.GitMaterial, error) { - repoLock := impl.locker.LeaseLocker(materialReq.Id) +func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) error { + repoLock := impl.locker.LeaseLocker(material.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(materialReq.Id) + impl.locker.ReturnLocker(material.Id) }() - material, err := impl.materialRepo.FindById(materialReq.Id) - if err != nil { - impl.logger.Errorw("error in fetching material ", "material", materialReq, "err", err) - return nil, err - } - err = impl.pollGitMaterialAndNotify(material) + + // poll and notify + err := impl.PollGitMaterialAndNotify(material) + + // update last fetch status, counters and time material.LastFetchTime = time.Now() material.FetchStatus = err == nil if err != nil { @@ -154,127 +178,336 @@ func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial material.LastFetchErrorCount = 0 material.FetchErrorMessage = "" } + err = impl.materialRepo.Update(material) if err != nil { impl.logger.Errorw("error in updating fetch status", "material", material, "err", err) } - return material, err + return err } -func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) error { - gitProvider := material.GitProvider - userName, password, err := GetUserNamePassword(gitProvider) +func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) error { + + // get credentials and location + username, password, err := GetUserNamePassword(material.GitProvider) location, err := GetLocationForMaterial(material) if err != nil { - impl.logger.Errorw("error in determining location", "url", material.Url, "err", err) + impl.logger.Errorw("error in getting credentials/location", + "materialId", material.Id, + "authMode", material.GitProvider.AuthMode, + "url", material.Url, + "err", err) + return err } - updated, repo, err := impl.repositoryManager.Fetch(userName, password, material.Url, location) + + // fetch changes + updated, repo, err := impl.repositoryManager.Fetch(username, password, material.Url, location) if err != nil { - impl.logger.Errorw("error in fetching material details ", "repo", material.Url, "err", err) + impl.logger.Errorw("error in while fetching changes", + "repo", material.Url, + "err", err) + // there might be the case if ssh private key gets flush from disk, so creating and single retrying in this case - if gitProvider.AuthMode == sql.AUTH_MODE_SSH { - err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(location, gitProvider.Id, gitProvider.SshPrivateKey) + if material.GitProvider.AuthMode != sql.AUTH_MODE_SSH { + return err + + } else { + err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(location, + material.GitProvider.Id, + material.GitProvider.SshPrivateKey) + if err != nil { - impl.logger.Errorw("error in creating/configuring ssh private key on disk ", "repo", material.Url, "gitProviderId", gitProvider.Id, "err", err) + impl.logger.Errorw("error in creating/configuring ssh private key on disk ", + "repo", material.Url, + "gitProviderId", material.GitProvider.Id, + "err", err) + return err + + } + + impl.logger.Info("Retrying fetching", "repo", material.Url) + updated, repo, err = impl.repositoryManager.Fetch(username, password, material.Url, location) + if err != nil { + impl.logger.Errorw("error in fetching material details in retry", + "repo", material.Url, + "err", err) return err - } else { - impl.logger.Info("Retrying fetching for", "repo", material.Url) - updated, repo, err = impl.repositoryManager.Fetch(userName, password, material.Url, location) - if err != nil { - impl.logger.Errorw("error in fetching material details in retry", "repo", material.Url, "err", err) - return err - } } - } else { - return err } } + if !updated { return nil } - materials, err := impl.ciPipelineMaterialRepository.FindByGitMaterialId(material.Id) + + // get all ci pipeline ciMaterials (unique branch) for all git ciMaterials which reference this material + ciMaterials, err := impl.ciPipelineMaterialRepository.FindAllCiPipelineMaterialsReferencingGivenMaterial(material.Id) if err != nil { - impl.logger.Errorw("error in calculating head", "err", err, "url", material.Url) + impl.logger.Errorw("error while fetching ci materials", + "err", err, + "gitMaterialId", material.Id) return err } - var updatedMaterials []*CiPipelineMaterialBean - var updatedMaterialsModel []*sql.CiPipelineMaterial - var erroredMaterialsModels []*sql.CiPipelineMaterial - for _, material := range materials { - if material.Type != sql.SOURCE_TYPE_BRANCH_FIXED { - continue - } - commits, err := impl.repositoryManager.ChangesSinceByRepository(repo, material.Value, "", "", 15) + + for _, ciMaterial := range ciMaterials { + + // Get recent changes + commits, err := impl.repositoryManager.ChangesSinceByRepository(repo, ciMaterial.Value, "", "", 15) + if err != nil { - material.Errored = true - material.ErrorMsg = err.Error() - erroredMaterialsModels = append(erroredMaterialsModels, material) + erroredCiMaterial := &sql.CiPipelineMaterial{ + Errored: true, + ErrorMsg: err.Error(), + } + // Update errored ci material + err = impl.ciPipelineMaterialRepository.UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(material.Id, + ciMaterial.Value, erroredCiMaterial) + + if err != nil { + impl.logger.Errorw("error while updating errored ci pipeline materials", + "err", err) + } + } else if len(commits) > 0 { latestCommit := commits[0] - if latestCommit.Commit != material.LastSeenHash { + if latestCommit.Commit != ciMaterial.LastSeenHash { //new commit found - mb := &CiPipelineMaterialBean{ - Id: material.Id, - Value: material.Value, - GitMaterialId: material.GitMaterialId, - Type: material.Type, - Active: material.Active, - GitCommit: latestCommit, + commitJson, _ := json.Marshal(commits) + toBeUpdatedCiMaterial := &sql.CiPipelineMaterial{ + LastSeenHash: latestCommit.Commit, + CommitAuthor: latestCommit.Author, + CommitDate: latestCommit.Date, + CommitHistory: string(commitJson), + Errored: false, + ErrorMsg: "", } - updatedMaterials = append(updatedMaterials, mb) + // Update in db + err := impl.ciPipelineMaterialRepository.UpdateCiPipelineMaterialsReferencingGivenGitMaterial(material.Id, + ciMaterial.Value, toBeUpdatedCiMaterial) - material.LastSeenHash = latestCommit.Commit - material.CommitAuthor = latestCommit.Author - material.CommitDate = latestCommit.Date - commitJson, _ := json.Marshal(commits) - material.CommitHistory = string(commitJson) - material.Errored = false - material.ErrorMsg = "" - updatedMaterialsModel = append(updatedMaterialsModel, material) + if err != nil { + impl.logger.Errorw("error while updating ci pipeline materials", + "err", err) + } } + + // Notify + _ = impl.NotifyForMaterialUpdate(&CiPipelineMaterialUpdateEvent{ + GitRepoUrl: material.Url, + Value: ciMaterial.Value, + GitCommit: latestCommit, + }) + middleware.GitMaterialUpdateCounter.WithLabelValues().Inc() } } - if len(updatedMaterialsModel) > 0 { - err = impl.NotifyForMaterialUpdate(updatedMaterials) - if err != nil { - impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) - } - err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) - if err != nil { - impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) - impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) - } - } - if len(erroredMaterialsModels) > 0 { - err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) - if err != nil { - impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) - impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) - } - } return nil } -func (impl GitWatcherImpl) NotifyForMaterialUpdate(materials []*CiPipelineMaterialBean) error { +func (impl GitWatcherImpl) NotifyForMaterialUpdate(event *CiPipelineMaterialUpdateEvent) error { - impl.logger.Warnw("material notification", "materials", materials) - for _, material := range materials { - mb, err := json.Marshal(material) - if err != nil { - impl.logger.Error("err in json marshaling", "err", err) - continue - } - err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(mb)) - if err != nil { - impl.logger.Errorw("error in publishing material modification msg ", "material", material) - } + payload, err := json.Marshal(event) + if err != nil { + impl.logger.Error("err in json marshaling", + "event", event, + "err", err) + return err + } + + err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(payload)) + if err != nil { + impl.logger.Errorw("error in publishing material modification msg ", + "payload", payload, + "err", err) + + return err } return nil } +// Old implementations + +//func (impl GitWatcherImpl) Watch() { +// impl.logger.Infow("starting git watch thread") +// materials, err := impl.materialRepo.FindActive() +// if err != nil { +// impl.logger.Error("error in fetching watchlist", "err", err) +// return +// } +// //impl.Publish(materials) +// middleware.ActiveGitRepoCount.WithLabelValues().Set(float64(len(materials))) +// impl.RunOnWorker(materials) +// impl.logger.Infow("stop git watch thread") +//} + +//func (impl *GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { +// wp := workerpool.New(impl.pollConfig.PollWorker) +// for _, material := range materials { +// if len(material.CiPipelineMaterials) == 0 { +// impl.logger.Infow("no ci pipeline, skipping", "id", material.Id, "url", material.Url) +// continue +// } +// materialMsg := &sql.GitMaterial{Id: material.Id, Url: material.Url} +// wp.Submit(func() { +// _, err := impl.pollAndUpdateGitMaterial(materialMsg) +// if err != nil { +// impl.logger.Errorw("error in polling git material", "material", materialMsg, "err", err) +// } +// }) +// } +// wp.StopWait() +//} + +//func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) (*sql.GitMaterial, error) { +// //tmp expose remove in future +// return impl.pollAndUpdateGitMaterial(material) +//} + +//func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial) (*sql.GitMaterial, error) { +// repoLock := impl.locker.LeaseLocker(materialReq.Id) +// repoLock.Mutex.Lock() +// defer func() { +// repoLock.Mutex.Unlock() +// impl.locker.ReturnLocker(materialReq.Id) +// }() +// material, err := impl.materialRepo.FindById(materialReq.Id) +// if err != nil { +// impl.logger.Errorw("error in fetching material ", "material", materialReq, "err", err) +// return nil, err +// } +// err = impl.pollGitMaterialAndNotify(material) +// material.LastFetchTime = time.Now() +// material.FetchStatus = err == nil +// if err != nil { +// material.LastFetchErrorCount = material.LastFetchErrorCount + 1 +// material.FetchErrorMessage = err.Error() +// } else { +// material.LastFetchErrorCount = 0 +// material.FetchErrorMessage = "" +// } +// err = impl.materialRepo.Update(material) +// if err != nil { +// impl.logger.Errorw("error in updating fetch status", "material", material, "err", err) +// } +// return material, err +//} +// +//func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) error { +// gitProvider := material.GitProvider +// userName, password, err := GetUserNamePassword(gitProvider) +// location, err := GetLocationForMaterial(material) +// if err != nil { +// impl.logger.Errorw("error in determining location", "url", material.Url, "err", err) +// return err +// } +// updated, repo, err := impl.repositoryManager.Fetch(userName, password, material.Url, location) +// if err != nil { +// impl.logger.Errorw("error in fetching material details ", "repo", material.Url, "err", err) +// // there might be the case if ssh private key gets flush from disk, so creating and single retrying in this case +// if gitProvider.AuthMode == sql.AUTH_MODE_SSH { +// err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(location, gitProvider.Id, gitProvider.SshPrivateKey) +// if err != nil { +// impl.logger.Errorw("error in creating/configuring ssh private key on disk ", "repo", material.Url, "gitProviderId", gitProvider.Id, "err", err) +// return err +// } else { +// impl.logger.Info("Retrying fetching for", "repo", material.Url) +// updated, repo, err = impl.repositoryManager.Fetch(userName, password, material.Url, location) +// if err != nil { +// impl.logger.Errorw("error in fetching material details in retry", "repo", material.Url, "err", err) +// return err +// } +// } +// } else { +// return err +// } +// } +// if !updated { +// return nil +// } +// materials, err := impl.ciPipelineMaterialRepository.FindByGitMaterialId(material.Id) +// if err != nil { +// impl.logger.Errorw("error in calculating head", "err", err, "url", material.Url) +// return err +// } +// var updatedMaterials []*CiPipelineMaterialBean +// var updatedMaterialsModel []*sql.CiPipelineMaterial +// var erroredMaterialsModels []*sql.CiPipelineMaterial +// for _, material := range materials { +// if material.Type != sql.SOURCE_TYPE_BRANCH_FIXED { +// continue +// } +// commits, err := impl.repositoryManager.ChangesSinceByRepository(repo, material.Value, "", "", 15) +// if err != nil { +// material.Errored = true +// material.ErrorMsg = err.Error() +// erroredMaterialsModels = append(erroredMaterialsModels, material) +// } else if len(commits) > 0 { +// latestCommit := commits[0] +// if latestCommit.Commit != material.LastSeenHash { +// //new commit found +// mb := &CiPipelineMaterialBean{ +// Id: material.Id, +// Value: material.Value, +// GitMaterialId: material.GitMaterialId, +// Type: material.Type, +// Active: material.Active, +// GitCommit: latestCommit, +// } +// updatedMaterials = append(updatedMaterials, mb) +// +// material.LastSeenHash = latestCommit.Commit +// material.CommitAuthor = latestCommit.Author +// material.CommitDate = latestCommit.Date +// commitJson, _ := json.Marshal(commits) +// material.CommitHistory = string(commitJson) +// material.Errored = false +// material.ErrorMsg = "" +// updatedMaterialsModel = append(updatedMaterialsModel, material) +// } +// middleware.GitMaterialUpdateCounter.WithLabelValues().Inc() +// } +// } +// if len(updatedMaterialsModel) > 0 { +// err = impl.NotifyForMaterialUpdate(updatedMaterials) +// if err != nil { +// impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) +// } +// err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) +// if err != nil { +// impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) +// impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) +// } +// } +// if len(erroredMaterialsModels) > 0 { +// err = impl.ciPipelineMaterialRepository.Update(updatedMaterialsModel) +// if err != nil { +// impl.logger.Errorw("error in update db ", "url", material.Url, "update", updatedMaterialsModel) +// impl.logger.Errorw("error in sending notification for materials", "url", material.Url, "update", updatedMaterialsModel) +// } +// } +// return nil +//} + +//func (impl GitWatcherImpl) NotifyForMaterialUpdate(materials []*CiPipelineMaterialBean) error { +// +// impl.logger.Warnw("material notification", "materials", materials) +// for _, material := range materials { +// mb, err := json.Marshal(material) +// if err != nil { +// impl.logger.Error("err in json marshaling", "err", err) +// continue +// } +// err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(mb)) +// if err != nil { +// impl.logger.Errorw("error in publishing material modification msg ", "material", material) +// } +// +// } +// return nil +//} + func (impl GitWatcherImpl) SubscribeWebhookEvent() error { callback := func(msg *pubsub.PubSubMsg) { impl.logger.Debugw("received msg", "msg", msg) From f2e0c14e3ba9d3f2dc68cd530f9a726f3e94e148 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Tue, 11 Apr 2023 22:36:49 +0530 Subject: [PATCH 02/24] update ref material id while adding repo --- internal/sql/GitMaterial.go | 36 ++++++++++++++++- pkg/RepoManages.go | 78 +++++++++++++++++++++++++++++++++++-- pkg/git/Watcher.go | 6 +-- 3 files changed, 113 insertions(+), 7 deletions(-) diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index 7fb68d03..5e18783a 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -54,13 +54,17 @@ type GitMaterial struct { } type MaterialRepository interface { + GetConnection() *pg.DB FindById(id int) (*GitMaterial, error) Update(material *GitMaterial) error Save(material *GitMaterial) error + SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error FindActive() ([]*GitMaterial, error) FindAll() ([]*GitMaterial, error) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) FindReferencedGitMaterial() ([]*GitMaterial, error) + GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error) + UpdateRefMaterialId(material *GitMaterial, tx *pg.Tx) error } type MaterialRepositoryImpl struct { dbConnection *pg.DB @@ -70,11 +74,20 @@ func NewMaterialRepositoryImpl(dbConnection *pg.DB) *MaterialRepositoryImpl { return &MaterialRepositoryImpl{dbConnection: dbConnection} } +func (repo MaterialRepositoryImpl) GetConnection() *pg.DB { + return repo.dbConnection +} + func (repo MaterialRepositoryImpl) Save(material *GitMaterial) error { err := repo.dbConnection.Insert(material) return err } +func (repo MaterialRepositoryImpl) SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error { + err := tx.Insert(material) + return err +} + func (repo MaterialRepositoryImpl) Update(material *GitMaterial) error { _, err := repo.dbConnection.Model(material).WherePK().Update() return err @@ -129,8 +142,8 @@ func (repo MaterialRepositoryImpl) FindReferencedGitMaterial() ([]*GitMaterial, var materials []*GitMaterial err := repo.dbConnection.Model(&materials). + ColumnExpr("DISTINCT(gm.id)"). Column("gm.*", "gm.GitProvider"). - ColumnExpr("DISTINCT gm.id"). Join("INNER JOIN git_material gm ON git_material.ref_git_material_id = gm.id"). Where("git_material.deleted = ? ", false). Where("git_material.checkout_status = ? ", true). @@ -139,3 +152,24 @@ func (repo MaterialRepositoryImpl) FindReferencedGitMaterial() ([]*GitMaterial, return materials, err } + +func (repo MaterialRepositoryImpl) GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error) { + + material := &GitMaterial{} + err := repo.dbConnection.Model(material). + Where("url = ?", repoUrl). + Limit(limit). + Select() + + return material, err +} + +func (repo MaterialRepositoryImpl) UpdateRefMaterialId(material *GitMaterial, tx *pg.Tx) error { + + _, err := tx.Model(material). + Column("ref_git_material_id"). + WherePK(). + Update() + + return err +} diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index f625c248..131a2b0a 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -24,6 +24,7 @@ import ( "github.com/devtron-labs/git-sensor/internal/sql" "github.com/devtron-labs/git-sensor/internal/util" "github.com/devtron-labs/git-sensor/pkg/git" + "github.com/go-pg/pg" _ "github.com/robfig/cron/v3" "go.uber.org/zap" ) @@ -238,7 +239,7 @@ func (impl RepoManagerImpl) SaveGitProvider(provider *sql.GitProvider) (*sql.Git return provider, err } -//handle update +// handle update func (impl RepoManagerImpl) AddRepo(materials []*sql.GitMaterial) ([]*sql.GitMaterial, error) { for _, material := range materials { _, err := impl.addRepo(material) @@ -307,11 +308,82 @@ func (impl RepoManagerImpl) checkoutUpdatedRepo(materialId int) error { } func (impl RepoManagerImpl) addRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) { - err := impl.materialRepository.Save(material) + + updateRefWithOwnId := false + + // Get the ref_material_id of the material (same url) that already exists + existingMaterial, err := impl.materialRepository.GetMaterialWithSameRepoUrl(material.Url, 1) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error fetching existing git material with same url", + "material", material, + "err", err) + + return material, err + } + + // if no existing material found + if err == pg.ErrNoRows { + // Use own id as ref + updateRefWithOwnId = true + } else { + // else use existing ones ref id + material.RefGitMaterialId = existingMaterial.RefGitMaterialId + } + + // perform in transaction + tx, err := impl.materialRepository. + GetConnection(). + Begin() + + err = impl.materialRepository.SaveWithTransaction(material, tx) if err != nil { - impl.logger.Errorw("error in saving material ", "material", material, "err", err) + impl.logger.Errorw("error in saving material ", + "material", material, + "err", err) + return material, err } + + // if updateRefWithOwnId is true, update ref column + if !updateRefWithOwnId { + err := tx.Commit() + if err != nil { + impl.logger.Errorw("error while committing changes", + "operation", "save git material", + "err", err) + + return material, err + } + } else { + + material.RefGitMaterialId = material.Id + err := impl.materialRepository.UpdateRefMaterialId(material, tx) + if err != nil { + + // rollback + impl.logger.Errorw("error updating ref material id with self id. performing rollback", + "material", material, + "err", err) + + err := tx.Rollback() + if err != nil { + impl.logger.Errorw("error while performing rollback", "err", err) + return material, err + } + + return material, err + } + + err = tx.Commit() + if err != nil { + impl.logger.Errorw("error while committing changes", + "operation", "updating ref material id", + "err", err) + + return material, err + } + } + return impl.checkoutRepo(material) } diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index a241df24..58e98f42 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -146,7 +146,7 @@ func (impl GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { for _, material := range materials { wp.Submit(func() { - err := impl.PollAndUpdateGitMaterial(material) + _, err := impl.PollAndUpdateGitMaterial(material) if err != nil { impl.logger.Errorw("error in polling git material", "material", material, @@ -157,7 +157,7 @@ func (impl GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { wp.StopWait() } -func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) error { +func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) (*sql.GitMaterial, error) { repoLock := impl.locker.LeaseLocker(material.Id) repoLock.Mutex.Lock() defer func() { @@ -183,7 +183,7 @@ func (impl GitWatcherImpl) PollAndUpdateGitMaterial(material *sql.GitMaterial) e if err != nil { impl.logger.Errorw("error in updating fetch status", "material", material, "err", err) } - return err + return material, err } func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) error { From 7c3196ff5735da0a81eb9aa6434ec2988f4796ef Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Tue, 11 Apr 2023 23:36:01 +0530 Subject: [PATCH 03/24] add sql scripts --- scripts/sql/10_git_material_ref.down.sql | 3 +++ scripts/sql/10_git_material_ref.up.sql | 17 +++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 scripts/sql/10_git_material_ref.down.sql create mode 100644 scripts/sql/10_git_material_ref.up.sql diff --git a/scripts/sql/10_git_material_ref.down.sql b/scripts/sql/10_git_material_ref.down.sql new file mode 100644 index 00000000..63da70e5 --- /dev/null +++ b/scripts/sql/10_git_material_ref.down.sql @@ -0,0 +1,3 @@ +---- drop ref_git_material_id column +ALTER TABLE git_material DROP COLUMN ref_git_material_id; + diff --git a/scripts/sql/10_git_material_ref.up.sql b/scripts/sql/10_git_material_ref.up.sql new file mode 100644 index 00000000..7923b931 --- /dev/null +++ b/scripts/sql/10_git_material_ref.up.sql @@ -0,0 +1,17 @@ +---- add ref_git_material_id column +ALTER TABLE git_material ADD COLUMN IF NOT EXISTS ref_git_material_id INTEGER; + +DO $$ +DECLARE + material_id integer; + repo_url varchar; + xnum integer; +BEGIN + FOR material_id, repo_url, xnum IN SELECT * FROM ( + SELECT id, url, ROW_NUMBER() OVER (PARTITION BY url ORDER BY id ASC) AS num FROM git_material WHERE deleted = false + ) materials where num <= 1 + LOOP + UPDATE git_material SET ref_git_material_id = material_id WHERE url = repo_url; + RAISE NOTICE 'updated ref_git_material_id for url: % to id: %', repo_url, material_id; + END LOOP; +END$$; From f96cadbf56b32881ebf2fb5ae4feea66e741e803 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 01:52:14 +0530 Subject: [PATCH 04/24] re-balance while deleting git material --- internal/sql/GitMaterial.go | 57 +++++++++++++- pkg/RepoManages.go | 143 +++++++++++++++++++++++++++++++++++- pkg/git/Watcher.go | 2 +- 3 files changed, 196 insertions(+), 6 deletions(-) diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index 5e18783a..4a8b0060 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -62,9 +62,13 @@ type MaterialRepository interface { FindActive() ([]*GitMaterial, error) FindAll() ([]*GitMaterial, error) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) - FindReferencedGitMaterial() ([]*GitMaterial, error) + FindAllReferencedGitMaterials() ([]*GitMaterial, error) + FindReferencedGitMaterial(materialId int) (*GitMaterial, error) GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error) UpdateRefMaterialId(material *GitMaterial, tx *pg.Tx) error + FindByReferenceId(refGitMaterialId int, limit int, excludingIds []int) (*GitMaterial, error) + UpdateWithTransaction(material *GitMaterial, tx *pg.Tx) error + UpdateRefMaterialIdForAllWithSameUrl(url string, refGitMaterialId int, tx *pg.Tx) error } type MaterialRepositoryImpl struct { dbConnection *pg.DB @@ -93,6 +97,14 @@ func (repo MaterialRepositoryImpl) Update(material *GitMaterial) error { return err } +func (repo MaterialRepositoryImpl) UpdateWithTransaction(material *GitMaterial, tx *pg.Tx) error { + _, err := tx.Model(material). + WherePK(). + Update() + + return err +} + func (repo MaterialRepositoryImpl) FindActive() ([]*GitMaterial, error) { var materials []*GitMaterial err := repo.dbConnection.Model(&materials). @@ -138,7 +150,7 @@ func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls []string) ([]*GitMat return materials, err } -func (repo MaterialRepositoryImpl) FindReferencedGitMaterial() ([]*GitMaterial, error) { +func (repo MaterialRepositoryImpl) FindAllReferencedGitMaterials() ([]*GitMaterial, error) { var materials []*GitMaterial err := repo.dbConnection.Model(&materials). @@ -153,11 +165,24 @@ func (repo MaterialRepositoryImpl) FindReferencedGitMaterial() ([]*GitMaterial, return materials, err } +func (repo MaterialRepositoryImpl) FindReferencedGitMaterial(materialId int) (*GitMaterial, error) { + + material := &GitMaterial{} + err := repo.dbConnection.Model(material). + Column("gm.*", "gm.GitProvider"). + Join("INNER JOIN git_material gm ON git_material.ref_git_material_id = gm.id"). + Where("git_material.id = ? ", materialId). + Select() + + return material, err +} + func (repo MaterialRepositoryImpl) GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error) { material := &GitMaterial{} err := repo.dbConnection.Model(material). Where("url = ?", repoUrl). + Where("deleted = false"). Limit(limit). Select() @@ -173,3 +198,31 @@ func (repo MaterialRepositoryImpl) UpdateRefMaterialId(material *GitMaterial, tx return err } + +func (repo MaterialRepositoryImpl) UpdateRefMaterialIdForAllWithSameUrl(url string, refGitMaterialId int, tx *pg.Tx) error { + + material := &GitMaterial{} + _, err := tx.Model(material). + Set("ref_git_material_id", refGitMaterialId). + Where("url = ?", url). + Where("deleted = ?", false). + Update() + + return err +} + +func (repo MaterialRepositoryImpl) FindByReferenceId(refGitMaterialId int, limit int, excludingIds []int) (*GitMaterial, error) { + var material GitMaterial + qry := repo.dbConnection.Model(&material). + Column("git_material.*", "GitProvider"). + Where("git_material.ref_git_material_id = ? ", refGitMaterialId). + Where("git_material.deleted =? ", false). + Limit(limit) + + if len(excludingIds) > 0 { + qry = qry.Where("git_material.id not in (?)", pg.In(excludingIds)) + } + + err := qry.Select() + return &material, err +} diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index 131a2b0a..956dae1f 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -257,6 +257,121 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater impl.logger.Errorw("error in fetching material", err) return nil, err } + + isSelfReferenced := existingMaterial.Id == existingMaterial.RefGitMaterialId + tx, err := impl.materialRepository.GetConnection(). + Begin() + if err != nil { + impl.logger.Errorw("error while starting transaction", + "err", err) + return nil, err + } + + // if url / deleted is updated, ref_git_material_id needs to be updated as well + if material.Url != existingMaterial.Url { + // url is updated + + if isSelfReferenced { + // as it is self referenced, need to update ref of other git materials + excludingIds := []int{existingMaterial.Id} + nxtMaterial, err := impl.materialRepository.FindByReferenceId(existingMaterial.Id, 1, excludingIds) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error while getting next material with same repo url", + "err", err) + + return nil, err + } else { + if err == nil { + // If other materials found + err = impl.materialRepository.UpdateRefMaterialIdForAllWithSameUrl(nxtMaterial.Url, nxtMaterial.Id, tx) + if err != nil { + impl.logger.Errorw("error while updating ref git material id", + "err", err) + + _ = tx.Rollback() + return nil, err + } + + resp, err := impl.RefreshGitMaterial(&git.RefreshGitMaterialRequest{ + GitMaterialId: nxtMaterial.Id, + }) + if err != nil { + impl.logger.Errorw("error while refreshing new referenced material", + "response", resp, + "err", err) + + _ = tx.Rollback() + return nil, err + } + + } else if err == pg.ErrNoRows { + // If no other materials found with same url + impl.logger.Infow("no other git materials found referencing given material id", + "materialId", existingMaterial.Id) + } + } + } + + materialWithSameUrl, err := impl.materialRepository.GetMaterialWithSameRepoUrl(material.Url, 1) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error fetching existing git material with same url", + "material", material, + "err", err) + + return nil, err + } + + // if no existing material found + if err == pg.ErrNoRows { + existingMaterial.RefGitMaterialId = existingMaterial.Id + + } else { + // else use existing ones ref id + existingMaterial.RefGitMaterialId = materialWithSameUrl.RefGitMaterialId + } + } + + if material.Deleted && isSelfReferenced { + // as it is self referenced, need to update ref of other git materials + excludingIds := []int{existingMaterial.Id} + nxtMaterial, err := impl.materialRepository.FindByReferenceId(existingMaterial.Id, 1, excludingIds) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error while getting next material with same repo url", + "err", err) + + return nil, err + } else { + if err == nil { + // If other materials found + err = impl.materialRepository.UpdateRefMaterialIdForAllWithSameUrl(nxtMaterial.Url, nxtMaterial.Id, tx) + if err != nil { + impl.logger.Errorw("error while updating ref git material id", + "err", err) + + _ = tx.Rollback() + return nil, err + } + + resp, err := impl.RefreshGitMaterial(&git.RefreshGitMaterialRequest{ + GitMaterialId: nxtMaterial.Id, + }) + if err != nil { + impl.logger.Errorw("error while refreshing new referenced material", + "response", resp, + "err", err) + + _ = tx.Rollback() + return nil, err + } + + } else if err == pg.ErrNoRows { + // If no other materials found with same url + impl.logger.Infow("no other git materials found referencing given material id", + "materialId", existingMaterial.Id) + } + } + } + existingMaterial.Name = material.Name existingMaterial.Url = material.Url existingMaterial.GitProviderId = material.GitProviderId @@ -264,9 +379,10 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater existingMaterial.CheckoutStatus = false existingMaterial.FetchSubmodules = material.FetchSubmodules - err = impl.materialRepository.Update(existingMaterial) + err = impl.materialRepository.UpdateWithTransaction(existingMaterial, tx) if err != nil { impl.logger.Errorw("error in updating material ", "material", material, "err", err) + _ = tx.Rollback() return nil, err } @@ -280,6 +396,7 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater err = impl.repositoryManager.Clean(existingMaterial.CheckoutLocation) if err != nil { impl.logger.Errorw("error in refreshing material ", "err", err) + _ = tx.Rollback() return nil, err } @@ -287,9 +404,19 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater err = impl.checkoutUpdatedRepo(material.Id) if err != nil { impl.logger.Errorw("error in checking out updated repo", "err", err) + _ = tx.Rollback() return nil, err } } + + err = tx.Commit() + if err != nil { + impl.logger.Errorw("error while committing transaction", + "err", err) + _ = tx.Rollback() + return nil, err + } + return existingMaterial, nil } @@ -785,11 +912,21 @@ type ReleaseChangesRequest struct { } func (impl RepoManagerImpl) RefreshGitMaterial(req *git.RefreshGitMaterialRequest) (*git.RefreshGitMaterialResponse, error) { - material := &sql.GitMaterial{Id: req.GitMaterialId} res := &git.RefreshGitMaterialResponse{} + + referencedMaterial, err := impl.materialRepository.FindReferencedGitMaterial(req.GitMaterialId) + if err != nil { + impl.logger.Errorw("error while getting referenced git material", + "materialId", req.GitMaterialId, + "err", err) + + res.ErrorMsg = err.Error() + return res, err + } + //refresh repo. and notify all pipeline for changes //lock inside watcher itself - material, err := impl.gitWatcher.PollAndUpdateGitMaterial(material) + material, err := impl.gitWatcher.PollAndUpdateGitMaterial(referencedMaterial) if err != nil { res.ErrorMsg = err.Error() } else if material.LastFetchErrorCount > 0 { diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index 58e98f42..ebb4451b 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -130,7 +130,7 @@ func (impl GitWatcherImpl) Watch() { impl.logger.Infow("Starting git watcher thread") // Get the list of git materials which are referenced by other materials - refGitMaterials, err := impl.materialRepo.FindReferencedGitMaterial() + refGitMaterials, err := impl.materialRepo.FindAllReferencedGitMaterials() if err != nil { impl.logger.Errorw("Error getting list of referenced git materials", "err", err) From 50e849d3d3b0054f250755a50d556cb267877314 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 09:01:55 +0530 Subject: [PATCH 05/24] fix git material sql query --- internal/sql/GitMaterial.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index 4a8b0060..e89a3014 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -153,13 +153,14 @@ func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls []string) ([]*GitMat func (repo MaterialRepositoryImpl) FindAllReferencedGitMaterials() ([]*GitMaterial, error) { var materials []*GitMaterial + err := repo.dbConnection.Model(&materials). - ColumnExpr("DISTINCT(gm.id)"). - Column("gm.*", "gm.GitProvider"). - Join("INNER JOIN git_material gm ON git_material.ref_git_material_id = gm.id"). - Where("git_material.deleted = ? ", false). - Where("git_material.checkout_status = ? ", true). - Order("gm.id ASC"). + ColumnExpr("DISTINCT(git_material.id)"). + Column("git_material.*", "GitProvider"). + Join("RIGHT JOIN git_material gm ON git_material.id = gm.ref_git_material_id"). + Where("gm.deleted = ? ", false). + Where("gm.checkout_status = ? ", true). + Order("git_material.id ASC"). Select() return materials, err @@ -169,9 +170,9 @@ func (repo MaterialRepositoryImpl) FindReferencedGitMaterial(materialId int) (*G material := &GitMaterial{} err := repo.dbConnection.Model(material). - Column("gm.*", "gm.GitProvider"). - Join("INNER JOIN git_material gm ON git_material.ref_git_material_id = gm.id"). - Where("git_material.id = ? ", materialId). + Column("git_material.*", "GitProvider"). + Join("RIGHT JOIN git_material gm ON git_material.id = gm.ref_git_material_id"). + Where("gm.id = ? ", materialId). Select() return material, err From f1f787c66686c46f883cc063b681c03b3a034f44 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 09:12:44 +0530 Subject: [PATCH 06/24] fix rest handler layer --- pkg/RepoManages.go | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index 956dae1f..30169bd1 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -752,20 +752,20 @@ func (impl RepoManagerImpl) GetCommitMetadata(pipelineMaterialId int, gitHash st if err != nil { return nil, err } - gitMaterial, err := impl.materialRepository.FindById(pipelineMaterial.GitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(pipelineMaterial.GitMaterialId) if err != nil { return nil, err } - if !gitMaterial.CheckoutStatus { - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + if !referencedGitMaterial.CheckoutStatus { + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - commit, err := impl.repositoryManager.GetCommitMetadata(gitMaterial.CheckoutLocation, gitHash) + commit, err := impl.repositoryManager.GetCommitMetadata(referencedGitMaterial.CheckoutLocation, gitHash) return commit, err } @@ -777,25 +777,25 @@ func (impl RepoManagerImpl) GetLatestCommitForBranch(pipelineMaterialId int, bra return nil, err } - gitMaterial, err := impl.materialRepository.FindById(pipelineMaterial.GitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(pipelineMaterial.GitMaterialId) if err != nil { impl.logger.Errorw("error in getting material ", "gitMaterialId", pipelineMaterial.GitMaterialId, "err", err) return nil, err } - if !gitMaterial.CheckoutStatus { - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + if !referencedGitMaterial.CheckoutStatus { + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - userName, password, err := git.GetUserNamePassword(gitMaterial.GitProvider) - updated, repo, err := impl.repositoryManager.Fetch(userName, password, gitMaterial.Url, gitMaterial.CheckoutLocation) + userName, password, err := git.GetUserNamePassword(referencedGitMaterial.GitProvider) + updated, repo, err := impl.repositoryManager.Fetch(userName, password, referencedGitMaterial.Url, referencedGitMaterial.CheckoutLocation) if err != nil { impl.logger.Errorw("error in fetching the repository ", "err", err) @@ -838,27 +838,27 @@ func (impl RepoManagerImpl) GetCommitMetadataForPipelineMaterial(pipelineMateria // fetch gitMaterial gitMaterialId := pipelineMaterial.GitMaterialId - gitMaterial, err := impl.materialRepository.FindById(gitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(gitMaterialId) if err != nil { impl.logger.Errorw("error while fetching gitMaterial", "gitMaterialId", gitMaterialId, "err", err) return nil, err } // validate checkout status of gitMaterial - if !gitMaterial.CheckoutStatus { + if !referencedGitMaterial.CheckoutStatus { impl.logger.Errorw("checkout not success", "gitMaterialId", gitMaterialId) - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } // lock-unlock - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - commits, err := impl.repositoryManager.ChangesSince(gitMaterial.CheckoutLocation, branchName, "", gitHash, 1) + commits, err := impl.repositoryManager.ChangesSince(referencedGitMaterial.CheckoutLocation, branchName, "", gitHash, 1) if err != nil { impl.logger.Errorw("error while fetching commit info", "pipelineMaterialId", pipelineMaterialId, "gitHash", gitHash, "err", err) return nil, err From ee2c5dc6634d03f53eed26387ce472cc72f666c5 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 09:26:55 +0530 Subject: [PATCH 07/24] git material discard unknown columns --- internal/sql/GitMaterial.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index e89a3014..435cc32f 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -33,7 +33,7 @@ const ( // TODO: add support for submodule type GitMaterial struct { - tableName struct{} `sql:"git_material"` + tableName struct{} `sql:"git_material" pg:",discard_unknown_columns"` Id int `sql:"id,pk"` GitProviderId int `sql:"git_provider_id,notnull"` Url string `sql:"url,omitempty"` From 9e22bc6baedf0a93ce605af4f358e9997fe7784f Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 10:35:11 +0530 Subject: [PATCH 08/24] fix worker concurrency issue --- pkg/git/Watcher.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index ebb4451b..4fb55777 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -146,7 +146,26 @@ func (impl GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { for _, material := range materials { wp.Submit(func() { - _, err := impl.PollAndUpdateGitMaterial(material) + + nMaterial := &sql.GitMaterial{ + Id: material.Id, + GitProviderId: material.GitProviderId, + GitProvider: material.GitProvider, + Url: material.Url, + FetchSubmodules: material.FetchSubmodules, + Name: material.Name, + CheckoutLocation: material.CheckoutLocation, + CheckoutStatus: material.CheckoutStatus, + CheckoutMsgAny: material.CheckoutMsgAny, + Deleted: material.Deleted, + LastFetchTime: material.LastFetchTime, + LastFetchErrorCount: material.LastFetchErrorCount, + FetchErrorMessage: material.FetchErrorMessage, + RefGitMaterialId: material.RefGitMaterialId, + CiPipelineMaterials: material.CiPipelineMaterials, + } + + _, err := impl.PollAndUpdateGitMaterial(nMaterial) if err != nil { impl.logger.Errorw("error in polling git material", "material", material, @@ -238,6 +257,8 @@ func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) e } if !updated { + impl.logger.Infow("no new updates found", + "material", material) return nil } From ab5aa6e8478508e2f3ebf6a5445554634ddb32ab Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 11:44:36 +0530 Subject: [PATCH 09/24] fix cipipeline material update error --- internal/sql/CiPipelineMaterial.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index 7e11314a..9f6dc0fb 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -23,7 +23,7 @@ import ( ) type CiPipelineMaterial struct { - tableName struct{} `sql:"ci_pipeline_material"` + tableName struct{} `sql:"ci_pipeline_material" pg:",discard_unknown_columns"` Id int `sql:"id"` GitMaterialId int `sql:"git_material_id"` //id stored in db GitMaterial( foreign key) Type SourceType `sql:"type"` @@ -109,15 +109,23 @@ func (impl CiPipelineMaterialRepositoryImpl) FindById(id int) (*CiPipelineMateri func (impl CiPipelineMaterialRepositoryImpl) FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error) { ciPipelineMaterials := make([]*CiPipelineMaterial, 0) - err := impl.dbConnection.Model(&ciPipelineMaterials). - ColumnExpr("DISTINCT ci_pipeline_material.value"). - Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). - Where("gm.ref_git_material_id = ?", gitMaterialId). - Where("gm.deleted = false"). - Where("ci_pipeline_material.active = true"). - Where("ci_pipeline_material.type = SOURCE_TYPE_BRANCH_FIXED"). - Select() - + //err := impl.dbConnection.Model(&ciPipelineMaterials). + // ColumnExpr("DISTINCT ci_pipeline_material.value"). + // Column("ci_pipeline_material.*"). + // Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + // Where("gm.ref_git_material_id = ?", gitMaterialId). + // Where("gm.deleted = ?", false). + // Where("ci_pipeline_material.active = ?", true). + // Where("ci_pipeline_material.type = ?", "SOURCE_TYPE_BRANCH_FIXED"). + // Select() + + query := "select * from (" + + "select *, row_number() over (partition by value order by commit_date desc) as row_num from ci_pipeline_material cpm " + + "inner join git_material gm on gm.id = cpm.git_material_id " + + "where (gm.ref_git_material_id = ?) and (gm.deleted = false) and (cpm.active = true) and (cpm.\"type\" = 'SOURCE_TYPE_BRANCH_FIXED')" + + ") materials where row_num <= 1" + + _, err := impl.dbConnection.Query(&ciPipelineMaterials, query, gitMaterialId) return ciPipelineMaterials, err } From 53d07e7624524c4476faec32023a98453f4b4024 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 12:09:10 +0530 Subject: [PATCH 10/24] fix update with join query errors --- internal/sql/CiPipelineMaterial.go | 48 ++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index 9f6dc0fb..3eb1d1e6 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -131,24 +131,48 @@ func (impl CiPipelineMaterialRepositoryImpl) FindAllCiPipelineMaterialsReferenci func (impl CiPipelineMaterialRepositoryImpl) UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error { - _, err := impl.dbConnection.Model(material). - Column("errored", "error_msg"). - Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). - Where("gm.ref_git_material_id = ?", gitMaterialId). - Where("ci_pipeline_material.value = ?", branch). - Update() + //_, err := impl.dbConnection.Model(material). + // Column("ci_pipeline_material.errored", "ci_pipeline_material.error_msg"). + // Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + // Where("gm.ref_git_material_id = ?", gitMaterialId). + // Where("ci_pipeline_material.value = ?", branch). + // Update() + + query := "UPDATE ci_pipeline_material " + + "SET ci_pipeline_material.errored = ?, " + + "ci_pipeline_material.error_msg = ? " + + "FROM git_material gm WHERE (ci_pipeline_material.git_material_id = gm.id) " + + "AND (gm.ref_git_material_id = ?) " + + "AND (ci_pipeline_material.value = ?)" + _, err := impl.dbConnection.Query(material, query, material.Errored, material.ErrorMsg, gitMaterialId, branch) return err } func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error { - _, err := impl.dbConnection.Model(material). - Column("last_seen_hash", "commit_author", "commit_date", "commit_history", "errored", "error_msg"). - Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). - Where("gm.ref_git_material_id = ?", gitMaterialId). - Where("ci_pipeline_material.value = ?", branch). - Update() + //_, err := impl.dbConnection.Model(material). + // Column("ci_pipeline_material.last_seen_hash", "ci_pipeline_material.commit_author", "ci_pipeline_material.commit_date", + // "ci_pipeline_material.commit_history", "ci_pipeline_material.errored", "ci_pipeline_material.error_msg"). + // Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + // Where("gm.ref_git_material_id = ?", gitMaterialId). + // Where("ci_pipeline_material.value = ?", branch). + // Update() + + query := "UPDATE ci_pipeline_material " + + "SET ci_pipeline_material.errored = ?, " + + "ci_pipeline_material.error_msg = ? " + + "ci_pipeline_material.last_seen_hash = ? " + + "ci_pipeline_material.commit_author = ? " + + "ci_pipeline_material.commit_date = ? " + + "ci_pipeline_material.commit_history = ? " + + "FROM git_material gm WHERE (ci_pipeline_material.git_material_id = gm.id) " + + "AND (gm.ref_git_material_id = ?) " + + "AND (ci_pipeline_material.value = ?)" + + _, err := impl.dbConnection.Query(material, query, material.Errored, material.LastSeenHash, + material.CommitAuthor, material.CommitDate, material.CommitHistory, + material.ErrorMsg, gitMaterialId, branch) return err } From 6e1e6db91a12faa4e462f725cdfd9edb711f68cf Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 12:17:42 +0530 Subject: [PATCH 11/24] fix invalid sql query --- internal/sql/CiPipelineMaterial.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index 3eb1d1e6..3c75075d 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -161,11 +161,11 @@ func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencin query := "UPDATE ci_pipeline_material " + "SET ci_pipeline_material.errored = ?, " + - "ci_pipeline_material.error_msg = ? " + - "ci_pipeline_material.last_seen_hash = ? " + - "ci_pipeline_material.commit_author = ? " + - "ci_pipeline_material.commit_date = ? " + - "ci_pipeline_material.commit_history = ? " + + "ci_pipeline_material.error_msg = ?, " + + "ci_pipeline_material.last_seen_hash = ?, " + + "ci_pipeline_material.commit_author = ?, " + + "ci_pipeline_material.commit_date = ?, " + + "ci_pipeline_material.commit_history = ?, " + "FROM git_material gm WHERE (ci_pipeline_material.git_material_id = gm.id) " + "AND (gm.ref_git_material_id = ?) " + "AND (ci_pipeline_material.value = ?)" From 5937c7234dd04fc1a24fef019ecd4845ec3e576f Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 12:17:55 +0530 Subject: [PATCH 12/24] fix invalid sql query --- internal/sql/CiPipelineMaterial.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index 3c75075d..cd14b2c5 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -165,7 +165,7 @@ func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencin "ci_pipeline_material.last_seen_hash = ?, " + "ci_pipeline_material.commit_author = ?, " + "ci_pipeline_material.commit_date = ?, " + - "ci_pipeline_material.commit_history = ?, " + + "ci_pipeline_material.commit_history = ? " + "FROM git_material gm WHERE (ci_pipeline_material.git_material_id = gm.id) " + "AND (gm.ref_git_material_id = ?) " + "AND (ci_pipeline_material.value = ?)" From 2ee8c169e579cd7ed3ce6620d5de6ba7f4f10d30 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 12:26:58 +0530 Subject: [PATCH 13/24] fix invalid sql query --- internal/sql/CiPipelineMaterial.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index cd14b2c5..ef632371 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -138,12 +138,12 @@ func (impl CiPipelineMaterialRepositoryImpl) UpdateErroredCiPipelineMaterialsRef // Where("ci_pipeline_material.value = ?", branch). // Update() - query := "UPDATE ci_pipeline_material " + - "SET ci_pipeline_material.errored = ?, " + - "ci_pipeline_material.error_msg = ? " + - "FROM git_material gm WHERE (ci_pipeline_material.git_material_id = gm.id) " + + query := "UPDATE ci_pipeline_material cpm " + + "SET errored = ?, " + + "error_msg = ? " + + "FROM git_material gm WHERE (cpm.git_material_id = gm.id) " + "AND (gm.ref_git_material_id = ?) " + - "AND (ci_pipeline_material.value = ?)" + "AND (cpm.value = ?)" _, err := impl.dbConnection.Query(material, query, material.Errored, material.ErrorMsg, gitMaterialId, branch) return err @@ -159,16 +159,16 @@ func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencin // Where("ci_pipeline_material.value = ?", branch). // Update() - query := "UPDATE ci_pipeline_material " + - "SET ci_pipeline_material.errored = ?, " + - "ci_pipeline_material.error_msg = ?, " + - "ci_pipeline_material.last_seen_hash = ?, " + - "ci_pipeline_material.commit_author = ?, " + - "ci_pipeline_material.commit_date = ?, " + - "ci_pipeline_material.commit_history = ? " + - "FROM git_material gm WHERE (ci_pipeline_material.git_material_id = gm.id) " + + query := "UPDATE ci_pipeline_material cpm " + + "SET errored = ?, " + + "error_msg = ?, " + + "last_seen_hash = ?, " + + "commit_author = ?, " + + "commit_date = ?, " + + "commit_history = ? " + + "FROM git_material gm WHERE (cpm.git_material_id = gm.id) " + "AND (gm.ref_git_material_id = ?) " + - "AND (ci_pipeline_material.value = ?)" + "AND (cpm.value = ?)" _, err := impl.dbConnection.Query(material, query, material.Errored, material.LastSeenHash, material.CommitAuthor, material.CommitDate, material.CommitHistory, From 3e3e1edebed4049d7b147ce8790e6abb5aa7c3e6 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 12:37:40 +0530 Subject: [PATCH 14/24] fix invalid sql query --- internal/sql/CiPipelineMaterial.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index ef632371..3de9f94a 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -170,9 +170,8 @@ func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencin "AND (gm.ref_git_material_id = ?) " + "AND (cpm.value = ?)" - _, err := impl.dbConnection.Query(material, query, material.Errored, material.LastSeenHash, - material.CommitAuthor, material.CommitDate, material.CommitHistory, - material.ErrorMsg, gitMaterialId, branch) + _, err := impl.dbConnection.Query(material, query, material.Errored, material.ErrorMsg, material.LastSeenHash, + material.CommitAuthor, material.CommitDate, material.CommitHistory, gitMaterialId, branch) return err } From 53637bd6e2b4bbcceca2a40f412de01758df8995 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 12:57:22 +0530 Subject: [PATCH 15/24] fix workerpool sharing resource issue --- pkg/git/Watcher.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index 4fb55777..d42a6abc 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -145,26 +145,26 @@ func (impl GitWatcherImpl) RunOnWorker(materials []*sql.GitMaterial) { wp := workerpool.New(impl.pollConfig.PollWorker) for _, material := range materials { - wp.Submit(func() { - nMaterial := &sql.GitMaterial{ - Id: material.Id, - GitProviderId: material.GitProviderId, - GitProvider: material.GitProvider, - Url: material.Url, - FetchSubmodules: material.FetchSubmodules, - Name: material.Name, - CheckoutLocation: material.CheckoutLocation, - CheckoutStatus: material.CheckoutStatus, - CheckoutMsgAny: material.CheckoutMsgAny, - Deleted: material.Deleted, - LastFetchTime: material.LastFetchTime, - LastFetchErrorCount: material.LastFetchErrorCount, - FetchErrorMessage: material.FetchErrorMessage, - RefGitMaterialId: material.RefGitMaterialId, - CiPipelineMaterials: material.CiPipelineMaterials, - } + nMaterial := &sql.GitMaterial{ + Id: material.Id, + GitProviderId: material.GitProviderId, + GitProvider: material.GitProvider, + Url: material.Url, + FetchSubmodules: material.FetchSubmodules, + Name: material.Name, + CheckoutLocation: material.CheckoutLocation, + CheckoutStatus: material.CheckoutStatus, + CheckoutMsgAny: material.CheckoutMsgAny, + Deleted: material.Deleted, + LastFetchTime: material.LastFetchTime, + LastFetchErrorCount: material.LastFetchErrorCount, + FetchErrorMessage: material.FetchErrorMessage, + RefGitMaterialId: material.RefGitMaterialId, + CiPipelineMaterials: material.CiPipelineMaterials, + } + wp.Submit(func() { _, err := impl.PollAndUpdateGitMaterial(nMaterial) if err != nil { impl.logger.Errorw("error in polling git material", From 495555895c820577d4a7428bdc54fb902d5ed1d2 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Wed, 12 Apr 2023 14:22:23 +0530 Subject: [PATCH 16/24] fix ref update error --- internal/sql/GitMaterial.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index 435cc32f..baffd568 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -204,7 +204,7 @@ func (repo MaterialRepositoryImpl) UpdateRefMaterialIdForAllWithSameUrl(url stri material := &GitMaterial{} _, err := tx.Model(material). - Set("ref_git_material_id", refGitMaterialId). + Set("ref_git_material_id = ?", refGitMaterialId). Where("url = ?", url). Where("deleted = ?", false). Update() From fe08e3674e59a203a3e789f240851ea400871ac3 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Thu, 13 Apr 2023 00:44:18 +0530 Subject: [PATCH 17/24] refactor new ci material notify --- internal/sql/CiPipelineMaterial.go | 14 +++++++++ pkg/RepoManages.go | 30 +++++++----------- pkg/git/Bean.go | 7 ----- pkg/git/Watcher.go | 49 ++++++++++++++++++++---------- 4 files changed, 58 insertions(+), 42 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index 3de9f94a..40dc4d53 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -45,6 +45,7 @@ type CiPipelineMaterialRepository interface { FindById(id int) (*CiPipelineMaterial, error) Exists(id int) (bool, error) Save(material []*CiPipelineMaterial) ([]*CiPipelineMaterial, error) + FindSimilarCiPipelineMaterials(repoUrl string, branch string) ([]*CiPipelineMaterial, error) FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error) UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error @@ -92,6 +93,19 @@ func (impl CiPipelineMaterialRepositoryImpl) Update(materials []*CiPipelineMater return err } +func (impl CiPipelineMaterialRepositoryImpl) FindSimilarCiPipelineMaterials(repoUrl string, branch string) ([]*CiPipelineMaterial, error) { + materials := make([]*CiPipelineMaterial, 0) + err := impl.dbConnection.Model(&materials). + Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id"). + Where("gm.url = ?", repoUrl). + Where("gm.deleted = ?", false). + Where("ci_pipeline_material.value = ?", branch). + Where("ci_pipeline_material.active = ?", true). + Select() + + return materials, err +} + func (impl CiPipelineMaterialRepositoryImpl) FindByIds(ids []int) (materials []*CiPipelineMaterial, err error) { err = impl.dbConnection.Model(&materials). Where("id in (?)", pg.In(ids)). diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index 30169bd1..c024c422 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -261,6 +261,9 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater isSelfReferenced := existingMaterial.Id == existingMaterial.RefGitMaterialId tx, err := impl.materialRepository.GetConnection(). Begin() + + defer tx.Rollback() + if err != nil { impl.logger.Errorw("error while starting transaction", "err", err) @@ -273,6 +276,7 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater if isSelfReferenced { // as it is self referenced, need to update ref of other git materials + excludingIds := []int{existingMaterial.Id} nxtMaterial, err := impl.materialRepository.FindByReferenceId(existingMaterial.Id, 1, excludingIds) if err != nil && err != pg.ErrNoRows { @@ -280,18 +284,20 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater "err", err) return nil, err + } else { if err == nil { // If other materials found + + // update ref for all materials with same repo url err = impl.materialRepository.UpdateRefMaterialIdForAllWithSameUrl(nxtMaterial.Url, nxtMaterial.Id, tx) if err != nil { impl.logger.Errorw("error while updating ref git material id", "err", err) - - _ = tx.Rollback() return nil, err } + // refresh git material resp, err := impl.RefreshGitMaterial(&git.RefreshGitMaterialRequest{ GitMaterialId: nxtMaterial.Id, }) @@ -299,8 +305,6 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater impl.logger.Errorw("error while refreshing new referenced material", "response", resp, "err", err) - - _ = tx.Rollback() return nil, err } @@ -338,8 +342,8 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater if err != nil && err != pg.ErrNoRows { impl.logger.Errorw("error while getting next material with same repo url", "err", err) - return nil, err + } else { if err == nil { // If other materials found @@ -348,7 +352,6 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater impl.logger.Errorw("error while updating ref git material id", "err", err) - _ = tx.Rollback() return nil, err } @@ -360,7 +363,6 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater "response", resp, "err", err) - _ = tx.Rollback() return nil, err } @@ -382,7 +384,6 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater err = impl.materialRepository.UpdateWithTransaction(existingMaterial, tx) if err != nil { impl.logger.Errorw("error in updating material ", "material", material, "err", err) - _ = tx.Rollback() return nil, err } @@ -396,7 +397,6 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater err = impl.repositoryManager.Clean(existingMaterial.CheckoutLocation) if err != nil { impl.logger.Errorw("error in refreshing material ", "err", err) - _ = tx.Rollback() return nil, err } @@ -404,7 +404,6 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater err = impl.checkoutUpdatedRepo(material.Id) if err != nil { impl.logger.Errorw("error in checking out updated repo", "err", err) - _ = tx.Rollback() return nil, err } } @@ -413,7 +412,6 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater if err != nil { impl.logger.Errorw("error while committing transaction", "err", err) - _ = tx.Rollback() return nil, err } @@ -462,6 +460,8 @@ func (impl RepoManagerImpl) addRepo(material *sql.GitMaterial) (*sql.GitMaterial GetConnection(). Begin() + defer tx.Rollback() + err = impl.materialRepository.SaveWithTransaction(material, tx) if err != nil { impl.logger.Errorw("error in saving material ", @@ -486,18 +486,10 @@ func (impl RepoManagerImpl) addRepo(material *sql.GitMaterial) (*sql.GitMaterial material.RefGitMaterialId = material.Id err := impl.materialRepository.UpdateRefMaterialId(material, tx) if err != nil { - - // rollback impl.logger.Errorw("error updating ref material id with self id. performing rollback", "material", material, "err", err) - err := tx.Rollback() - if err != nil { - impl.logger.Errorw("error while performing rollback", "err", err) - return material, err - } - return material, err } diff --git a/pkg/git/Bean.go b/pkg/git/Bean.go index dd23cc09..7fe32716 100644 --- a/pkg/git/Bean.go +++ b/pkg/git/Bean.go @@ -43,13 +43,6 @@ type CiPipelineMaterialBean struct { ExtraEnvironmentVariables map[string]string // extra env variables which will be used for CI } -type CiPipelineMaterialUpdateEvent struct { - GitRepoUrl string `json:"gitRepoUrl"` - Value string `json:"value"` - GitCommit *GitCommit `json:"gitCommit"` - ExtraEnvironmentVariables map[string]string `json:"extraEnvironmentVariables,omitempty"` -} - type MaterialChangeResp struct { Commits []*GitCommit `json:"commits"` LastFetchTime time.Time `json:"lastFetchTime"` diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index d42a6abc..7db502cb 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -314,11 +314,7 @@ func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) e } // Notify - _ = impl.NotifyForMaterialUpdate(&CiPipelineMaterialUpdateEvent{ - GitRepoUrl: material.Url, - Value: ciMaterial.Value, - GitCommit: latestCommit, - }) + _ = impl.NotifyForMaterialUpdate(material.Url, ciMaterial.Value, latestCommit) middleware.GitMaterialUpdateCounter.WithLabelValues().Inc() } @@ -326,24 +322,45 @@ func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) e return nil } -func (impl GitWatcherImpl) NotifyForMaterialUpdate(event *CiPipelineMaterialUpdateEvent) error { +func (impl GitWatcherImpl) NotifyForMaterialUpdate(repoUrl string, branch string, commit *GitCommit) error { - payload, err := json.Marshal(event) + ciMaterials, err := impl.ciPipelineMaterialRepository.FindSimilarCiPipelineMaterials(repoUrl, branch) if err != nil { - impl.logger.Error("err in json marshaling", - "event", event, + impl.logger.Errorw("error while fetching similar ci pipeline materials", + "repoUrl", repoUrl, + "branch", branch, "err", err) - return err } - err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(payload)) - if err != nil { - impl.logger.Errorw("error in publishing material modification msg ", - "payload", payload, - "err", err) + for _, ciMaterial := range ciMaterials { - return err + event := &CiPipelineMaterialBean{ + Id: ciMaterial.Id, + GitMaterialId: ciMaterial.GitMaterialId, + Active: ciMaterial.Active, + GitCommit: commit, + Type: ciMaterial.Type, + Value: ciMaterial.Value, + } + + payload, err := json.Marshal(event) + if err != nil { + impl.logger.Error("err in json marshaling", + "event", event, + "err", err) + + return err + } + + err = impl.pubSubClient.Publish(pubsub.NEW_CI_MATERIAL_TOPIC, string(payload)) + if err != nil { + impl.logger.Errorw("error in publishing material modification msg ", + "payload", payload, + "err", err) + + return err + } } return nil } From 2bfdbdb1c85ed016fe22e099132676c29ae65c24 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Thu, 13 Apr 2023 01:39:06 +0530 Subject: [PATCH 18/24] fix multiple ci-material events issue --- pkg/git/Watcher.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index 7db502cb..b3b07d67 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -311,11 +311,10 @@ func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) e impl.logger.Errorw("error while updating ci pipeline materials", "err", err) } - } - - // Notify - _ = impl.NotifyForMaterialUpdate(material.Url, ciMaterial.Value, latestCommit) + // Notify only if new changes are detected + _ = impl.NotifyForMaterialUpdate(material.Url, ciMaterial.Value, latestCommit) + } middleware.GitMaterialUpdateCounter.WithLabelValues().Inc() } } From cc8e58e9118f1d168e7f80b8a9c7cd7638270278 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Thu, 13 Apr 2023 02:32:09 +0530 Subject: [PATCH 19/24] fix get release changes issue --- pkg/RepoManages.go | 12 ++++++------ pkg/git/Watcher.go | 4 +++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index c024c422..806f57e3 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -874,20 +874,20 @@ func (impl RepoManagerImpl) GetReleaseChanges(request *ReleaseChangesRequest) (* if err != nil { return nil, err } - gitMaterial, err := impl.materialRepository.FindById(pipelineMaterial.GitMaterialId) + referencedGitMaterial, err := impl.materialRepository.FindReferencedGitMaterial(pipelineMaterial.GitMaterialId) if err != nil { return nil, err } - if !gitMaterial.CheckoutStatus { - return nil, fmt.Errorf("checkout not succeed please checkout first %s", gitMaterial.Url) + if !referencedGitMaterial.CheckoutStatus { + return nil, fmt.Errorf("checkout not succeed please checkout first %s", referencedGitMaterial.Url) } - repoLock := impl.locker.LeaseLocker(gitMaterial.Id) + repoLock := impl.locker.LeaseLocker(referencedGitMaterial.Id) repoLock.Mutex.Lock() defer func() { repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(gitMaterial.Id) + impl.locker.ReturnLocker(referencedGitMaterial.Id) }() - gitChanges, err := impl.repositoryManager.ChangesSinceByRepositoryForAnalytics(gitMaterial.CheckoutLocation, pipelineMaterial.Value, request.OldCommit, request.NewCommit) + gitChanges, err := impl.repositoryManager.ChangesSinceByRepositoryForAnalytics(referencedGitMaterial.CheckoutLocation, pipelineMaterial.Value, request.OldCommit, request.NewCommit) if err != nil { impl.logger.Errorw("error in computing changes", "req", request, "err", err) } else { diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index b3b07d67..febfe32f 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -258,7 +258,9 @@ func (impl GitWatcherImpl) PollGitMaterialAndNotify(material *sql.GitMaterial) e if !updated { impl.logger.Infow("no new updates found", - "material", material) + "materialId", material.Id, + "refMaterialId", material.RefGitMaterialId, + "url", material.Url) return nil } From 7125c8c15fe44cdcb08a45f17f96acd1dc9a5626 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Fri, 14 Apr 2023 02:40:46 +0530 Subject: [PATCH 20/24] fix update repo issue --- pkg/RepoManages.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index 806f57e3..ef3058ac 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -270,12 +270,18 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater return nil, err } - // if url / deleted is updated, ref_git_material_id needs to be updated as well + // if url is updated, ref_git_material_id needs to be updated as well if material.Url != existingMaterial.Url { // url is updated + impl.logger.Infow("repo url is updated", + "materialId", material.Id, + "oldUrl", existingMaterial.Url, + "newUrl", material.Url) if isSelfReferenced { // as it is self referenced, need to update ref of other git materials + impl.logger.Infow("git material is referenced by other git materials, changing ref for other git materials", + "materialId", material.Id) excludingIds := []int{existingMaterial.Id} nxtMaterial, err := impl.materialRepository.FindByReferenceId(existingMaterial.Id, 1, excludingIds) @@ -295,15 +301,16 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater impl.logger.Errorw("error while updating ref git material id", "err", err) return nil, err + } else { + impl.logger.Infow("ref updated for other git materials", + "new ref git material id", nxtMaterial.Id) } // refresh git material - resp, err := impl.RefreshGitMaterial(&git.RefreshGitMaterialRequest{ - GitMaterialId: nxtMaterial.Id, - }) + nxtMaterial.RefGitMaterialId = nxtMaterial.Id + _, err := impl.gitWatcher.PollAndUpdateGitMaterial(nxtMaterial) if err != nil { impl.logger.Errorw("error while refreshing new referenced material", - "response", resp, "err", err) return nil, err } @@ -337,6 +344,8 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater if material.Deleted && isSelfReferenced { // as it is self referenced, need to update ref of other git materials + impl.logger.Info("git material delete request. updating ref of other git materials") + excludingIds := []int{existingMaterial.Id} nxtMaterial, err := impl.materialRepository.FindByReferenceId(existingMaterial.Id, 1, excludingIds) if err != nil && err != pg.ErrNoRows { @@ -353,14 +362,15 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater "err", err) return nil, err + } else { + impl.logger.Infow("ref updated for other git materials", + "new ref git material id", nxtMaterial.Id) } - resp, err := impl.RefreshGitMaterial(&git.RefreshGitMaterialRequest{ - GitMaterialId: nxtMaterial.Id, - }) + nxtMaterial.RefGitMaterialId = nxtMaterial.Id + _, err := impl.gitWatcher.PollAndUpdateGitMaterial(nxtMaterial) if err != nil { impl.logger.Errorw("error while refreshing new referenced material", - "response", resp, "err", err) return nil, err From a2f7e08f5073c15be6211ecc050492d40804f839 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Fri, 14 Apr 2023 03:15:49 +0530 Subject: [PATCH 21/24] fix blocked sql update --- pkg/RepoManages.go | 139 +++++++++++++++++++++++---------------------- pkg/git/Watcher.go | 1 + 2 files changed, 72 insertions(+), 68 deletions(-) diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index ef3058ac..efae21ac 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -27,6 +27,7 @@ import ( "github.com/go-pg/pg" _ "github.com/robfig/cron/v3" "go.uber.org/zap" + "time" ) type RepoManager interface { @@ -251,6 +252,70 @@ func (impl RepoManagerImpl) AddRepo(materials []*sql.GitMaterial) ([]*sql.GitMat return materials, nil } +func (impl RepoManagerImpl) ReconfigureRefGitMaterialId(currentRef int, tx *pg.Tx) (*sql.GitMaterial, error) { + + excludingIds := []int{currentRef} + nxtMaterial, err := impl.materialRepository.FindByReferenceId(currentRef, 1, excludingIds) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error while getting next material with same repo url", + "err", err) + + return nil, err + + } else if err == pg.ErrNoRows { + // If no other materials found with same url + impl.logger.Infow("no other git materials found referencing given material id", + "materialId", currentRef) + return nil, nil + } + + // If other materials found + // update ref for all materials with same repo url + err = impl.materialRepository.UpdateRefMaterialIdForAllWithSameUrl(nxtMaterial.Url, nxtMaterial.Id, tx) + if err != nil { + impl.logger.Errorw("error while updating ref git material id", + "err", err) + return nil, err + } else { + impl.logger.Infow("ref updated for other git materials", + "new ref git material id", nxtMaterial.Id) + } + + // refresh git material + nxtMaterial.RefGitMaterialId = nxtMaterial.Id + + // poll and update git material with transaction + repoLock := impl.locker.LeaseLocker(nxtMaterial.Id) + repoLock.Mutex.Lock() + defer func() { + repoLock.Mutex.Unlock() + impl.locker.ReturnLocker(nxtMaterial.Id) + }() + + err = impl.gitWatcher.PollGitMaterialAndNotify(nxtMaterial) + + // update last fetch status, counters and time + nxtMaterial.LastFetchTime = time.Now() + nxtMaterial.FetchStatus = err == nil + if err != nil { + nxtMaterial.LastFetchErrorCount = nxtMaterial.LastFetchErrorCount + 1 + nxtMaterial.FetchErrorMessage = err.Error() + } else { + nxtMaterial.LastFetchErrorCount = 0 + nxtMaterial.FetchErrorMessage = "" + } + + err = impl.materialRepository.UpdateWithTransaction(nxtMaterial, tx) + if err != nil { + impl.logger.Errorw("error in updating fetch status", + "material", nxtMaterial, + "err", err) + + return nil, err + } + return nxtMaterial, nil +} + func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) { existingMaterial, err := impl.materialRepository.FindById(material.Id) if err != nil { @@ -283,43 +348,11 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater impl.logger.Infow("git material is referenced by other git materials, changing ref for other git materials", "materialId", material.Id) - excludingIds := []int{existingMaterial.Id} - nxtMaterial, err := impl.materialRepository.FindByReferenceId(existingMaterial.Id, 1, excludingIds) - if err != nil && err != pg.ErrNoRows { - impl.logger.Errorw("error while getting next material with same repo url", + _, err := impl.ReconfigureRefGitMaterialId(existingMaterial.RefGitMaterialId, tx) + if err != nil { + impl.logger.Errorw("error reconfiguring ref for other materials", "err", err) - return nil, err - - } else { - if err == nil { - // If other materials found - - // update ref for all materials with same repo url - err = impl.materialRepository.UpdateRefMaterialIdForAllWithSameUrl(nxtMaterial.Url, nxtMaterial.Id, tx) - if err != nil { - impl.logger.Errorw("error while updating ref git material id", - "err", err) - return nil, err - } else { - impl.logger.Infow("ref updated for other git materials", - "new ref git material id", nxtMaterial.Id) - } - - // refresh git material - nxtMaterial.RefGitMaterialId = nxtMaterial.Id - _, err := impl.gitWatcher.PollAndUpdateGitMaterial(nxtMaterial) - if err != nil { - impl.logger.Errorw("error while refreshing new referenced material", - "err", err) - return nil, err - } - - } else if err == pg.ErrNoRows { - // If no other materials found with same url - impl.logger.Infow("no other git materials found referencing given material id", - "materialId", existingMaterial.Id) - } } } @@ -346,41 +379,11 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater // as it is self referenced, need to update ref of other git materials impl.logger.Info("git material delete request. updating ref of other git materials") - excludingIds := []int{existingMaterial.Id} - nxtMaterial, err := impl.materialRepository.FindByReferenceId(existingMaterial.Id, 1, excludingIds) - if err != nil && err != pg.ErrNoRows { - impl.logger.Errorw("error while getting next material with same repo url", + _, err := impl.ReconfigureRefGitMaterialId(existingMaterial.RefGitMaterialId, tx) + if err != nil { + impl.logger.Errorw("error reconfiguring ref for other materials", "err", err) return nil, err - - } else { - if err == nil { - // If other materials found - err = impl.materialRepository.UpdateRefMaterialIdForAllWithSameUrl(nxtMaterial.Url, nxtMaterial.Id, tx) - if err != nil { - impl.logger.Errorw("error while updating ref git material id", - "err", err) - - return nil, err - } else { - impl.logger.Infow("ref updated for other git materials", - "new ref git material id", nxtMaterial.Id) - } - - nxtMaterial.RefGitMaterialId = nxtMaterial.Id - _, err := impl.gitWatcher.PollAndUpdateGitMaterial(nxtMaterial) - if err != nil { - impl.logger.Errorw("error while refreshing new referenced material", - "err", err) - - return nil, err - } - - } else if err == pg.ErrNoRows { - // If no other materials found with same url - impl.logger.Infow("no other git materials found referencing given material id", - "materialId", existingMaterial.Id) - } } } diff --git a/pkg/git/Watcher.go b/pkg/git/Watcher.go index febfe32f..1a77db18 100644 --- a/pkg/git/Watcher.go +++ b/pkg/git/Watcher.go @@ -44,6 +44,7 @@ type GitWatcherImpl struct { type GitWatcher interface { PollAndUpdateGitMaterial(material *sql.GitMaterial) (*sql.GitMaterial, error) + PollGitMaterialAndNotify(material *sql.GitMaterial) error } type PollConfig struct { From 96dc522c46188bd858591a514dbbe6e53f48520a Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Fri, 14 Apr 2023 04:25:47 +0530 Subject: [PATCH 22/24] fix blocked sql update --- pkg/RepoManages.go | 49 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index efae21ac..7e04fc48 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -52,6 +52,8 @@ type RepoManager interface { GetWebhookEventConfig(eventId int) (*git.WebhookEventConfig, error) GetWebhookPayloadDataForPipelineMaterialId(request *git.WebhookPayloadDataRequest) (*git.WebhookPayloadDataResponse, error) GetWebhookPayloadFilterDataForPipelineMaterialId(request *git.WebhookPayloadFilterDataRequest) (*git.WebhookPayloadFilterDataResponse, error) + + CheckoutMaterialWithTransaction(material *sql.GitMaterial, tx *pg.Tx) (*sql.GitMaterial, error) } type RepoManagerImpl struct { @@ -414,7 +416,7 @@ func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMater } if !existingMaterial.Deleted { - err = impl.checkoutUpdatedRepo(material.Id) + _, err = impl.CheckoutMaterialWithTransaction(existingMaterial, tx) if err != nil { impl.logger.Errorw("error in checking out updated repo", "err", err) return nil, err @@ -519,6 +521,51 @@ func (impl RepoManagerImpl) addRepo(material *sql.GitMaterial) (*sql.GitMaterial return impl.checkoutRepo(material) } +func (impl RepoManagerImpl) CheckoutMaterialWithTransaction(material *sql.GitMaterial, tx *pg.Tx) (*sql.GitMaterial, error) { + repoLock := impl.locker.LeaseLocker(material.Id) + repoLock.Mutex.Lock() + defer func() { + repoLock.Mutex.Unlock() + impl.locker.ReturnLocker(material.Id) + }() + + username, password, err := git.GetUserNamePassword(material.GitProvider) + location, err := git.GetLocationForMaterial(material) + if err != nil { + impl.logger.Errorw("error in getting credentials/location", + "materialId", material.Id, + "authMode", material.GitProvider.AuthMode, + "url", material.Url, + "err", err) + + return nil, err + } + + err = impl.repositoryManager.Add(material.GitProviderId, location, material.Url, username, password, + material.GitProvider.AuthMode, material.GitProvider.SshPrivateKey) + + if err == nil { + material.CheckoutLocation = location + material.CheckoutStatus = true + } else { + material.CheckoutStatus = false + material.CheckoutMsgAny = err.Error() + material.FetchErrorMessage = err.Error() + } + + err = impl.materialRepository.UpdateWithTransaction(material, tx) + if err != nil { + impl.logger.Errorw("error in updating material", + "err", err, + "material", material) + return nil, err + } + + // Not updating ci pipeline materials, as it will be polled in the referenced repo + // if the material is self-referenced, then watcher will perform it. + return material, nil +} + func (impl RepoManagerImpl) checkoutRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) { repoLock := impl.locker.LeaseLocker(material.Id) repoLock.Mutex.Lock() From 128615e99a391f6979025478cff6011e25270db6 Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Fri, 14 Apr 2023 04:41:52 +0530 Subject: [PATCH 23/24] fix double repo lock --- pkg/RepoManages.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index 7e04fc48..81ae6e80 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -522,12 +522,6 @@ func (impl RepoManagerImpl) addRepo(material *sql.GitMaterial) (*sql.GitMaterial } func (impl RepoManagerImpl) CheckoutMaterialWithTransaction(material *sql.GitMaterial, tx *pg.Tx) (*sql.GitMaterial, error) { - repoLock := impl.locker.LeaseLocker(material.Id) - repoLock.Mutex.Lock() - defer func() { - repoLock.Mutex.Unlock() - impl.locker.ReturnLocker(material.Id) - }() username, password, err := git.GetUserNamePassword(material.GitProvider) location, err := git.GetLocationForMaterial(material) From b7abafac580c5299f3acd19cf662dd5b7c0747db Mon Sep 17 00:00:00 2001 From: Vishal Bihani Date: Fri, 14 Apr 2023 05:04:48 +0530 Subject: [PATCH 24/24] fix ci pipeline material not updating after material update --- internal/sql/CiPipelineMaterial.go | 15 +++++++++ internal/sql/GitMaterial.go | 14 +++++++++ pkg/RepoManages.go | 49 ++++++++++++++++++++++++++++-- 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/internal/sql/CiPipelineMaterial.go b/internal/sql/CiPipelineMaterial.go index 40dc4d53..bdb22942 100644 --- a/internal/sql/CiPipelineMaterial.go +++ b/internal/sql/CiPipelineMaterial.go @@ -41,6 +41,7 @@ type CiPipelineMaterial struct { type CiPipelineMaterialRepository interface { FindByGitMaterialId(gitMaterialId int) ([]*CiPipelineMaterial, error) Update(material []*CiPipelineMaterial) error + UpdateWithTransaction(materials []*CiPipelineMaterial, tx *pg.Tx) error FindByIds(ids []int) ([]*CiPipelineMaterial, error) FindById(id int) (*CiPipelineMaterial, error) Exists(id int) (bool, error) @@ -93,6 +94,20 @@ func (impl CiPipelineMaterialRepositoryImpl) Update(materials []*CiPipelineMater return err } +func (impl CiPipelineMaterialRepositoryImpl) UpdateWithTransaction(materials []*CiPipelineMaterial, tx *pg.Tx) error { + + for _, material := range materials { + _, err := tx.Model(material). + WherePK(). + UpdateNotNull() + + if err != nil { + return err + } + } + return nil +} + func (impl CiPipelineMaterialRepositoryImpl) FindSimilarCiPipelineMaterials(repoUrl string, branch string) ([]*CiPipelineMaterial, error) { materials := make([]*CiPipelineMaterial, 0) err := impl.dbConnection.Model(&materials). diff --git a/internal/sql/GitMaterial.go b/internal/sql/GitMaterial.go index baffd568..eb1194a4 100644 --- a/internal/sql/GitMaterial.go +++ b/internal/sql/GitMaterial.go @@ -56,6 +56,7 @@ type GitMaterial struct { type MaterialRepository interface { GetConnection() *pg.DB FindById(id int) (*GitMaterial, error) + FindByIdWithCiMaterials(id int) (*GitMaterial, error) Update(material *GitMaterial) error Save(material *GitMaterial) error SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error @@ -138,6 +139,19 @@ func (repo MaterialRepositoryImpl) FindById(id int) (*GitMaterial, error) { return &material, err } +func (repo MaterialRepositoryImpl) FindByIdWithCiMaterials(id int) (*GitMaterial, error) { + var material GitMaterial + err := repo.dbConnection.Model(&material). + Column("git_material.*", "GitProvider"). + Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) { + return q.Where("active IS TRUE"), nil + }). + Where("git_material.id =? ", id). + Where("git_material.deleted =? ", false). + Select() + return &material, err +} + func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) { var materials []*GitMaterial err := repo.dbConnection.Model(&materials). diff --git a/pkg/RepoManages.go b/pkg/RepoManages.go index 81ae6e80..5d518361 100644 --- a/pkg/RepoManages.go +++ b/pkg/RepoManages.go @@ -174,6 +174,44 @@ func (impl RepoManagerImpl) InactivateWebhookDataMappingForPipelineMaterials(old return nil } +func (impl RepoManagerImpl) UpdatePipelineMaterialCommitWithTransaction(material *sql.GitMaterial, tx *pg.Tx) error { + newCiMaterials := make([]*sql.CiPipelineMaterial, 0, len(material.CiPipelineMaterials)) + + for _, ciMaterial := range material.CiPipelineMaterials { + + commits, err := impl.repositoryManager.ChangesSince(material.CheckoutLocation, ciMaterial.Value, "", "", 0) + if err != nil { + ciMaterial.Errored = true + ciMaterial.ErrorMsg = err.Error() + ciMaterial.LastSeenHash = "" + + } else { + + // commits found + b, err := json.Marshal(commits) + if err != nil { + ciMaterial.Errored = true + ciMaterial.ErrorMsg = err.Error() + ciMaterial.LastSeenHash = "" + + } else { + ciMaterial.CommitHistory = string(b) + if len(commits) > 0 { + latestCommit := commits[0] + ciMaterial.LastSeenHash = latestCommit.Commit + ciMaterial.CommitAuthor = latestCommit.Author + ciMaterial.CommitDate = latestCommit.Date + } + ciMaterial.Errored = false + ciMaterial.ErrorMsg = "" + } + } + newCiMaterials = append(newCiMaterials, ciMaterial) + } + err := impl.ciPipelineMaterialRepository.UpdateWithTransaction(newCiMaterials, tx) + return err +} + func (impl RepoManagerImpl) updatePipelineMaterialCommit(materials []*sql.CiPipelineMaterial) error { var materialCommits []*sql.CiPipelineMaterial for _, pipelineMaterial := range materials { @@ -319,7 +357,7 @@ func (impl RepoManagerImpl) ReconfigureRefGitMaterialId(currentRef int, tx *pg.T } func (impl RepoManagerImpl) UpdateRepo(material *sql.GitMaterial) (*sql.GitMaterial, error) { - existingMaterial, err := impl.materialRepository.FindById(material.Id) + existingMaterial, err := impl.materialRepository.FindByIdWithCiMaterials(material.Id) if err != nil { impl.logger.Errorw("error in fetching material", err) return nil, err @@ -555,8 +593,13 @@ func (impl RepoManagerImpl) CheckoutMaterialWithTransaction(material *sql.GitMat return nil, err } - // Not updating ci pipeline materials, as it will be polled in the referenced repo - // if the material is self-referenced, then watcher will perform it. + err = impl.UpdatePipelineMaterialCommitWithTransaction(material, tx) + if err != nil { + impl.logger.Errorw("error while updating pipeline material commit", + "err", err) + return nil, err + } + return material, nil }