Version: 1.0 Last Updated: 2025-11-13 Security Audit: OWASP Top 10:2021
- Security Overview
- OWASP Security Audit
- Critical Security Issues
- High Priority Issues
- Medium Priority Issues
- Security Best Practices
- Secure Configuration
- Vulnerability Disclosure
- Security Roadmap
Overall Rating: MODERATE (6.7/10)
kshark demonstrates strong foundational security practices in several areas, particularly in input validation and credential redaction. However, there are critical areas requiring immediate attention, especially around SSRF protection and credential management.
✅ Excellent Controls:
- Command injection prevention via hostname validation
- Credential redaction in all outputs
- Path traversal protection
- TLS 1.2+ enforcement
- Certificate expiry monitoring
- Non-root Docker execution
- Server-Side Request Forgery (SSRF) vulnerabilities
- Plain-text credential storage
- Insufficient security event logging
- Error message information disclosure
Comprehensive audit performed against OWASP Top 10:2021 standards.
| OWASP Category | Status | Score | Priority |
|---|---|---|---|
| A01 - Broken Access Control | ✅ Secure | 9/10 | - |
| A02 - Cryptographic Failures | 6/10 | HIGH | |
| A03 - Injection | ✅ Secure | 9/10 | - |
| A04 - Insecure Design | ✅ Good | 8/10 | - |
| A05 - Security Misconfiguration | 6/10 | MEDIUM | |
| A06 - Vulnerable Components | 7/10 | MEDIUM | |
| A07 - Authentication Failures | 6/10 | HIGH | |
| A08 - Data Integrity | 7/10 | MEDIUM | |
| A09 - Logging Failures | 5/10 | HIGH | |
| A10 - SSRF | 4/10 | CRITICAL |
Overall Security Score: 6.7/10
Severity: CRITICAL
CVSS Score: 8.6 (High)
Location: cmd/kshark/main.go:761-799, 1136-1159
The application accepts arbitrary URLs from configuration files for Schema Registry and REST Proxy without validation. This allows potential attackers to:
- Scan internal networks
- Access cloud metadata services (e.g., AWS 169.254.169.254)
- Probe internal services
- Bypass firewall restrictions
Schema Registry Check (Lines 761-799):
func checkSchemaRegistry(ctx context.Context, r *Report, p map[string]string) {
url := strings.TrimSpace(p["schema.registry.url"])
if url == "" {
return
}
// ⚠️ NO URL VALIDATION - ACCEPTS ANY URL!
client := httpClientFromTLS(tlsConf, 8*time.Second)
req, _ := http.NewRequestWithContext(ctx, "GET",
strings.TrimRight(url, "/")+"/subjects", nil)
resp, err := client.Do(req)REST Proxy Check (Lines 1136-1159):
if rest := strings.TrimSpace(props["rest.proxy.url"]); rest != "" {
// ⚠️ SAME VULNERABILITY - NO VALIDATION
client := httpClientFromTLS(tlsConf, 8*time.Second)
req, _ := http.NewRequest("GET", strings.TrimRight(rest, "/")+"/topics", nil)
resp, err := client.Do(req)1. Internal Network Scanning:
# Attacker-controlled configuration
schema.registry.url=http://192.168.1.1:22
rest.proxy.url=http://10.0.0.1:33062. Cloud Metadata Service Access:
schema.registry.url=http://169.254.169.254/latest/meta-data/3. Internal Service Probing:
rest.proxy.url=http://localhost:6379 # Redis
schema.registry.url=http://127.0.0.1:9200 # ElasticsearchStep 1: Add URL Validation Function
Create this function in main.go:
// Add after line 817 (after mtuCheck function)
// isAllowedURL validates that a URL is safe to access
func isAllowedURL(rawURL string) error {
u, err := url.Parse(rawURL)
if err != nil {
return fmt.Errorf("invalid URL: %w", err)
}
// Only allow HTTP(S) schemes
if u.Scheme != "https" && u.Scheme != "http" {
return fmt.Errorf("only HTTP(S) schemes allowed, got: %s", u.Scheme)
}
// Resolve hostname to IP
host := u.Hostname()
if host == "" {
return fmt.Errorf("no hostname in URL")
}
// Resolve all IPs
ips, err := net.LookupIP(host)
if err != nil {
return fmt.Errorf("cannot resolve host %s: %w", host, err)
}
// Block private/internal IPs
for _, ip := range ips {
if isPrivateIP(ip) {
return fmt.Errorf("private/internal IP addresses not allowed: %s resolves to %s", host, ip)
}
}
return nil
}
// isPrivateIP checks if an IP is in private/reserved ranges
func isPrivateIP(ip net.IP) bool {
// Private IP ranges to block
privateRanges := []string{
"10.0.0.0/8", // RFC1918
"172.16.0.0/12", // RFC1918
"192.168.0.0/16", // RFC1918
"127.0.0.0/8", // Loopback
"169.254.0.0/16", // Link-local (AWS metadata)
"::1/128", // IPv6 loopback
"fc00::/7", // IPv6 private
"fe80::/10", // IPv6 link-local
"0.0.0.0/8", // Current network
"100.64.0.0/10", // Shared address space
"192.0.0.0/24", // IETF protocol assignments
"192.0.2.0/24", // TEST-NET-1
"198.18.0.0/15", // Benchmarking
"198.51.100.0/24", // TEST-NET-2
"203.0.113.0/24", // TEST-NET-3
"224.0.0.0/4", // Multicast
"240.0.0.0/4", // Reserved
"255.255.255.255/32", // Broadcast
}
for _, cidr := range privateRanges {
_, subnet, _ := net.ParseCIDR(cidr)
if subnet.Contains(ip) {
return true
}
}
return false
}Step 2: Add Redirect Protection
Update httpClientFromTLS function (Line 756):
func httpClientFromTLS(tlsConf *tls.Config, timeout time.Duration) *http.Client {
tr := &http.Transport{
TLSClientConfig: tlsConf,
Proxy: http.ProxyFromEnvironment,
IdleConnTimeout: 10 * time.Second,
}
return &http.Client{
Transport: tr,
Timeout: timeout,
// ✅ ADD THIS: Validate redirects
CheckRedirect: func(req *http.Request, via []*http.Request) error {
if len(via) >= 3 {
return fmt.Errorf("too many redirects (max 3)")
}
// Validate redirect URL
if err := isAllowedURL(req.URL.String()); err != nil {
return fmt.Errorf("redirect blocked: %w", err)
}
return nil
},
}
}Step 3: Apply Validation in Checks
Update checkSchemaRegistry (Line 761):
func checkSchemaRegistry(ctx context.Context, r *Report, p map[string]string, tlsConf *tls.Config) {
url := strings.TrimSpace(p["schema.registry.url"])
if url == "" {
return
}
// ✅ ADD THIS: Validate URL before use
if err := isAllowedURL(url); err != nil {
addRow(r, Row{"schema-registry", url, L7HTTP, FAIL,
fmt.Sprintf("URL validation failed: %v", err),
"Schema Registry URL must be a valid HTTPS URL to a public endpoint"})
return
}
// Rest of function remains the same...Update REST Proxy check similarly (Line 1136).
- Implement
isAllowedURL()function - Implement
isPrivateIP()function - Add redirect validation to
httpClientFromTLS() - Update
checkSchemaRegistry()to validate URLs - Update REST Proxy check to validate URLs
- Add configuration option to allow internal IPs (if needed)
- Test with various malicious URLs
- Document URL validation in configuration guide
Test Cases:
# Should FAIL - Internal IP
schema.registry.url=http://192.168.1.1:8081
# Should FAIL - Localhost
schema.registry.url=http://localhost:8081
# Should FAIL - AWS metadata
schema.registry.url=http://169.254.169.254/latest/meta-data/
# Should PASS - Valid public HTTPS
schema.registry.url=https://schema-registry.example.com
# Should FAIL - Redirect to internal IP
# (Requires redirect test server)Severity: HIGH CVSS Score: 7.5 (High) Location: Multiple files
Credentials are stored in plain text in configuration files:
client.properties- Kafka credentialsai_config.json- AI provider API keys- No file permission validation
- No encryption at rest
client.properties:
sasl.username=YOUR_API_KEY
sasl.password=YOUR_API_SECRET # ⚠️ Plain textai_config.json:
{
"provider": "openai",
"api_key": "sk-xxxx..." // ⚠️ Plain text
}- Credentials exposed if file system is compromised
- Credentials visible in process listings
- Credentials may be logged or cached
- Difficult to rotate credentials
- Compliance issues (PCI-DSS, SOC 2, etc.)
Option 1: Environment Variable Support (Quick Win)
Add environment variable substitution:
// Add to loadProperties function (Line 346)
func loadProperties(path string) (map[string]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
props := map[string]string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" || strings.HasPrefix(line, "#") {
continue
}
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
// ✅ ADD THIS: Expand environment variables
value = os.ExpandEnv(value)
props[key] = value
}
}
return props, scanner.Err()
}Usage:
# client.properties
sasl.username=${KAFKA_USERNAME}
sasl.password=${KAFKA_PASSWORD}export KAFKA_USERNAME="my-api-key"
export KAFKA_PASSWORD="my-secret"
./kshark -props client.propertiesOption 2: File Permission Validation
// Add after loadProperties function
func validateFilePermissions(path string) error {
info, err := os.Stat(path)
if err != nil {
return err
}
mode := info.Mode()
// File should not be readable by group or others
if mode.Perm()&0077 != 0 {
return fmt.Errorf("file %s has insecure permissions %o (should be 0600)",
path, mode.Perm())
}
return nil
}
// Call in main() before loading properties
if err := validateFilePermissions(*propsFile); err != nil {
fmt.Fprintf(os.Stderr, "Warning: %v\n", err)
fmt.Fprintln(os.Stderr, "Hint: Run 'chmod 600 <file>' to secure it")
}Option 3: Encrypted Configuration (Future)
// Encrypt configuration files with age or similar
// Decrypt at runtime with passphrase or key file
func loadEncryptedProperties(path, keyFile string) (map[string]string, error) {
// Implementation using age or similar encryption library
}Option 4: Secret Manager Integration (Best Practice)
// AWS Secrets Manager
func getSecretFromAWS(secretName string) (string, error) {
cfg, _ := config.LoadDefaultConfig(context.TODO())
client := secretsmanager.NewFromConfig(cfg)
result, err := client.GetSecretValue(context.TODO(), &secretsmanager.GetSecretValueInput{
SecretId: aws.String(secretName),
})
if err != nil {
return "", err
}
return *result.SecretString, nil
}
// Usage in properties:
// sasl.password=aws-secret://prod/kafka/password- ✅ Immediate: Add environment variable support (1-2 hours)
- ✅ This Sprint: Add file permission validation (1 hour)
- 📅 Next Sprint: Document secret manager integration patterns
- 📅 Future: Implement built-in encryption support
Severity: HIGH
CVSS Score: 6.5 (Medium)
Location: Throughout main.go
No audit logging for security-relevant events:
- No logs for authentication attempts
- No logs for file access operations
- No logs for configuration loading
- No logs for API key usage
- No structured logging format
- Cannot detect security incidents
- Cannot investigate breaches
- Cannot meet compliance requirements
- Difficult to debug security issues
Step 1: Implement Structured Logging
Use Go's log/slog (available in Go 1.21+):
import (
"log/slog"
"os"
)
// Global logger
var logger *slog.Logger
func init() {
// JSON structured logging
handler := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelInfo,
})
logger = slog.New(handler)
}Step 2: Log Security Events
// After loading configuration (Line 1018)
logger.Info("configuration loaded",
"file", *propsFile,
"properties_count", len(props),
"user", os.Getenv("USER"),
"timestamp", time.Now())
// Before Kafka connection (Line 1084)
logger.Info("kafka connection attempt",
"broker", host,
"security_protocol", props["security.protocol"],
"sasl_mechanism", props["sasl.mechanism"],
"username", props["sasl.username"]) // Don't log password!
// After successful authentication
logger.Info("kafka authentication successful",
"broker", host,
"mechanism", props["sasl.mechanism"])
// On authentication failure
logger.Error("kafka authentication failed",
"broker", host,
"mechanism", props["sasl.mechanism"],
"error", err)
// AI API usage (Line 165)
logger.Info("ai api request",
"provider", c.config.Provider,
"endpoint", c.config.APIEndpoint,
"model", c.config.Model)Step 3: Log File Access
// In loadProperties (Line 346)
logger.Info("loading configuration file",
"path", path,
"size", info.Size())
// In checkLicense (Line 232)
logger.Info("license validation",
"file", "license.key",
"valid", true/false){"time":"2025-01-13T14:30:22Z","level":"INFO","msg":"configuration loaded","file":"client.properties","properties_count":8,"user":"admin"}
{"time":"2025-01-13T14:30:23Z","level":"INFO","msg":"kafka connection attempt","broker":"broker.example.com:9092","security_protocol":"SASL_SSL","sasl_mechanism":"SCRAM-SHA-256"}
{"time":"2025-01-13T14:30:24Z","level":"INFO","msg":"kafka authentication successful","broker":"broker.example.com:9092","mechanism":"SCRAM-SHA-256"}
{"time":"2025-01-13T14:30:25Z","level":"ERROR","msg":"tls handshake failed","broker":"broker.example.com:9092","error":"certificate expired"}Severity: MEDIUM
CVSS Score: 5.3 (Medium)
Location: main.go:174, 190
Verbose error messages expose internal details:
// Line 174
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("AI API returned a non-200 status code: %d %s",
resp.StatusCode, string(body))
// ⚠️ Exposes full API error response
}
// Line 190
return nil, fmt.Errorf("could not unmarshal AI JSON response: %w. Raw response: %s",
err, apiResp.Choices[0].Message.Content)
// ⚠️ Exposes raw AI response content// Line 174
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
// ✅ Log full error, return sanitized message
logger.Error("ai api error",
"status_code", resp.StatusCode,
"response_body", string(body))
return nil, fmt.Errorf("AI API request failed with status %d", resp.StatusCode)
}
// Line 190
// ✅ Log full error, return generic message
logger.Error("ai response parse error",
"error", err,
"raw_response", apiResp.Choices[0].Message.Content)
return nil, fmt.Errorf("failed to parse AI response: %w", err)Severity: MEDIUM
Location: main.go:1198-1201
if strings.HasPrefix(providerConfig.APIKey, "YOUR_") || providerConfig.APIKey == "" {
fmt.Fprintf(os.Stderr, "Error: API key for provider '%s' is a placeholder...\n")
os.Exit(1)
}
// ⚠️ Only checks prefix, doesn't validate key formatfunc validateAPIKey(provider, key string) error {
if key == "" {
return fmt.Errorf("API key is empty")
}
if strings.HasPrefix(key, "YOUR_") {
return fmt.Errorf("API key is a placeholder")
}
// Provider-specific validation
switch provider {
case "openai":
if !strings.HasPrefix(key, "sk-") || len(key) < 20 {
return fmt.Errorf("invalid OpenAI API key format")
}
case "scalytics":
if len(key) < 32 {
return fmt.Errorf("invalid Scalytics API key format")
}
}
return nil
}
// Usage
if err := validateAPIKey(providerConfig.Provider, providerConfig.APIKey); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}Severity: MEDIUM
Location: go.mod, go.sum
github.com/segmentio/kafka-go v0.4.49
github.com/klauspost/compress v1.15.9
golang.org/x/net v0.38.0
1. Run Vulnerability Scan:
# Install govulncheck
go install golang.org/x/vuln/cmd/govulncheck@latest
# Scan for vulnerabilities
govulncheck ./...2. Check for Updates:
go list -m -u all3. Add Dependabot Configuration:
Create .github/dependabot.yml:
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
reviewers:
- "security-team"
labels:
- "dependencies"
- "security"4. Add to CI/CD:
# .github/workflows/security.yml
name: Security Scan
on:
push:
branches: [main]
pull_request:
schedule:
- cron: '0 0 * * 0' # Weekly
jobs:
govulncheck:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.23'
- name: Install govulncheck
run: go install golang.org/x/vuln/cmd/govulncheck@latest
- name: Run govulncheck
run: govulncheck ./...# Create secure configuration directory
mkdir -p ~/.kshark
chmod 700 ~/.kshark
# Create configuration with restricted permissions
cat > ~/.kshark/client.properties <<EOF
bootstrap.servers=broker.example.com:9092
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.username=your-username
sasl.password=your-password
EOF
chmod 600 ~/.kshark/client.properties# Set credentials in environment
export KAFKA_USERNAME="my-api-key"
export KAFKA_PASSWORD="my-secret"
# Reference in properties file
cat > client.properties <<EOF
bootstrap.servers=broker.example.com:9092
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.username=${KAFKA_USERNAME}
sasl.password=${KAFKA_PASSWORD}
EOF# Add to .gitignore
cat >> .gitignore <<EOF
*.properties
ai_config.json
license.key
*.key
*.pem
*.crt
*secret*
*credential*
EOF# ✅ GOOD - TLS enabled
security.protocol=SASL_SSL
# ❌ BAD - No encryption
security.protocol=SASL_PLAINTEXT# ✅ BETTER - SCRAM-SHA-256
sasl.mechanism=SCRAM-SHA-256
# ⚠️ OKAY - PLAIN (with TLS only)
sasl.mechanism=PLAINAll user input must be validated:
- ✅ Hostname validation (implemented)
- ✅ Path traversal prevention (implemented)
⚠️ URL validation (needs implementation)
- ✅ Redact credentials in all outputs
- ✅ Never log passwords
⚠️ Implement secret rotation support
# Regular updates
go get -u ./...
go mod tidy
# Security scanning
govulncheck ./...- No hardcoded credentials
- All user input validated
- Errors don't leak sensitive data
- TLS used for all external connections
- Credentials redacted in logs
- File permissions checked
- URLs validated before use
# Broker connection
bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS}
# Security - Always use SASL_SSL
security.protocol=SASL_SSL
# Authentication - Prefer SCRAM
sasl.mechanism=SCRAM-SHA-256
sasl.username=${KAFKA_USERNAME}
sasl.password=${KAFKA_PASSWORD}
# TLS - Always verify certificates
ssl.ca.location=/path/to/ca-cert.pem
# Optionally use client certificates
ssl.certificate.location=/path/to/client-cert.pem
ssl.key.location=/path/to/client-key.pem
# Schema Registry (validate URL!)
schema.registry.url=https://schema-registry.example.com
basic.auth.user.info=${SR_KEY}:${SR_SECRET}kubectl create secret generic kshark-credentials \
--from-literal=bootstrap.servers=broker.example.com:9092 \
--from-literal=sasl.username=my-username \
--from-literal=sasl.password=my-password \
--dry-run=client -o yaml | kubectl apply -f -# Create secrets
echo "my-username" | docker secret create kafka_username -
echo "my-password" | docker secret create kafka_password -
# Use in compose
services:
kshark:
image: kshark:latest
secrets:
- kafka_username
- kafka_password
environment:
- KAFKA_USERNAME_FILE=/run/secrets/kafka_username
- KAFKA_PASSWORD_FILE=/run/secrets/kafka_passwordDO NOT open public GitHub issues for security vulnerabilities.
Instead, please:
- Email: [email protected]
- Subject: [SECURITY] kshark vulnerability report
- Include:
- Description of the vulnerability
- Steps to reproduce
- Potential impact
- Suggested fix (if available)
- 24 hours: Acknowledgment of report
- 7 days: Initial assessment
- 30 days: Fix developed and tested
- Coordinated disclosure: After fix is released
Security advisories are published at:
- GitHub Security Advisories
- Release notes for patched versions
-
CRITICAL: Implement SSRF protection
- Add
isAllowedURL()function - Add
isPrivateIP()function - Update Schema Registry check
- Update REST Proxy check
- Add redirect validation
- Add
-
HIGH: Environment variable support
- Update
loadProperties()function - Document usage
- Update example files
- Update
-
HIGH: File permission validation
- Add permission check function
- Warn on insecure permissions
- Document secure permissions
-
HIGH: Structured security logging
- Implement slog integration
- Log authentication events
- Log file access events
- Log API usage
-
MEDIUM: Error message sanitization
- Remove sensitive data from errors
- Implement error filtering
- Update error handling
-
MEDIUM: API key validation
- Provider-specific validation
- Format verification
- Optional key verification API calls
-
MEDIUM: Dependency scanning automation
- Add govulncheck to CI/CD
- Configure Dependabot
- Set up automated updates
-
MEDIUM: Secret manager integration
- AWS Secrets Manager support
- HashiCorp Vault support
- Azure Key Vault support
-
LOW: Security testing
- Add security test suite
- Implement fuzzing
- Add integration tests
- Configuration file encryption
- Certificate pinning support
- Automated credential rotation
- Security metrics export (Prometheus)
- SIEM integration support
- SOC 2 compliance documentation
- All credentials use environment variables or secret managers
- Configuration files have 0600 permissions
- SSRF protection implemented
- TLS 1.2+ enforced
- Certificate validation enabled
- Security logging configured
- Dependency vulnerabilities scanned
- Container image scanned (if using Docker)
- Network policies configured (if using Kubernetes)
- Secrets stored in Kubernetes Secrets (not ConfigMaps)
- Non-root user execution
- Read-only filesystem (where possible)
- Minimal container capabilities
- Network policies enforced
- Security monitoring enabled
- Regular security scans scheduled
- Incident response plan documented
These controls should be maintained in all future versions:
-
✅ Command Injection Prevention (Lines 829-840)
func isValidHostname(host string) bool { re := regexp.MustCompile(`^[a-zA-Z0-9\.\-]+$`) return re.MatchString(host) }
-
✅ Credential Redaction (Lines 1339-1350)
func redactProps(p map[string]string) map[string]string { // Redacts: password, secret, token, key, auth info }
-
✅ Path Traversal Prevention (Lines 1297-1316)
func createSafeReportPath(userInputPath string, safeSubDir string) (string, error) { cleanFilename := filepath.Base(userInputPath) // Prevents ../../../etc/passwd attacks }
-
✅ TLS Enforcement (Line 424)
conf := &tls.Config{MinVersion: tls.VersionTLS12}
-
✅ Certificate Expiry Monitoring (Lines 467-475)
-
✅ Timeout Controls - All network operations have timeouts
-
✅ Context Usage - Proper context propagation for cancellation
-
✅ Non-root Docker User (Dockerfile)
-
✅ HTML Template Auto-escaping (Go html/template)
-
✅ Minimal GitHub Actions Permissions
-
✅ Release Checksum Generation (.goreleaser.yaml)
-
✅ Safe JSON Deserialization (No reflection attacks)
Document Version: 1.0 Security Audit Date: 2025-11-13 Next Security Review: 2025-12-13 Contact: [email protected]