diff --git a/internal/commands/query.go b/internal/commands/query.go index 46b20fd..9c7f886 100644 --- a/internal/commands/query.go +++ b/internal/commands/query.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "strings" + "time" "github.com/Topline-com/os-cli/internal/output" "github.com/Topline-com/os-cli/internal/topline" @@ -66,6 +67,12 @@ func runQueryCommand(args []string, stdout io.Writer, globals globalOptions) err return errors.New("usage: topline query sql --sql 'SELECT COUNT(*) FROM contacts' [--url https://os-mcp.topline.com]") } err = client.PostJSON(ctx, "/query/api/execute-sql", map[string]any{"sql": sqlText}, &result) + case "freshness": + return runQueryFreshness(ctx, client, stdout, globals) + case "snapshot": + return runQuerySnapshot(ctx, client, flags, stdout, globals) + case "audit": + return runQueryAudit(ctx, client, flags, stdout, globals) default: return fmt.Errorf("unknown query command %q; try topline query help", subcommand) } @@ -84,6 +91,13 @@ func printQueryHelp(w io.Writer) { _, _ = fmt.Fprintln(w, " topline query catalog") _, _ = fmt.Fprintln(w, " topline query explain --tables contacts,opportunities") _, _ = fmt.Fprintln(w, " topline query sql --sql 'SELECT COUNT(*) AS n FROM contacts'") + _, _ = fmt.Fprintln(w, " topline query freshness") + _, _ = fmt.Fprintln(w, " topline query snapshot --pipeline ") + _, _ = fmt.Fprintln(w, " topline query audit --pipeline [--since this-week-et] [--status open]") + _, _ = fmt.Fprintln(w, "") + _, _ = fmt.Fprintln(w, "Composite commands (Phase 3): one CLI call wraps the warehouse views") + _, _ = fmt.Fprintln(w, "shipped in Topline-com/os-mcp#1. Use these for pipeline audits instead") + _, _ = fmt.Fprintln(w, "of hand-stitched CTEs.") _, _ = fmt.Fprintln(w, "") _, _ = fmt.Fprintln(w, "Environment:") _, _ = fmt.Fprintln(w, " TOPLINE_QUERY_TOKEN Connection-bound token from https://os-mcp.topline.com/connect") @@ -221,3 +235,159 @@ func firstNonEmptyQueryFlag(values ...string) string { } return "" } + +// executeSQL runs one SQL against /query/api/execute-sql and returns the parsed +// response shape as map[string]any. Used by the composite commands to wrap the +// warehouse views shipped in Topline-com/os-mcp#1. +func executeSQL(ctx context.Context, client *topline.QueryClient, sql string) (map[string]any, error) { + var raw any + if err := client.PostJSON(ctx, "/query/api/execute-sql", map[string]any{"sql": sql}, &raw); err != nil { + return nil, err + } + if m, ok := raw.(map[string]any); ok { + return m, nil + } + return map[string]any{"raw": raw}, nil +} + +func runQueryFreshness(ctx context.Context, client *topline.QueryClient, stdout io.Writer, globals globalOptions) error { + const sqlText = "SELECT table_name, row_count, last_synced_at, lag_seconds FROM warehouse_freshness ORDER BY table_name" + result, err := executeSQL(ctx, client, sqlText) + if err != nil { + return err + } + return output.WriteJSON(stdout, result, globals.MaskPII) +} + +func runQuerySnapshot(ctx context.Context, client *topline.QueryClient, flags map[string]string, stdout io.Writer, globals globalOptions) error { + pipelineID := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"])) + if pipelineID == "" { + return errors.New("usage: topline query snapshot --pipeline ") + } + status := strings.TrimSpace(flags["status"]) + if status == "" { + status = "open" + } + sqlText := fmt.Sprintf( + "SELECT pipeline_id, pipeline_name, pipeline_stage_id, stage_name, stage_position, opportunity_status, "+ + "opportunity_count, pipeline_value, CAST(avg_days_in_stage AS INTEGER) AS avg_days_in_stage "+ + "FROM pipeline_snapshot WHERE pipeline_id = %s%s ORDER BY stage_position", + sqlString(pipelineID), queryStatusClause("opportunity_status", status), + ) + result, err := executeSQL(ctx, client, sqlText) + if err != nil { + return err + } + return output.WriteJSON(stdout, result, globals.MaskPII) +} + +func runQueryAudit(ctx context.Context, client *topline.QueryClient, flags map[string]string, stdout io.Writer, globals globalOptions) error { + pipelineID := strings.TrimSpace(firstNonEmptyQueryFlag(flags["pipeline"], flags["pipelineId"], flags["pipelineID"])) + if pipelineID == "" { + return errors.New("usage: topline query audit --pipeline [--since this-week-et] [--status open]") + } + start, err := parseAuditTime(flags["since"], time.Now().AddDate(0, 0, -7)) + if err != nil { + return err + } + end, err := parseAuditTime(flags["until"], time.Now()) + if err != nil { + return err + } + since := start.UTC().Format(time.RFC3339) + until := end.UTC().Format(time.RFC3339) + status := strings.TrimSpace(flags["status"]) + if status == "" { + status = "open" + } + dealLimit := parsePositiveInt(flags["limit"], 25) + if dealLimit > 100 { + dealLimit = 100 + } + statusClause := queryStatusClause("opportunity_status", status) + + freshnessSQL := "SELECT table_name, row_count, last_synced_at, lag_seconds FROM warehouse_freshness ORDER BY table_name" + snapshotSQL := fmt.Sprintf( + "SELECT pipeline_id, pipeline_name, pipeline_stage_id, stage_name, stage_position, opportunity_status, "+ + "opportunity_count, pipeline_value, CAST(avg_days_in_stage AS INTEGER) AS avg_days_in_stage "+ + "FROM pipeline_snapshot WHERE pipeline_id = %s%s ORDER BY stage_position", + sqlString(pipelineID), statusClause, + ) + activitySQL := fmt.Sprintf( + "SELECT activity_class, direction, COUNT(DISTINCT source_id) AS unique_touches, "+ + "COUNT(DISTINCT opportunity_id) AS opportunities_touched, COUNT(DISTINCT contact_id) AS contacts_touched, "+ + "MIN(event_at) AS first_touch, MAX(event_at) AS last_touch "+ + "FROM pipeline_activity_window WHERE pipeline_id = %s%s AND event_at >= %s AND event_at <= %s "+ + "GROUP BY activity_class, direction ORDER BY unique_touches DESC", + sqlString(pipelineID), statusClause, sqlString(since), sqlString(until), + ) + dealsSQL := fmt.Sprintf( + "SELECT opportunity_id, opportunity_name, contact_id, pipeline_stage_id, owner_user_id, ROUND(monetary_value, 2) AS monetary_value, "+ + "COUNT(DISTINCT source_id) AS unique_touches, "+ + "COUNT(DISTINCT CASE WHEN activity_class = 'message' THEN source_id END) AS message_touches, "+ + "COUNT(DISTINCT CASE WHEN activity_class = 'call' THEN source_id END) AS call_touches, "+ + "COUNT(DISTINCT CASE WHEN activity_class = 'appointment' THEN source_id END) AS appointment_touches, "+ + "COUNT(DISTINCT CASE WHEN direction = 'inbound' THEN source_id END) AS inbound_touches, "+ + "COUNT(DISTINCT CASE WHEN direction = 'outbound' THEN source_id END) AS outbound_touches, "+ + "MIN(event_at) AS first_touch, MAX(event_at) AS last_touch "+ + "FROM pipeline_activity_window WHERE pipeline_id = %s%s AND event_at >= %s AND event_at <= %s "+ + "GROUP BY opportunity_id, opportunity_name, contact_id, pipeline_stage_id, owner_user_id, monetary_value "+ + "ORDER BY unique_touches DESC, monetary_value DESC LIMIT %d", + sqlString(pipelineID), statusClause, sqlString(since), sqlString(until), dealLimit, + ) + movementSQL := fmt.Sprintf( + "SELECT opportunity_id, opportunity_name, contact_id, pipeline_stage_id, stage_name, opportunity_status, monetary_value, "+ + "last_movement_at, last_movement_kind "+ + "FROM pipeline_movement_window WHERE pipeline_id = %s%s AND last_movement_at >= %s AND last_movement_at <= %s "+ + "ORDER BY last_movement_at DESC", + sqlString(pipelineID), statusClause, sqlString(since), sqlString(until), + ) + + freshness, err := executeSQL(ctx, client, freshnessSQL) + if err != nil { + return err + } + snapshot, err := executeSQL(ctx, client, snapshotSQL) + if err != nil { + return err + } + activity, err := executeSQL(ctx, client, activitySQL) + if err != nil { + return err + } + deals, err := executeSQL(ctx, client, dealsSQL) + if err != nil { + return err + } + movement, err := executeSQL(ctx, client, movementSQL) + if err != nil { + return err + } + + result := map[string]any{ + "pipelineId": pipelineID, + "window": map[string]string{"since": since, "until": until}, + "status": status, + "freshness": freshness, + "snapshot": snapshot, + "activity": activity, + "deals": deals, + "movement": movement, + } + return output.WriteJSON(stdout, result, globals.MaskPII) +} + +// sqlString quotes a value as a SQLite single-quoted literal. Used because the +// query API doesn't yet accept bind parameters — pipeline IDs are caller-controlled +// CRM identifiers, not user input, but we still escape embedded single-quotes. +func sqlString(v string) string { + return "'" + strings.ReplaceAll(v, "'", "''") + "'" +} + +func queryStatusClause(column string, status string) string { + trimmed := strings.TrimSpace(status) + if trimmed == "" || strings.EqualFold(trimmed, "all") || strings.EqualFold(trimmed, "any") { + return "" + } + return fmt.Sprintf(" AND %s = %s", column, sqlString(trimmed)) +} diff --git a/internal/commands/query_test.go b/internal/commands/query_test.go index f7dc5b6..9641dde 100644 --- a/internal/commands/query_test.go +++ b/internal/commands/query_test.go @@ -241,3 +241,135 @@ func TestQueryDoctorFlagsMissingTables(t *testing.T) { t.Fatalf("recommendation should call out coverage gap; got %q", rec) } } + +func TestQueryFreshnessHitsExecuteSQLEndpoint(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + + var gotSQL string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/query/api/execute-sql" { + t.Fatalf("path = %q", r.URL.Path) + } + var body struct { + SQL string `json:"sql"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("decode body: %v", err) + } + gotSQL = body.SQL + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"columns":["table_name","row_count","last_synced_at","lag_seconds"],"rows":[]}`)) + })) + defer server.Close() + t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL) + + var stdout bytes.Buffer + if err := Execute([]string{"query", "freshness"}, &stdout, io.Discard); err != nil { + t.Fatalf("Execute returned error: %v", err) + } + if !strings.Contains(gotSQL, "FROM warehouse_freshness") { + t.Fatalf("freshness SQL did not reference warehouse_freshness view; got %q", gotSQL) + } +} + +func TestQuerySnapshotRequiresPipeline(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + t.Setenv("TOPLINE_QUERY_BASE_URL", "http://127.0.0.1:1") + + var stdout bytes.Buffer + err := Execute([]string{"query", "snapshot"}, &stdout, io.Discard) + if err == nil || !strings.Contains(err.Error(), "--pipeline") { + t.Fatalf("expected --pipeline usage error, got %v", err) + } +} + +func TestQuerySnapshotQueriesPipelineSnapshotView(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + + var gotSQL string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body struct { + SQL string `json:"sql"` + } + _ = json.NewDecoder(r.Body).Decode(&body) + gotSQL = body.SQL + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"columns":[],"rows":[]}`)) + })) + defer server.Close() + t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL) + + var stdout bytes.Buffer + if err := Execute([]string{"query", "snapshot", "--pipeline", "CLUy1QapsrEeBiNrmQiL"}, &stdout, io.Discard); err != nil { + t.Fatalf("Execute returned error: %v", err) + } + if !strings.Contains(gotSQL, "FROM pipeline_snapshot") { + t.Fatalf("snapshot SQL missing pipeline_snapshot view; got %q", gotSQL) + } + if !strings.Contains(gotSQL, "'CLUy1QapsrEeBiNrmQiL'") { + t.Fatalf("snapshot SQL did not bind pipeline id; got %q", gotSQL) + } +} + +func TestQueryAuditCallsCompositeViewsInOneInvocation(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + + var sqls []string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/query/api/execute-sql" { + t.Fatalf("path = %q", r.URL.Path) + } + var body struct { + SQL string `json:"sql"` + } + _ = json.NewDecoder(r.Body).Decode(&body) + sqls = append(sqls, body.SQL) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"columns":[],"rows":[]}`)) + })) + defer server.Close() + t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL) + + var stdout bytes.Buffer + err := Execute( + []string{"query", "audit", "--pipeline", "CLUy1QapsrEeBiNrmQiL", "--since", "2026-05-11", "--until", "2026-05-13"}, + &stdout, io.Discard, + ) + if err != nil { + t.Fatalf("Execute returned error: %v", err) + } + if len(sqls) != 5 { + t.Fatalf("expected 5 SQL calls (freshness/snapshot/activity/deals/movement), got %d", len(sqls)) + } + joined := strings.Join(sqls, "\n") + for _, view := range []string{"warehouse_freshness", "pipeline_snapshot", "pipeline_activity_window", "pipeline_movement_window"} { + if !strings.Contains(joined, view) { + t.Fatalf("expected audit to hit %s view; got SQLs:\n%s", view, joined) + } + } + for _, required := range []string{"pipeline_stage_id", "opportunity_status = 'open'", "COUNT(DISTINCT source_id) AS unique_touches", "CAST(avg_days_in_stage AS INTEGER)"} { + if !strings.Contains(joined, required) { + t.Fatalf("expected audit SQL to include %q; got SQLs:\n%s", required, joined) + } + } + out := map[string]any{} + if err := json.Unmarshal(stdout.Bytes(), &out); err != nil { + t.Fatalf("decode audit output: %v", err) + } + for _, key := range []string{"freshness", "snapshot", "activity", "deals", "movement", "pipelineId", "window", "status"} { + if _, ok := out[key]; !ok { + t.Fatalf("audit output missing %q key: %s", key, stdout.String()) + } + } +} + +func TestQueryAuditRequiresPipeline(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + t.Setenv("TOPLINE_QUERY_BASE_URL", "http://127.0.0.1:1") + + var stdout bytes.Buffer + err := Execute([]string{"query", "audit"}, &stdout, io.Discard) + if err == nil || !strings.Contains(err.Error(), "--pipeline") { + t.Fatalf("expected --pipeline usage error, got %v", err) + } +}