diff --git a/README.md b/README.md index 483a7c4..2d6c0b3 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,10 @@ The following environment variables can be used to configure the application: - `SLOGGO_TCP_PORT`: Port for the TCP Syslog listener (default: `6514`). - `SLOGGO_API_PORT`: Port for the API (default: `8080`). - `SLOGGO_LOG_RETENTION_MINUTES`: Duration in minutes to keep logs before deletion (default: `43200` - 30 days). +- `SLOGGO_LOG_FORMAT`: Log parsing format (default: `auto`). Supported values: + - `auto`: Try RFC 5424 first, then fall back to RFC 3164. + - `RFC5424`: Only parse messages as RFC 5424. + - `RFC3164`: Only parse messages as RFC 3164. ## What Sloggo is diff --git a/backend/db/store_test.go b/backend/db/store_test.go index 588a3b2..7836794 100644 --- a/backend/db/store_test.go +++ b/backend/db/store_test.go @@ -30,8 +30,6 @@ func TestStoreLogEntry(t *testing.T) { t.Fatalf("Failed to process batch: %v", err) } - // time.Sleep(100 * time.Millisecond) - db := GetDBInstance() rows, err := db.Query(` SELECT severity, facility, version, hostname, app_name, procid, msgid, structured_data, msg @@ -138,9 +136,6 @@ func TestBatchProcessing(t *testing.T) { t.Fatalf("Failed to process batch: %v", err) } - // Wait for processing to complete - // time.Sleep(100 * time.Millisecond) - // Verify all entries are in the database db := GetDBInstance() rows, err := db.Query(` diff --git a/backend/formats/rfc3164.go b/backend/formats/rfc3164.go new file mode 100644 index 0000000..2a2f2b5 --- /dev/null +++ b/backend/formats/rfc3164.go @@ -0,0 +1,102 @@ +package formats + +import ( + "errors" + "regexp" + "sloggo/models" + "strconv" + "strings" + "time" +) + +var ( + // Example: <34>Oct 11 22:14:15 mymachine su[123]: 'su root' failed + rfc3164Regex = regexp.MustCompile(`^<(?P\d{1,3})>(?P[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(?P\S+)\s+(?P[A-Za-z0-9_.\-\/]+)(?:\[(?P[^\]]+)\])?:\s*(?P[\s\S]*)$`) +) + +// ParseRFC3164ToLogEntry parses an RFC3164 (BSD) 
syslog line into a LogEntry +// Best-effort: fills missing fields with defaults compatible with the DB schema +func ParseRFC3164ToLogEntry(line string) (*models.LogEntry, error) { + line = strings.TrimSpace(line) + if line == "" { + return nil, errors.New("empty message") + } + + m := rfc3164Regex.FindStringSubmatch(line) + if m == nil { + return nil, errors.New("not rfc3164 format") + } + + // Extract named groups + groups := make(map[string]string) + for i, name := range rfc3164Regex.SubexpNames() { + if i != 0 && name != "" { + groups[name] = m[i] + } + } + + // Priority -> facility/severity + pri, err := strconv.Atoi(groups["pri"]) + if err != nil { + return nil, err + } + // Validate priority range (0-191) + if pri < 0 || pri > 191 { + return nil, errors.New("priority out of range (must be 0-191)") + } + facility := uint8(pri / 8) + severity := uint8(pri % 8) + + // Timestamp (no year) e.g. "Oct 11 22:14:15" + // RFC3164 doesn't include year, so we need to infer it + now := time.Now() + tsStr := groups["ts"] + // time layout with optional leading space in day + // Jan _2 15:04:05 handles single-digit days + tsParsed, err := time.ParseInLocation("Jan _2 15:04:05", tsStr, now.Location()) + if err != nil { + return nil, errors.New("failed to parse timestamp: " + err.Error()) + } + + // Infer year: start with current year + year := now.Year() + ts := time.Date(year, tsParsed.Month(), tsParsed.Day(), tsParsed.Hour(), tsParsed.Minute(), tsParsed.Second(), 0, now.Location()) + + // Handle year boundary: if it's January and we receive December logs, they're from last year + if now.Month() == time.January && tsParsed.Month() == time.December { + year-- + ts = time.Date(year, tsParsed.Month(), tsParsed.Day(), tsParsed.Hour(), tsParsed.Minute(), tsParsed.Second(), 0, now.Location()) + } + + hostname := groups["host"] + if hostname == "" { + hostname = "-" + } + + appName := groups["tag"] + if appName == "" { + appName = "-" + } + + procID := groups["pid"] + if procID == 
"" { + procID = "-" + } + + msg := groups["msg"] + + entry := &models.LogEntry{ + Severity: severity, + Facility: facility, + Version: 1, + Timestamp: ts, + Hostname: hostname, + AppName: appName, + ProcID: procID, + MsgID: "-", + StructuredData: "-", + Message: msg, + } + + return entry, nil +} diff --git a/backend/formats/rfc3164_test.go b/backend/formats/rfc3164_test.go new file mode 100644 index 0000000..ae8d0b4 --- /dev/null +++ b/backend/formats/rfc3164_test.go @@ -0,0 +1,135 @@ +package formats + +import ( + "testing" + "time" +) + +func TestParseRFC3164ToLogEntry_Basic(t *testing.T) { + line := "<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8" + entry, err := ParseRFC3164ToLogEntry(line) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if entry.Facility != 4 || entry.Severity != 2 { // 34 / 8 = 4, 34 % 8 = 2 + t.Errorf("facility/severity mismatch: got (%d,%d)", entry.Facility, entry.Severity) + } + if entry.Hostname != "mymachine" { + t.Errorf("hostname: got %q", entry.Hostname) + } + if entry.AppName != "su" { + t.Errorf("appname: got %q", entry.AppName) + } + if entry.ProcID != "-" { + t.Errorf("procid: got %q", entry.ProcID) + } + if entry.Message != "'su root' failed for lonvick on /dev/pts/8" { + t.Errorf("message: got %q", entry.Message) + } + if entry.Timestamp.IsZero() { + t.Error("timestamp should not be zero") + } +} + +func TestParseRFC3164ToLogEntry_WithPID(t *testing.T) { + line := "<190>Nov 6 09:01:02 esphome-device esphome[1234]: Sensor reading: 42" + entry, err := ParseRFC3164ToLogEntry(line) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if entry.Facility != 23 || entry.Severity != 6 { // 190 / 8 = 23, 190 % 8 = 6 + t.Errorf("facility/severity mismatch: got (%d,%d)", entry.Facility, entry.Severity) + } + if entry.Hostname != "esphome-device" { + t.Errorf("hostname: got %q", entry.Hostname) + } + if entry.AppName != "esphome" { + t.Errorf("appname: got %q", entry.AppName) + } + if 
entry.ProcID != "1234" { + t.Errorf("procid: got %q", entry.ProcID) + } + if entry.Message != "Sensor reading: 42" { + t.Errorf("message: got %q", entry.Message) + } +} + +func TestParseRFC3164ToLogEntry_MultilineMessage(t *testing.T) { + line := "<134>Feb 1 11:37:00 modbus-ble-bridge mdns: [C][mdns:124]: mDNS:\n\n Hostname: modbus-ble-bridge" + entry, err := ParseRFC3164ToLogEntry(line) + if err != nil { + t.Fatalf("unexpected error parsing multiline: %v", err) + } + if entry.Facility != 16 || entry.Severity != 6 { // 134 / 8 = 16, 134 % 8 = 6 + t.Errorf("facility/severity mismatch: got (%d,%d)", entry.Facility, entry.Severity) + } + if entry.Hostname != "modbus-ble-bridge" { + t.Errorf("hostname: got %q", entry.Hostname) + } + if entry.AppName != "mdns" { + t.Errorf("appname: got %q", entry.AppName) + } + expectedMsg := "[C][mdns:124]: mDNS:\n\n Hostname: modbus-ble-bridge" + if entry.Message != expectedMsg { + t.Errorf("message mismatch:\nexpected: %q\n got: %q", expectedMsg, entry.Message) + } +} + +func TestParseRFC3164ToLogEntry_YearBoundary(t *testing.T) { + // Test year boundary handling: December logs received in January + now := time.Now() + + line := "<34>Dec 31 23:59:59 testhost app: Year boundary test" + entry, err := ParseRFC3164ToLogEntry(line) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // If we're in January and the log says December, it must be from last year + if now.Month() == time.January { + if entry.Timestamp.Month() != time.December { + t.Errorf("expected December month, got %v", entry.Timestamp.Month()) + } + expectedYear := now.Year() - 1 + if entry.Timestamp.Year() != expectedYear { + t.Errorf("expected year %d for December log in January, got %d", expectedYear, entry.Timestamp.Year()) + } + } + + // Verify the parsed values + if entry.Hostname != "testhost" { + t.Errorf("hostname: got %q", entry.Hostname) + } + if entry.AppName != "app" { + t.Errorf("appname: got %q", entry.AppName) + } +} + +func 
TestParseRFC3164ToLogEntry_InvalidPriority(t *testing.T) { + // Test priority out of range + testCases := []struct { + name string + line string + }{ + {"priority too high", "<192>Oct 11 22:14:15 mymachine su: test"}, + {"priority too high 2", "<999>Oct 11 22:14:15 mymachine su: test"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := ParseRFC3164ToLogEntry(tc.line) + if err == nil { + t.Error("expected error for invalid priority, got nil") + } + }) + } +} + +func TestParseRFC3164ToLogEntry_InvalidTimestamp(t *testing.T) { + // Test invalid timestamp format + line := "<34>Invalid 99 99:99:99 mymachine su: test" + _, err := ParseRFC3164ToLogEntry(line) + if err == nil { + t.Error("expected error for invalid timestamp, got nil") + } +} diff --git a/backend/listener/tcp.go b/backend/listener/tcp.go index efb2298..de64437 100644 --- a/backend/listener/tcp.go +++ b/backend/listener/tcp.go @@ -11,9 +11,22 @@ import ( "sync" "time" + "github.com/leodido/go-syslog/v4" "github.com/leodido/go-syslog/v4/rfc5424" ) +var ( + rfc5424Parser syslog.Machine + parserOnce sync.Once +) + +func getRFC5424Parser() syslog.Machine { + parserOnce.Do(func() { + rfc5424Parser = rfc5424.NewParser(rfc5424.WithBestEffort()) + }) + return rfc5424Parser +} + func StartTCPListener() { port := utils.TcpPort @@ -69,7 +82,6 @@ func handleTCPConnection(conn net.Conn) { defer conn.Close() scanner := bufio.NewScanner(conn) - parser := rfc5424.NewParser(rfc5424.WithBestEffort()) // Configure scanner with a larger buffer for bigger messages const maxScanSize = 1024 * 1024 // 1MB max message size @@ -103,30 +115,43 @@ func handleTCPConnection(conn net.Conn) { continue } - // Parse the message - syslogMsg, err := parser.Parse([]byte(message)) - if err != nil { - log.Printf("Failed to parse message: %v: %s", err, message) - continue - } - - // Convert to RFC5424 syslog message - rfc5424Msg, ok := syslogMsg.(*rfc5424.SyslogMessage) - if !ok { - log.Printf("Parsed 
message is not a valid RFC5424 message: %s", message) - continue + parsed := false + var lastErr error + + logFormat := utils.GetLogFormat() + + // Try RFC5424 if enabled + if logFormat == "rfc5424" || logFormat == "auto" { + parser := getRFC5424Parser() + if syslogMsg, err := parser.Parse([]byte(message)); err == nil { + if rfc5424Msg, ok := syslogMsg.(*rfc5424.SyslogMessage); ok { + logEntry := formats.SyslogMessageToLogEntry(rfc5424Msg) + if logEntry != nil { + if err := db.StoreLog(*logEntry); err != nil { + log.Printf("Error storing log: %v", err) + } + parsed = true + } + } + } else { + lastErr = err + } } - // Convert directly to LogEntry for efficient DuckDB insertion - logEntry := formats.SyslogMessageToLogEntry(rfc5424Msg) - - if logEntry == nil { - log.Printf("Failed to convert message to LogEntry: %s", message) + // Try RFC3164 if enabled and not yet parsed + if !parsed && (logFormat == "rfc3164" || logFormat == "auto") { + if logEntry, err := formats.ParseRFC3164ToLogEntry(message); err == nil { + if err := db.StoreLog(*logEntry); err != nil { + log.Printf("Error storing log: %v", err) + } + parsed = true + } else { + lastErr = err + } } - // Store log without blocking if possible - if err := db.StoreLog(*logEntry); err != nil { - log.Printf("Error storing log: %v", err) + if !parsed { + log.Printf("Failed to parse message with format %s: %v: %s", logFormat, lastErr, message) } } } diff --git a/backend/listener/tcp_test.go b/backend/listener/tcp_test.go index 9dcaae0..4bfd359 100644 --- a/backend/listener/tcp_test.go +++ b/backend/listener/tcp_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "sloggo/db" + "sloggo/utils" "strings" "testing" "time" @@ -25,6 +26,12 @@ func sendTCPMessage(t *testing.T, conn net.Conn, message string) { } func TestTCPListener(t *testing.T) { + // Save original LogFormat and restore at the end + originalLogFormat := utils.GetLogFormat() + defer func() { + utils.SetLogFormat(originalLogFormat) + }() + // Clean the database before 
starting tests db := db.GetDBInstance() _, err := db.Exec("DELETE FROM logs") @@ -59,12 +66,18 @@ func TestTCPListener(t *testing.T) { } defer conn.Close() - // Run test cases sequentially on the same connection - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { + // Run test cases sequentially on the same connection for different log formats + formats := []string{"auto", "rfc5424", "rfc3164"} + for _, format := range formats { + // Set format for this test group using thread-safe function + utils.SetLogFormat(format) + + for _, tc := range testCases { + testName := fmt.Sprintf("%s_%s", format, tc.name) + t.Logf("Running test: %s", testName) sendTCPMessage(t, conn, tc.message) // No need to explicitly force batch processing - handled in verifyLogEntry verifyLogEntry(t, tc) - }) + } } } diff --git a/backend/listener/test_helpers.go b/backend/listener/test_helpers.go index 99e7dd4..7d4d3f8 100644 --- a/backend/listener/test_helpers.go +++ b/backend/listener/test_helpers.go @@ -167,5 +167,35 @@ func getTestCases() []testCase { shouldError: false, }, }, + { + name: "RFC3164 basic without pid", + message: "<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8", + expected: expectedResult{ + facility: 4, + severity: 2, + hostname: "mymachine", + appName: "su", + procid: "-", + msgid: "-", + structuredData: "-", + msg: "'su root' failed for lonvick on /dev/pts/8", + shouldError: false, + }, + }, + { + name: "RFC3164 with pid typical esphome", + message: "<190>Nov 6 09:01:02 esphome-device esphome[1234]: Sensor reading: 42", + expected: expectedResult{ + facility: 23, + severity: 6, + hostname: "esphome-device", + appName: "esphome", + procid: "1234", + msgid: "-", + structuredData: "-", + msg: "Sensor reading: 42", + shouldError: false, + }, + }, } } diff --git a/backend/listener/udp.go b/backend/listener/udp.go index 7c96256..00fb180 100644 --- a/backend/listener/udp.go +++ b/backend/listener/udp.go @@ -10,9 +10,22 @@ import ( 
"sync" "time" + "github.com/leodido/go-syslog/v4" "github.com/leodido/go-syslog/v4/rfc5424" ) +var ( + udpRFC5424Parser syslog.Machine + udpParserOnce sync.Once +) + +func getUDPRFC5424Parser() syslog.Machine { + udpParserOnce.Do(func() { + udpRFC5424Parser = rfc5424.NewParser(rfc5424.WithBestEffort()) + }) + return udpRFC5424Parser +} + func StartUDPListener() { port := utils.UdpPort @@ -83,9 +96,6 @@ func StartUDPListener() { // processUDPMessage handles processing of a single UDP message func processUDPMessage(message []byte) { - // Create a parser with best effort mode - parser := rfc5424.NewParser(rfc5424.WithBestEffort()) - // Process the input using go-syslog parser input := string(message) @@ -99,30 +109,43 @@ func processUDPMessage(message []byte) { continue // Skip empty messages } - // Parse the message - syslogMsg, err := parser.Parse([]byte(part)) - if err != nil { - log.Printf("Failed to parse UDP message: %v: %s", err, input) - continue - } - - // Convert to RFC5424 syslog message - rfc5424Msg, ok := syslogMsg.(*rfc5424.SyslogMessage) - if !ok { - log.Printf("Parsed UDP message is not a valid RFC5424 message: %s", input) - continue + parsed := false + var lastErr error + + // Get current log format in a thread-safe manner + logFormat := utils.GetLogFormat() + + // Try RFC5424 if enabled + if logFormat == "rfc5424" || logFormat == "auto" { + parser := getUDPRFC5424Parser() + if syslogMsg, err := parser.Parse([]byte(part)); err == nil { + if rfc5424Msg, ok := syslogMsg.(*rfc5424.SyslogMessage); ok { + if logEntry := formats.SyslogMessageToLogEntry(rfc5424Msg); logEntry != nil { + if err := db.StoreLog(*logEntry); err != nil { + log.Printf("Error storing UDP log: %v", err) + } + parsed = true + } + } + } else { + lastErr = err + } } - // Convert directly to LogEntry for efficient DuckDB insertion - logEntry := formats.SyslogMessageToLogEntry(rfc5424Msg) - - if logEntry == nil { - log.Printf("Failed to convert message to LogEntry: %s", message) + // Try 
RFC3164 if enabled and not yet parsed + if !parsed && (logFormat == "rfc3164" || logFormat == "auto") { + if logEntry, err := formats.ParseRFC3164ToLogEntry(part); err == nil { + if err := db.StoreLog(*logEntry); err != nil { + log.Printf("Error storing UDP log: %v", err) + } + parsed = true + } else { + lastErr = err + } } - // Store log without blocking if possible - if err := db.StoreLog(*logEntry); err != nil { - log.Printf("Error storing UDP log: %v", err) + if !parsed { + log.Printf("Failed to parse UDP message with format %s: %v: %s", logFormat, lastErr, input) } } } diff --git a/backend/listener/udp_test.go b/backend/listener/udp_test.go index af1e1e4..1544fda 100644 --- a/backend/listener/udp_test.go +++ b/backend/listener/udp_test.go @@ -3,6 +3,7 @@ package listener import ( "fmt" "net" + "sloggo/utils" "strings" "testing" "time" @@ -30,6 +31,12 @@ func sendUDPMessage(t *testing.T, addr string, message string) { } func TestUDPListener(t *testing.T) { + // Save original LogFormat and restore at the end + originalLogFormat := utils.GetLogFormat() + defer func() { + utils.SetLogFormat(originalLogFormat) + }() + checkSchema(t) port := 5514 @@ -40,10 +47,19 @@ func TestUDPListener(t *testing.T) { testCases := getTestCases() - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { + // Run test cases sequentially for different log formats + // We must test formats sequentially to avoid race conditions on utils.LogFormat + // Note: Not using nested t.Run() to ensure truly serial execution + formats := []string{"auto", "rfc5424", "rfc3164"} + for _, format := range formats { + // Set format for this test group using thread-safe function + utils.SetLogFormat(format) + + for _, tc := range testCases { + testName := fmt.Sprintf("%s_%s", format, tc.name) + t.Logf("Running test: %s", testName) sendUDPMessage(t, fmt.Sprintf("localhost:%d", port), tc.message) verifyLogEntry(t, tc) - }) + } } } diff --git a/backend/main.go b/backend/main.go index 
386b2d7..cd2012d 100644 --- a/backend/main.go +++ b/backend/main.go @@ -1,6 +1,7 @@ package main import ( + "log" "slices" "sloggo/server" "sloggo/utils" @@ -9,6 +10,11 @@ import ( ) func main() { + // Startup configuration log + log.Printf("Sloggo version: %s", utils.Version) + log.Printf("Config: listeners=%v udp_port=%s tcp_port=%s api_port=%s", utils.Listeners, utils.UdpPort, utils.TcpPort, utils.ApiPort) + log.Printf("Config: log_format=%s debug=%t retention_minutes=%d", utils.GetLogFormat(), utils.Debug, utils.LogRetentionMinutes) + if slices.Contains(utils.Listeners, "udp") { go listener.StartUDPListener() } diff --git a/backend/utils/vars.go b/backend/utils/vars.go index deab8fd..64a4351 100644 --- a/backend/utils/vars.go +++ b/backend/utils/vars.go @@ -4,6 +4,7 @@ import ( "os" "strconv" "strings" + "sync" ) // The following variables are set at build time (see GitHub Action & Makefile) @@ -22,6 +23,29 @@ var Debug bool var Version string // Set via -X flag during build +// logFormat controls how incoming syslog messages are parsed. +// Supported values (case-insensitive): +// - "auto" : try RFC5424 first, then RFC3164 (default) +// - "rfc5424": only parse as RFC5424 +// - "rfc3164": only parse as RFC3164 +// Any other value falls back to "auto". 
var logFormat string

// logFormatMutex guards logFormat, which is read and written through
// GetLogFormat and SetLogFormat.
var logFormatMutex sync.RWMutex

// normalizeLogFormat maps a user-supplied format name to one of the canonical
// values "auto", "rfc5424" or "rfc3164". Matching ignores case and surrounding
// whitespace; any unrecognized value (including "") becomes "auto".
func normalizeLogFormat(format string) string {
	switch f := strings.ToLower(strings.TrimSpace(format)); f {
	case "rfc5424", "rfc3164":
		return f
	default:
		return "auto"
	}
}

// GetLogFormat returns the current log format in a thread-safe manner.
func GetLogFormat() string {
	logFormatMutex.RLock()
	defer logFormatMutex.RUnlock()
	return logFormat
}

// SetLogFormat sets the log format in a thread-safe manner.
//
// The value is normalized before it is stored: the listeners compare the
// format against the exact lowercase strings, so storing e.g. "RFC5424" or a
// misspelled name verbatim would silently disable every parser.
func SetLogFormat(format string) {
	format = normalizeLogFormat(format)

	logFormatMutex.Lock()
	defer logFormatMutex.Unlock()
	logFormat = format
}