Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"errors"
"fmt"
"io/fs"
"path"

"gopkg.in/yaml.v3"

"github.com/elastic/package-spec/v3/code/go/internal/fspath"
"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
)

const (
defaultStreamTemplatePath = "stream.yml.hbs"
packageTypeIntegration = "integration"
)

type policyTemplateInput struct {
Type string `yaml:"type"`
TemplatePath string `yaml:"template_path"` // optional for integration packages
}

type integrationPolicyTemplate struct {
Name string `yaml:"name"`
Inputs []policyTemplateInput `yaml:"inputs"`
}

type integrationPackageManifest struct { // package manifest
Type string `yaml:"type"` // integration or input
PolicyTemplates []integrationPolicyTemplate `yaml:"policy_templates"`
}

type stream struct {
Input string `yaml:"input"`
TemplatePath string `yaml:"template_path"`
}

type dataStreamManifest struct {
Streams []stream `yaml:"streams"`
}

// ValidateIntegrationPolicyTemplates validates the template_path fields at the policy template level for integration type packages
func ValidateIntegrationPolicyTemplates(fsys fspath.FS) specerrors.ValidationErrors {
var errs specerrors.ValidationErrors

manifestPath := "manifest.yml"
data, err := fs.ReadFile(fsys, manifestPath)
if err != nil {
return specerrors.ValidationErrors{
specerrors.NewStructuredErrorf("file \"%s\" is invalid: %ww", fsys.Path(manifestPath), errFailedToReadManifest)}
}

var manifest integrationPackageManifest
err = yaml.Unmarshal(data, &manifest)
if err != nil {
return specerrors.ValidationErrors{
specerrors.NewStructuredErrorf("file \"%s\" is invalid: %w", fsys.Path(manifestPath), errFailedToParseManifest)}
}

// only validate integration type packages
if manifest.Type != packageTypeIntegration {
return nil
}

// read at once all data stream manifests
dataStreamsManifestMap, err := readDataStreamsManifests(fsys)
if err != nil {
return specerrors.ValidationErrors{
specerrors.NewStructuredErrorf("file \"%s\" is invalid: %w", fsys.Path(manifestPath), err)}
}

for _, policyTemplate := range manifest.PolicyTemplates {
err = validateIntegrationPackagePolicyTemplate(fsys, policyTemplate, dataStreamsManifestMap)
if err != nil {
errs = append(errs, specerrors.NewStructuredErrorf(
"file \"%s\" is invalid: policy template \"%s\" references input template_path: %w",
fsys.Path(manifestPath), policyTemplate.Name, err))
}
}

return errs
}

// validateIntegrationPackagePolicyTemplate validates the template_path fields at the policy template level for integration type packages
func validateIntegrationPackagePolicyTemplate(fsys fspath.FS, policyTemplate integrationPolicyTemplate, dsManifestMap map[string]dataStreamManifest) error {
for _, input := range policyTemplate.Inputs {
if input.TemplatePath != "" {
// validate the provided template_path file exists
err := validateAgentInputTemplatePath(fsys, input.TemplatePath)
if err != nil {
return fmt.Errorf("error validating input \"%s\": %w", input.Type, err)
}
continue
}

err := validateInputWithStreams(fsys, input.Type, dsManifestMap)
if err != nil {
return fmt.Errorf("error validating input from streams \"%s\": %w", input.Type, err)
}
}
return nil
}

// readDataStreamsManifests reads all data stream manifests and returns a map of data stream directory to its manifest relevant content
func readDataStreamsManifests(fsys fspath.FS) (map[string]dataStreamManifest, error) {
// map of data stream directory to its manifest
dsManifestMap := make(map[string]dataStreamManifest, 0)

dsManifests, err := fs.Glob(fsys, "data_stream/*/manifest.yml")
if err != nil {
return nil, err
}
for _, file := range dsManifests {
data, err := fs.ReadFile(fsys, file)
if err != nil {
return nil, err
}
var m dataStreamManifest
err = yaml.Unmarshal(data, &m)
if err != nil {
return nil, err
}

dsDir := path.Dir(file)
dsManifestMap[dsDir] = m
}

return dsManifestMap, nil
}

// validateInputWithStreams validates that for the given input type, the streams of each dataset related to it have valid template_path files
// an input is related to a data_stream if any of its streams has the same input type as input
func validateInputWithStreams(fsys fspath.FS, input string, dsMap map[string]dataStreamManifest) error {
for dsDir, manifest := range dsMap {
for _, stream := range manifest.Streams {
// only consider streams that match the input type of the policy template
if stream.Input != input {
continue
}
// if template_path is not set at the stream level, default to "stream.yml.hbs"
if stream.TemplatePath == "" {
stream.TemplatePath = defaultStreamTemplatePath
}

_, err := fs.ReadFile(fsys, path.Join(dsDir, "agent", "stream", stream.TemplatePath))
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
// fallback to glob pattern matching in case the default template path is customized with a prefix
matches, err := fs.Glob(fsys, path.Join(dsDir, "agent", "stream", "*"+stream.TemplatePath))
if err != nil {
return err
}
if len(matches) == 0 {
return errTemplateNotFound
}
continue
}
return err
}
}
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"os"
"path"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/package-spec/v3/code/go/internal/fspath"
)

func TestReadDataStreamsManifests(t *testing.T) {

d := t.TempDir()

err := os.MkdirAll(filepath.Join(d, "data_stream", "logs"), 0o755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "manifest.yml"), []byte(`
streams:
- input: nginx/access
template_path: stream.yml.hbs
- input: nginx/error
template_path: error_stream.yml.hbs
`), 0o644)
require.NoError(t, err)

err = os.MkdirAll(filepath.Join(d, "data_stream", "logs", "nested"), 0o755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "nested", "manifest.yml"), []byte(`
streams:
- input: nginx/access
template_path: stream.yml.hbs
- input: nginx/error
template_path: error_stream.yml.hbs
`), 0o644)
require.NoError(t, err)

dataStreamsManifestMap, err := readDataStreamsManifests(fspath.DirFS(d))
require.NoError(t, err)
// only the top-level manifest.yml should be read
require.Len(t, dataStreamsManifestMap, 1)

mapKey := filepath.ToSlash(path.Join("data_stream", "logs"))
require.NotEmpty(t, dataStreamsManifestMap[mapKey])
logsManifest := dataStreamsManifestMap[mapKey]
require.Len(t, logsManifest.Streams, 2)
require.Equal(t, "nginx/access", logsManifest.Streams[0].Input)
require.Equal(t, "stream.yml.hbs", logsManifest.Streams[0].TemplatePath)
require.Equal(t, "nginx/error", logsManifest.Streams[1].Input)
require.Equal(t, "error_stream.yml.hbs", logsManifest.Streams[1].TemplatePath)
}

func TestValidateInputWithStreams(t *testing.T) {
d := t.TempDir()
err := os.MkdirAll(filepath.Join(d, "data_stream", "logs", "agent", "stream"), 0o755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "agent", "stream", "access.yml.hbs"), []byte(`access stream template`), 0o644)
require.NoError(t, err)

dsMap := make(map[string]dataStreamManifest)
dsMap[filepath.ToSlash(path.Join("data_stream", "logs"))] = dataStreamManifest{
Streams: []stream{
{
Input: "nginx/access",
TemplatePath: "access.yml.hbs",
},
{
Input: "nginx/error",
TemplatePath: "error_stream.yml.hbs",
},
{
Input: "nginx/other",
},
{
Input: "prefix/stream",
},
},
}
t.Run("valid input with existing template_path", func(t *testing.T) {
err = validateInputWithStreams(fspath.DirFS(d), "nginx/access", dsMap)
require.NoError(t, err)
})

t.Run("input with non-existing template_path", func(t *testing.T) {
err = validateInputWithStreams(fspath.DirFS(d), "nginx/error", dsMap)
require.ErrorIs(t, errTemplateNotFound, err)
})

t.Run("valid input with default template_path", func(t *testing.T) {
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "agent", "stream", "stream.yml.hbs"), []byte(`default stream template`), 0o644)
require.NoError(t, err)
defer os.Remove(filepath.Join(d, "data_stream", "logs", "agent", "stream", "stream.yml.hbs"))

err = validateInputWithStreams(fspath.DirFS(d), "nginx/other", dsMap)
require.NoError(t, err)
})

t.Run("valid input with default prefixed template_path", func(t *testing.T) {
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "agent", "stream", "prefixstream.yml.hbs"), []byte(`access stream template`), 0o644)
require.NoError(t, err)
defer os.Remove(filepath.Join(d, "data_stream", "logs", "agent", "stream", "prefixstream.yml.hbs"))

err = validateInputWithStreams(fspath.DirFS(d), "prefix/stream", dsMap)
require.NoError(t, err)
})

}
func TestValidateIntegrationPolicyTemplates_NonIntegrationType(t *testing.T) {
d := t.TempDir()
// write a manifest with a non-integration type
err := os.WriteFile(filepath.Join(d, "manifest.yml"), []byte(`type: input`), 0o644)
require.NoError(t, err)

errs := ValidateIntegrationPolicyTemplates(fspath.DirFS(d))
require.Nil(t, errs)
}

func TestValidateIntegrationPolicyTemplates_IntegrationValidTemplates(t *testing.T) {
d := t.TempDir()

// manifest: integration with a policy template referencing nginx/access (no template_path at policy level)
err := os.WriteFile(filepath.Join(d, "manifest.yml"), []byte(`
type: integration
policy_templates:
- name: pt1
inputs:
- type: nginx/access
`), 0o644)
require.NoError(t, err)

// data stream manifest providing the stream for nginx/access with a specific template
err = os.MkdirAll(filepath.Join(d, "data_stream", "logs", "agent", "stream"), 0o755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "manifest.yml"), []byte(`
streams:
- input: nginx/access
template_path: access.yml.hbs
`), 0o644)
require.NoError(t, err)
// write the actual template file referenced by the stream
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "agent", "stream", "access.yml.hbs"), []byte("template"), 0o644)
require.NoError(t, err)

errs := ValidateIntegrationPolicyTemplates(fspath.DirFS(d))
require.Empty(t, errs)
}

func TestValidateIntegrationPolicyTemplates_DefaultTemplate(t *testing.T) {
d := t.TempDir()

// manifest: integration with a policy template referencing an input that does not exist in any data stream
err := os.WriteFile(filepath.Join(d, "manifest.yml"), []byte(`
type: integration
policy_templates:
- name: pt2
inputs:
- type: nginx/access
`), 0o644)
require.NoError(t, err)

// create a data stream that does NOT include the referenced input
err = os.MkdirAll(filepath.Join(d, "data_stream", "logs", "agent", "stream"), 0o755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "manifest.yml"), []byte(`
streams:
- input: nginx/access
`), 0o644)
require.NoError(t, err)
// write the default template file for the existing stream
err = os.WriteFile(filepath.Join(d, "data_stream", "logs", "agent", "stream", "stream.yml.hbs"), []byte("template"), 0o644)
require.NoError(t, err)

errs := ValidateIntegrationPolicyTemplates(fspath.DirFS(d))
require.Empty(t, errs)
}
1 change: 1 addition & 0 deletions code/go/internal/validator/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (s Spec) rules(pkgType string, rootSpec spectypes.ItemSpec) validationRules
{fn: semantic.ValidateDurationVariables, since: semver.MustParse("3.5.0")},
{fn: semantic.ValidateInputPackagesPolicyTemplates, types: []string{"input"}},
{fn: semantic.ValidateMinimumAgentVersion},
{fn: semantic.ValidateIntegrationPolicyTemplates, types: []string{"integration"}},
}

var validationRules validationRules
Expand Down
18 changes: 18 additions & 0 deletions code/go/pkg/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,24 @@ func TestValidateFile(t *testing.T) {
"field conditions.agent: version is required",
},
},
"bad_integration_stream_template_path": {
"manifest.yml",
[]string{
"policy template \"sample\" references input template_path: error validating input from streams \"logfile\": template file not found",
},
},
"bad_integration_stream_template_path_default": {
"manifest.yml",
[]string{
"policy template \"sample\" references input template_path: error validating input from streams \"logfile\": template file not found",
},
},
"bad_integration_input_template_path": {
"manifest.yml",
[]string{
"policy template \"sample\" references input template_path: error validating input \"logfile\": template file not found",
},
},
}

for pkgName, test := range tests {
Expand Down
Loading