Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ type Agent struct {
// deletions tracks valid deletions from the source.
// This is used to differentiate between valid and invalid deletions
deletions *manager.DeletionTracker

// below are loggers to control log levels of different subsystems
resourceProxyLogger *logging.CentralizedLogger
redisProxyLogger *logging.CentralizedLogger
grpcEventLogger *logging.CentralizedLogger
}

const defaultQueueName = "default"
Expand Down Expand Up @@ -167,6 +172,18 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
}
}

if a.resourceProxyLogger == nil {
a.resourceProxyLogger = logging.GetDefaultLogger()
}

if a.redisProxyLogger == nil {
a.redisProxyLogger = logging.GetDefaultLogger()
}

if a.grpcEventLogger == nil {
a.grpcEventLogger = logging.GetDefaultLogger()
}

if a.remote == nil {
return nil, fmt.Errorf("remote not defined")
}
Expand Down Expand Up @@ -489,7 +506,19 @@ func (a *Agent) SetConnected(connected bool) {
}

func log() *logrus.Entry {
return logging.ModuleLogger("Agent")
return logging.GetDefaultLogger().ModuleLogger("Agent")
}

func (a *Agent) logResourceProxy() *logrus.Entry {
return logging.SelectLogger(a.resourceProxyLogger).ModuleLogger("ResourceProxy")
}

func (a *Agent) logRedisProxy() *logrus.Entry {
return logging.SelectLogger(a.redisProxyLogger).ModuleLogger("RedisProxy")
}

func (a *Agent) logGrpcEvent() *logrus.Entry {
return logging.SelectLogger(a.grpcEventLogger).ModuleLogger("GrpcEvent")
}

func (a *Agent) healthzHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
37 changes: 18 additions & 19 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (a *Agent) processIncomingEvent(ev *event.Event) error {
err := a.processIncomingRedisRequest(ev)
if err != nil {
tracing.RecordError(redisSpan, err)
log().WithError(err).Errorf("Unable to process incoming redis event")
a.logGrpcEvent().WithError(err).Errorf("Unable to process incoming redis event")
} else {
tracing.SetSpanOK(redisSpan)
}
Expand All @@ -103,7 +103,7 @@ func (a *Agent) processIncomingEvent(ev *event.Event) error {
// Process terminal request in a separate goroutine to avoid blocking the event thread
go func() {
if termErr := a.processIncomingTerminalRequest(ev); termErr != nil && !isShellNotFoundError(termErr) {
log().WithError(termErr).Errorf("Unable to process incoming terminal event")
a.logGrpcEvent().WithError(termErr).Errorf("Unable to process incoming terminal event")
}
}()
default:
Expand Down Expand Up @@ -137,7 +137,7 @@ func (a *Agent) processIncomingEvent(ev *event.Event) error {
}

func (a *Agent) processIncomingApplication(ev *event.Event) error {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "processIncomingEvents",
})
incomingApp, err := ev.Application()
Expand Down Expand Up @@ -238,7 +238,7 @@ func (a *Agent) processIncomingApplication(ev *event.Event) error {
}

func (a *Agent) processIncomingAppProject(ev *event.Event) error {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "processIncomingEvents",
})
incomingAppProject, err := ev.AppProject()
Expand Down Expand Up @@ -321,7 +321,7 @@ func (a *Agent) processIncomingAppProject(ev *event.Event) error {
}

func (a *Agent) processIncomingRepository(ev *event.Event) error {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "processIncomingEvents",
})

Expand Down Expand Up @@ -408,7 +408,7 @@ func (a *Agent) processIncomingRepository(ev *event.Event) error {
// processIncomingResourceResyncEvent handles all the resync events that are
// exchanged with the agent/principal restarts
func (a *Agent) processIncomingResourceResyncEvent(ev *event.Event) error {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "processIncomingEvents",
"agent": a.remote.ClientID(),
"mode": a.mode,
Expand Down Expand Up @@ -496,7 +496,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// Applications must exist in the same namespace as the agent
incoming.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "CreateApplication",
"app": incoming.QualifiedName(),
})
Expand Down Expand Up @@ -548,7 +548,7 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// Applications must exist in the same namespace as the agent
incoming.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "UpdateApplication",
"app": incoming.QualifiedName(),
"resourceVersion": incoming.ResourceVersion,
Expand Down Expand Up @@ -591,7 +591,7 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
// Applications must exist in the same namespace as the agent
app.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "DeleteApplication",
"app": app.QualifiedName(),
})
Expand Down Expand Up @@ -634,7 +634,7 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {

err = a.appManager.Unmanage(app.QualifiedName())
if err != nil {
log().Warnf("Could not unmanage app %s: %v", app.QualifiedName(), err)
a.logGrpcEvent().Warnf("Could not unmanage app %s: %v", app.QualifiedName(), err)
}
return nil
}
Expand All @@ -645,7 +645,7 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
// AppProjects must exist in the same namespace as the agent
incoming.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "CreateAppProject",
"appProject": incoming.Name,
})
Expand Down Expand Up @@ -685,7 +685,7 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
// AppProjects must exist in the same namespace as the agent
incoming.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "UpdateAppProject",
"appProject": incoming.Name,
"resourceVersion": incoming.ResourceVersion,
Expand All @@ -709,14 +709,13 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr

logCtx.Tracef("Calling update spec for this event")
return a.projectManager.UpdateAppProject(a.context, incoming)

}

func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {
// AppProjects must exist in the same namespace as the agent
project.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "DeleteAppProject",
"appProject": project.Name,
})
Expand Down Expand Up @@ -755,7 +754,7 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {

err = a.projectManager.Unmanage(project.Name)
if err != nil {
log().Warnf("Could not unmanage appProject %s: %v", project.Name, err)
a.logGrpcEvent().Warnf("Could not unmanage appProject %s: %v", project.Name, err)
}
return nil
}
Expand All @@ -765,7 +764,7 @@ func (a *Agent) createRepository(incoming *corev1.Secret) (*corev1.Secret, error
// Repository secrets must exist in the same namespace as the agent
incoming.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "CreateRepository",
"repo": incoming.Name,
})
Expand Down Expand Up @@ -808,7 +807,7 @@ func (a *Agent) updateRepository(incoming *corev1.Secret) (*corev1.Secret, error
// Repository secrets must exist in the same namespace as the agent
incoming.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "UpdateRepository",
"repo": incoming.Name,
"resourceVersion": incoming.ResourceVersion,
Expand Down Expand Up @@ -837,7 +836,7 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error {
// Repository secrets must exist in the same namespace as the agent
repo.SetNamespace(a.namespace)

logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"method": "DeleteRepository",
"repo": repo.Name,
})
Expand Down Expand Up @@ -872,7 +871,7 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error {

err = a.repoManager.Unmanage(repo.Name)
if err != nil {
log().Warnf("Could not unmanage repository %s: %v", repo.Name, err)
a.logGrpcEvent().Warnf("Could not unmanage repository %s: %v", repo.Name, err)
}

return nil
Expand Down
8 changes: 1 addition & 7 deletions agent/inbound_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,11 @@ const (

// processIncomingRedisRequest handles incoming redis-specific events from principal, including get/subscribe (from argo cd) and ping (internal).
func (a *Agent) processIncomingRedisRequest(ev *event.Event) error {

rreq, err := ev.RedisRequest()
if err != nil {
return err
}
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logRedisProxy().WithFields(logrus.Fields{
"method": "processIncomingRedisRequest",
"uuid": rreq.UUID,
"connectionUUID": rreq.ConnectionUUID,
Expand Down Expand Up @@ -174,7 +173,6 @@ func (a *Agent) handleRedisSubscribeMessage(logCtx *logrus.Entry, rreq *event.Re
}()

return res, nil

}

// forwardRedisSubscribeNotificationsToPrincipal reads subscription events from agent Argo CD redis, then sends them to principal.
Expand Down Expand Up @@ -240,7 +238,6 @@ func (a *Agent) forwardRedisSubscribeNotificationsToPrincipal(pubsub *redis.PubS
logCtx.Tracef("Emitted Subscribe push event")
}
}

}

func (a *Agent) handleRedisGetMessage(logCtx *logrus.Entry, rreq *event.RedisRequest) (*event.RedisResponseBody, error) {
Expand Down Expand Up @@ -289,7 +286,6 @@ func (a *Agent) handleRedisGetMessage(logCtx *logrus.Entry, rreq *event.RedisReq
// needs to be converted to, e.g.
// "app|resources-tree|my-app|1.8.3.gz
func stripNamespaceFromRedisKey(key string, logCtx *logrus.Entry) (string, error) {

var matchedPrefix string
expectedPrefixes := []string{
"app|resources-tree|",
Expand Down Expand Up @@ -329,7 +325,6 @@ func stripNamespaceFromRedisKey(key string, logCtx *logrus.Entry) (string, error
key = strings.Join(components, "|")

return key, nil

}

func (a *Agent) getRedisClientAndCache() (*redis.Client, *rediscache.Cache, error) {
Expand All @@ -351,5 +346,4 @@ func (a *Agent) getRedisClientAndCache() (*redis.Client, *rediscache.Cache, erro
cache := rediscache.New(&rediscache.Options{Redis: client})

return client, cache, nil

}
11 changes: 11 additions & 0 deletions agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"fmt"
"time"

"github.com/argoproj-labs/argocd-agent/internal/logging"
"github.com/argoproj-labs/argocd-agent/pkg/client"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/sirupsen/logrus"
)

func WithAllowedNamespaces(namespaces ...string) AgentOption {
Expand Down Expand Up @@ -114,3 +116,12 @@ func WithHeartbeatInterval(interval time.Duration) AgentOption {
return nil
}
}

func WithSubsystemLoggers(resourceProxy, redisProxy, grpcEvent *logrus.Logger) AgentOption {
return func(o *Agent) error {
o.resourceProxyLogger = logging.New(resourceProxy)
o.redisProxyLogger = logging.New(redisProxy)
o.grpcEventLogger = logging.New(grpcEvent)
return nil
}
}
22 changes: 11 additions & 11 deletions agent/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// addAppCreationToQueue processes a new application event originating from the
// AppInformer and puts it in the send queue.
func (a *Agent) addAppCreationToQueue(app *v1alpha1.Application) {
logCtx := log().WithField(logfields.Event, "NewApp").WithField(logfields.Application, app.QualifiedName())
logCtx := a.logGrpcEvent().WithField(logfields.Event, "NewApp").WithField(logfields.Application, app.QualifiedName())
logCtx.Debugf("New app event")

ctx, span := a.startSpan("create", "Application", app)
Expand Down Expand Up @@ -76,7 +76,7 @@ func (a *Agent) addAppCreationToQueue(app *v1alpha1.Application) {
// addAppUpdateToQueue processes an application update event originating from
// the AppInformer and puts it in the agent's send queue.
func (a *Agent) addAppUpdateToQueue(old *v1alpha1.Application, new *v1alpha1.Application) {
logCtx := log().WithField(logfields.Event, "UpdateApp").WithField(logfields.Application, old.QualifiedName())
logCtx := a.logGrpcEvent().WithField(logfields.Event, "UpdateApp").WithField(logfields.Application, old.QualifiedName())
a.watchLock.Lock()
defer a.watchLock.Unlock()

Expand Down Expand Up @@ -129,7 +129,7 @@ func (a *Agent) addAppUpdateToQueue(old *v1alpha1.Application, new *v1alpha1.App
// addAppDeletionToQueue processes an application delete event originating from
// the AppInformer and puts it in the send queue.
func (a *Agent) addAppDeletionToQueue(app *v1alpha1.Application) {
logCtx := log().WithField(logfields.Event, "DeleteApp").WithField(logfields.Application, app.QualifiedName())
logCtx := a.logGrpcEvent().WithField(logfields.Event, "DeleteApp").WithField(logfields.Application, app.QualifiedName())
logCtx.Debugf("Delete app event")

ctx, span := a.startSpan("delete", "Application", app)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (a *Agent) addAppDeletionToQueue(app *v1alpha1.Application) {
// deleteNamespaceCallback is called when the user deletes the agent namespace.
// Since there is no namespace we can remove the queue associated with this agent.
func (a *Agent) deleteNamespaceCallback(outbound *corev1.Namespace) {
logCtx := log().WithField(logfields.Event, "DeleteNamespace").WithField(logfields.Agent, outbound.Name)
logCtx := a.logGrpcEvent().WithField(logfields.Event, "DeleteNamespace").WithField(logfields.Agent, outbound.Name)

if !a.queues.HasQueuePair(outbound.Name) {
return
Expand All @@ -188,7 +188,7 @@ func (a *Agent) deleteNamespaceCallback(outbound *corev1.Namespace) {
// addAppProjectCreationToQueue processes a new appProject event originating from the
// AppProject Informer and puts it in the send queue.
func (a *Agent) addAppProjectCreationToQueue(appProject *v1alpha1.AppProject) {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"event": "NewAppProject",
"appProject": appProject.Name,
"sendq_name": defaultQueueName,
Expand Down Expand Up @@ -233,7 +233,7 @@ func (a *Agent) addAppProjectCreationToQueue(appProject *v1alpha1.AppProject) {
// addAppProjectUpdateToQueue processes an appProject update event originating from the
// AppProject Informer and puts it in the send queue.
func (a *Agent) addAppProjectUpdateToQueue(old *v1alpha1.AppProject, new *v1alpha1.AppProject) {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"event": "UpdateAppProject",
"appProject": new.Name,
"sendq_name": defaultQueueName,
Expand Down Expand Up @@ -290,7 +290,7 @@ func (a *Agent) addAppProjectUpdateToQueue(old *v1alpha1.AppProject, new *v1alph
// addAppProjectDeletionToQueue processes an appProject delete event originating from the
// AppProject Informer and puts it in the send queue.
func (a *Agent) addAppProjectDeletionToQueue(appProject *v1alpha1.AppProject) {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"event": "DeleteAppProject",
"appProject": appProject.Name,
"sendq_name": defaultQueueName,
Expand Down Expand Up @@ -342,7 +342,7 @@ func (a *Agent) addAppProjectDeletionToQueue(appProject *v1alpha1.AppProject) {
// addClusterCacheInfoUpdateToQueue processes a cluster cache info update event
// and puts it in the send queue.
func (a *Agent) addClusterCacheInfoUpdateToQueue() {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"event": "addClusterCacheInfoUpdateToQueue",
})

Expand Down Expand Up @@ -380,7 +380,7 @@ func (a *Agent) addClusterCacheInfoUpdateToQueue() {
}

func (a *Agent) handleRepositoryCreation(repo *corev1.Secret) {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"event": "NewRepository",
"repository": repo.Name,
})
Expand All @@ -406,7 +406,7 @@ func (a *Agent) handleRepositoryCreation(repo *corev1.Secret) {
}

func (a *Agent) handleRepositoryUpdate(old, new *corev1.Secret) {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"event": "UpdateRepository",
"repository": new.Name,
})
Expand Down Expand Up @@ -436,7 +436,7 @@ func (a *Agent) handleRepositoryUpdate(old, new *corev1.Secret) {
}

func (a *Agent) handleRepositoryDeletion(repo *corev1.Secret) {
logCtx := log().WithFields(logrus.Fields{
logCtx := a.logGrpcEvent().WithFields(logrus.Fields{
"event": "DeleteRepository",
"repository": repo.Name,
})
Expand Down
Loading
Loading