Skip to content

feat: disable polling for repos with same url #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 106 additions & 6 deletions internal/sql/CiPipelineMaterial.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

type CiPipelineMaterial struct {
tableName struct{} `sql:"ci_pipeline_material"`
tableName struct{} `sql:"ci_pipeline_material" pg:",discard_unknown_columns"`
Id int `sql:"id"`
GitMaterialId int `sql:"git_material_id"` //id stored in db GitMaterial( foreign key)
Type SourceType `sql:"type"`
Expand All @@ -32,20 +32,24 @@ type CiPipelineMaterial struct {
LastSeenHash string `sql:"last_seen_hash,notnull"`
CommitAuthor string `sql:"commit_author"`
CommitDate time.Time `sql:"commit_date"`

CommitHistory string `sql:"commit_history"` //last five commit for caching purpose1
Errored bool `sql:"errored,notnull"`
ErrorMsg string `sql:"error_msg,notnull"`
}

CommitHistory string `sql:"commit_history"` //last five commit for caching purpose1
Errored bool `sql:"errored,notnull"`
ErrorMsg string `sql:"error_msg,notnull"`
}

type CiPipelineMaterialRepository interface {
FindByGitMaterialId(gitMaterialId int) ([]*CiPipelineMaterial, error)
Update(material []*CiPipelineMaterial) error
UpdateWithTransaction(materials []*CiPipelineMaterial, tx *pg.Tx) error
FindByIds(ids []int) ([]*CiPipelineMaterial, error)
FindById(id int) (*CiPipelineMaterial, error)
Exists(id int) (bool, error)
Save(material []*CiPipelineMaterial) ([]*CiPipelineMaterial, error)
FindSimilarCiPipelineMaterials(repoUrl string, branch string) ([]*CiPipelineMaterial, error)
FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error)
UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error
UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error
}

type CiPipelineMaterialRepositoryImpl struct {
Expand Down Expand Up @@ -90,6 +94,33 @@ func (impl CiPipelineMaterialRepositoryImpl) Update(materials []*CiPipelineMater
return err
}

func (impl CiPipelineMaterialRepositoryImpl) UpdateWithTransaction(materials []*CiPipelineMaterial, tx *pg.Tx) error {

for _, material := range materials {
_, err := tx.Model(material).
WherePK().
UpdateNotNull()

if err != nil {
return err
}
}
return nil
}

func (impl CiPipelineMaterialRepositoryImpl) FindSimilarCiPipelineMaterials(repoUrl string, branch string) ([]*CiPipelineMaterial, error) {
materials := make([]*CiPipelineMaterial, 0)
err := impl.dbConnection.Model(&materials).
Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id").
Where("gm.url = ?", repoUrl).
Where("gm.deleted = ?", false).
Where("ci_pipeline_material.value = ?", branch).
Where("ci_pipeline_material.active = ?", true).
Select()

return materials, err
}

func (impl CiPipelineMaterialRepositoryImpl) FindByIds(ids []int) (materials []*CiPipelineMaterial, err error) {
err = impl.dbConnection.Model(&materials).
Where("id in (?)", pg.In(ids)).
Expand All @@ -104,3 +135,72 @@ func (impl CiPipelineMaterialRepositoryImpl) FindById(id int) (*CiPipelineMateri
Where("active = ?", true).Select()
return materials, err
}

func (impl CiPipelineMaterialRepositoryImpl) FindAllCiPipelineMaterialsReferencingGivenMaterial(gitMaterialId int) ([]*CiPipelineMaterial, error) {
ciPipelineMaterials := make([]*CiPipelineMaterial, 0)
//err := impl.dbConnection.Model(&ciPipelineMaterials).
// ColumnExpr("DISTINCT ci_pipeline_material.value").
// Column("ci_pipeline_material.*").
// Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id").
// Where("gm.ref_git_material_id = ?", gitMaterialId).
// Where("gm.deleted = ?", false).
// Where("ci_pipeline_material.active = ?", true).
// Where("ci_pipeline_material.type = ?", "SOURCE_TYPE_BRANCH_FIXED").
// Select()

query := "select * from (" +
"select *, row_number() over (partition by value order by commit_date desc) as row_num from ci_pipeline_material cpm " +
"inner join git_material gm on gm.id = cpm.git_material_id " +
"where (gm.ref_git_material_id = ?) and (gm.deleted = false) and (cpm.active = true) and (cpm.\"type\" = 'SOURCE_TYPE_BRANCH_FIXED')" +
") materials where row_num <= 1"

_, err := impl.dbConnection.Query(&ciPipelineMaterials, query, gitMaterialId)
return ciPipelineMaterials, err
}

func (impl CiPipelineMaterialRepositoryImpl) UpdateErroredCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error {

//_, err := impl.dbConnection.Model(material).
// Column("ci_pipeline_material.errored", "ci_pipeline_material.error_msg").
// Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id").
// Where("gm.ref_git_material_id = ?", gitMaterialId).
// Where("ci_pipeline_material.value = ?", branch).
// Update()

query := "UPDATE ci_pipeline_material cpm " +
"SET errored = ?, " +
"error_msg = ? " +
"FROM git_material gm WHERE (cpm.git_material_id = gm.id) " +
"AND (gm.ref_git_material_id = ?) " +
"AND (cpm.value = ?)"

_, err := impl.dbConnection.Query(material, query, material.Errored, material.ErrorMsg, gitMaterialId, branch)
return err
}

func (impl CiPipelineMaterialRepositoryImpl) UpdateCiPipelineMaterialsReferencingGivenGitMaterial(gitMaterialId int, branch string, material *CiPipelineMaterial) error {

//_, err := impl.dbConnection.Model(material).
// Column("ci_pipeline_material.last_seen_hash", "ci_pipeline_material.commit_author", "ci_pipeline_material.commit_date",
// "ci_pipeline_material.commit_history", "ci_pipeline_material.errored", "ci_pipeline_material.error_msg").
// Join("INNER JOIN git_material gm ON gm.id = ci_pipeline_material.git_material_id").
// Where("gm.ref_git_material_id = ?", gitMaterialId).
// Where("ci_pipeline_material.value = ?", branch).
// Update()

query := "UPDATE ci_pipeline_material cpm " +
"SET errored = ?, " +
"error_msg = ?, " +
"last_seen_hash = ?, " +
"commit_author = ?, " +
"commit_date = ?, " +
"commit_history = ? " +
"FROM git_material gm WHERE (cpm.git_material_id = gm.id) " +
"AND (gm.ref_git_material_id = ?) " +
"AND (cpm.value = ?)"

_, err := impl.dbConnection.Query(material, query, material.Errored, material.ErrorMsg, material.LastSeenHash,
material.CommitAuthor, material.CommitDate, material.CommitHistory, gitMaterialId, branch)

return err
}
127 changes: 123 additions & 4 deletions internal/sql/GitMaterial.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ const (
SOURCE_TYPE_WEBHOOK SourceType = "WEBHOOK"
)

//TODO: add support for submodule
// TODO: add support for submodule
type GitMaterial struct {
tableName struct{} `sql:"git_material"`
tableName struct{} `sql:"git_material" pg:",discard_unknown_columns"`
Id int `sql:"id,pk"`
GitProviderId int `sql:"git_provider_id,notnull"`
Url string `sql:"url,omitempty"`
Expand All @@ -48,17 +48,28 @@ type GitMaterial struct {
FetchStatus bool `json:"fetch_status"`
LastFetchErrorCount int `json:"last_fetch_error_count"` //continues fetch error
FetchErrorMessage string `json:"fetch_error_message"`
RefGitMaterialId int `sql:"ref_git_material_id"`
GitProvider *GitProvider
CiPipelineMaterials []*CiPipelineMaterial
}

type MaterialRepository interface {
GetConnection() *pg.DB
FindById(id int) (*GitMaterial, error)
FindByIdWithCiMaterials(id int) (*GitMaterial, error)
Update(material *GitMaterial) error
Save(material *GitMaterial) error
SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error
FindActive() ([]*GitMaterial, error)
FindAll() ([]*GitMaterial, error)
FindAllActiveByUrls(urls []string) ([]*GitMaterial, error)
FindAllReferencedGitMaterials() ([]*GitMaterial, error)
FindReferencedGitMaterial(materialId int) (*GitMaterial, error)
GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error)
UpdateRefMaterialId(material *GitMaterial, tx *pg.Tx) error
FindByReferenceId(refGitMaterialId int, limit int, excludingIds []int) (*GitMaterial, error)
UpdateWithTransaction(material *GitMaterial, tx *pg.Tx) error
UpdateRefMaterialIdForAllWithSameUrl(url string, refGitMaterialId int, tx *pg.Tx) error
}
type MaterialRepositoryImpl struct {
dbConnection *pg.DB
Expand All @@ -68,20 +79,37 @@ func NewMaterialRepositoryImpl(dbConnection *pg.DB) *MaterialRepositoryImpl {
return &MaterialRepositoryImpl{dbConnection: dbConnection}
}

func (repo MaterialRepositoryImpl) GetConnection() *pg.DB {
return repo.dbConnection
}

func (repo MaterialRepositoryImpl) Save(material *GitMaterial) error {
err := repo.dbConnection.Insert(material)
return err
}

func (repo MaterialRepositoryImpl) SaveWithTransaction(material *GitMaterial, tx *pg.Tx) error {
err := tx.Insert(material)
return err
}

func (repo MaterialRepositoryImpl) Update(material *GitMaterial) error {
_, err := repo.dbConnection.Model(material).WherePK().Update()
return err
}

func (repo MaterialRepositoryImpl) UpdateWithTransaction(material *GitMaterial, tx *pg.Tx) error {
_, err := tx.Model(material).
WherePK().
Update()

return err
}

func (repo MaterialRepositoryImpl) FindActive() ([]*GitMaterial, error) {
var materials []*GitMaterial
err := repo.dbConnection.Model(&materials).
Column("git_material.*", "GitProvider", ).
Column("git_material.*", "GitProvider").
Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) {
return q.Where("active IS TRUE"), nil
}).
Expand Down Expand Up @@ -111,7 +139,20 @@ func (repo MaterialRepositoryImpl) FindById(id int) (*GitMaterial, error) {
return &material, err
}

func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls[] string) ([]*GitMaterial, error) {
func (repo MaterialRepositoryImpl) FindByIdWithCiMaterials(id int) (*GitMaterial, error) {
var material GitMaterial
err := repo.dbConnection.Model(&material).
Column("git_material.*", "GitProvider").
Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) {
return q.Where("active IS TRUE"), nil
}).
Where("git_material.id =? ", id).
Where("git_material.deleted =? ", false).
Select()
return &material, err
}

func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls []string) ([]*GitMaterial, error) {
var materials []*GitMaterial
err := repo.dbConnection.Model(&materials).
Relation("CiPipelineMaterials", func(q *orm.Query) (*orm.Query, error) {
Expand All @@ -122,3 +163,81 @@ func (repo MaterialRepositoryImpl) FindAllActiveByUrls(urls[] string) ([]*GitMat
Select()
return materials, err
}

func (repo MaterialRepositoryImpl) FindAllReferencedGitMaterials() ([]*GitMaterial, error) {

var materials []*GitMaterial

err := repo.dbConnection.Model(&materials).
ColumnExpr("DISTINCT(git_material.id)").
Column("git_material.*", "GitProvider").
Join("RIGHT JOIN git_material gm ON git_material.id = gm.ref_git_material_id").
Where("gm.deleted = ? ", false).
Where("gm.checkout_status = ? ", true).
Order("git_material.id ASC").
Select()

return materials, err
}

func (repo MaterialRepositoryImpl) FindReferencedGitMaterial(materialId int) (*GitMaterial, error) {

material := &GitMaterial{}
err := repo.dbConnection.Model(material).
Column("git_material.*", "GitProvider").
Join("RIGHT JOIN git_material gm ON git_material.id = gm.ref_git_material_id").
Where("gm.id = ? ", materialId).
Select()

return material, err
}

func (repo MaterialRepositoryImpl) GetMaterialWithSameRepoUrl(repoUrl string, limit int) (*GitMaterial, error) {

material := &GitMaterial{}
err := repo.dbConnection.Model(material).
Where("url = ?", repoUrl).
Where("deleted = false").
Limit(limit).
Select()

return material, err
}

func (repo MaterialRepositoryImpl) UpdateRefMaterialId(material *GitMaterial, tx *pg.Tx) error {

_, err := tx.Model(material).
Column("ref_git_material_id").
WherePK().
Update()

return err
}

func (repo MaterialRepositoryImpl) UpdateRefMaterialIdForAllWithSameUrl(url string, refGitMaterialId int, tx *pg.Tx) error {

material := &GitMaterial{}
_, err := tx.Model(material).
Set("ref_git_material_id = ?", refGitMaterialId).
Where("url = ?", url).
Where("deleted = ?", false).
Update()

return err
}

func (repo MaterialRepositoryImpl) FindByReferenceId(refGitMaterialId int, limit int, excludingIds []int) (*GitMaterial, error) {
var material GitMaterial
qry := repo.dbConnection.Model(&material).
Column("git_material.*", "GitProvider").
Where("git_material.ref_git_material_id = ? ", refGitMaterialId).
Where("git_material.deleted =? ", false).
Limit(limit)

if len(excludingIds) > 0 {
qry = qry.Where("git_material.id not in (?)", pg.In(excludingIds))
}

err := qry.Select()
return &material, err
}
Loading