diff --git a/README.md b/README.md index 3f48370..f2c4538 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,18 @@ Optional for tests/proxies: export TOPLINE_BASE_URL="https://services.leadconnectorhq.com" ``` +SQL/query commands use the hosted `os-mcp` warehouse HTTP API, not the raw +LeadConnector REST API. That surface requires a connection-bound token minted by +the remote MCP, because raw PITs are intentionally rejected for read-only SQL: + +```bash +export TOPLINE_QUERY_TOKEN="signed_connection_token_from_/connect" +export TOPLINE_QUERY_BASE_URL="https://os-mcp.topline.com" # optional default +``` + +Generate `TOPLINE_QUERY_TOKEN` at using the +same PIT + Location ID. Keep it out of command history and commits. + ## Quick start ```bash @@ -79,6 +91,27 @@ so agents do not need a second lookup just to name the touched deals. If answer; run a fallback conversation/message join. Use `--skip-activity` when you only need the open count/value/stage breakdown. +Warehouse SQL/query API: + +```bash +topline --agent query schema +topline --agent query explain --tables opportunities,pipeline_stages,messages +topline --agent query sql --sql ' + SELECT status, COUNT(*) AS n, SUM(monetary_value) AS value + FROM opportunities + GROUP BY status + ORDER BY n DESC +' +``` + +`query` delegates to the hosted `Topline-com/os-mcp` SQL surface +(`/query/api/*`): schema/catalog discovery, table explanation, and safe +`SELECT` / `WITH ... SELECT` execution. The worker enforces the same read-only +SQL gate as MCP (`DDL`, `DML`, `PRAGMA`, `ATTACH`, multi-statement SQL, and +hidden tables are rejected) and caps results at 5,000 rows. Use this for +Streamlined-style analytics questions where a relational scan beats paginated +REST fan-out. + Local SQLite foundation: ```bash @@ -124,6 +157,7 @@ Parity command scaffolding exists for the public MCP action surface: Agent-native foundations included now: - `pipeline audit` with recent conversation scan + parallel message/task joins +- `query schema|catalog|explain|sql` against the hosted MCP warehouse HTTP API - `sync init` - `--agent` - `--mask-pii` diff --git a/docs/examples.md b/docs/examples.md index 84e43c7..f2ac286 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -20,6 +20,30 @@ sales question directly. If `activityJoinIncluded` is missing or false, use a fallback conversation/message join before reporting zero activity. Use `--skip-activity` for a fast snapshot-only count/value/stage breakdown. +## Warehouse SQL analytics + +```bash +export TOPLINE_QUERY_TOKEN="signed_connection_token_from_/connect" + +topline --agent query schema + +topline --agent query explain --tables opportunities,pipeline_stages,messages + +topline --agent query sql --sql ' + SELECT p.name AS pipeline, COUNT(*) AS open_deals, SUM(o.monetary_value) AS value + FROM opportunities o + JOIN pipelines p ON p.id = o.pipeline_id + WHERE o.status = "open" + GROUP BY p.name + ORDER BY value DESC +' +``` + +Use `query` for broad analytics and audits when SQL over the synced warehouse is +better than paginating live REST endpoints. The hosted MCP query API enforces +read-only SQLite: one `SELECT` / `WITH ... SELECT`, exposed tables only, 5,000-row +cap. + ## Search open opportunities ```bash diff --git a/internal/commands/execute.go b/internal/commands/execute.go index b97b361..0c4e961 100644 --- a/internal/commands/execute.go +++ b/internal/commands/execute.go @@ -44,6 +44,9 @@ func Execute(args []string, stdout, stderr io.Writer) error { if len(rest) >= 2 && rest[0] == "pipeline" && rest[1] == "audit" { return runPipelineAudit(rest[2:], stdout, globals) } + if len(rest) >= 1 && rest[0] == "query" { + return runQueryCommand(rest[1:], stdout, globals) + } if len(rest) >= 2 && rest[0] == "sync" && rest[1] == "init" { flags, err := parseFlags(rest[2:]) if err != nil { @@ -230,9 +233,11 @@ func printHelp(w io.Writer) { _, _ = fmt.Fprintln(w, "\nUsage:") _, _ = fmt.Fprintln(w, " topline [--flag value]") _, _ = fmt.Fprintln(w, " topline --agent pipeline audit --pipeline-id PIPE --since this-week-et") + _, _ = fmt.Fprintln(w, " topline --agent query sql --sql 'SELECT COUNT(*) AS n FROM opportunities'") _, _ = fmt.Fprintln(w, " topline raw request GET /contacts/ --query '{\"limit\":1}'") _, _ = fmt.Fprintln(w, "\nAgent-native commands:") _, _ = fmt.Fprintln(w, " pipeline audit --pipeline-id PIPE --since YYYY-MM-DD|this-week-et [--concurrency 8] [--skip-activity]") + _, _ = fmt.Fprintln(w, " query schema | catalog | explain --tables a,b | sql --sql SELECT...") _, _ = fmt.Fprintln(w, " sync init --db topline.db") _, _ = fmt.Fprintln(w, "\nParity commands:") for _, cmd := range commands { diff --git a/internal/commands/query.go b/internal/commands/query.go new file mode 100644 index 0000000..1aa9e14 --- /dev/null +++ b/internal/commands/query.go @@ -0,0 +1,108 @@ +package commands + +import ( + "context" + "errors" + "fmt" + "io" + "net/url" + "os" + "strings" + + "github.com/Topline-com/os-cli/internal/output" + "github.com/Topline-com/os-cli/internal/topline" +) + +func runQueryCommand(args []string, stdout io.Writer, globals globalOptions) error { + if len(args) == 0 || args[0] == "help" || args[0] == "--help" || args[0] == "-h" { + printQueryHelp(stdout) + return nil + } + + subcommand := args[0] + flags, err := parseFlags(args[1:]) + if err != nil { + return err + } + cfg, err := topline.LoadQueryConfig() + if err != nil { + return err + } + if flags["url"] != "" { + cfg.BaseURL = flags["url"] + } + client := topline.NewQueryClient(cfg) + ctx := context.Background() + + var result any + switch subcommand { + case "schema", "describe", "overview": + err = client.Get(ctx, "/query/api/get-overview", nil, &result) + case "catalog": + err = client.Get(ctx, "/query/api/catalog", nil, &result) + case "explain", "tables": + tables := splitCSV(firstNonEmptyQueryFlag(flags["tables"], flags["table"])) + if len(tables) == 0 { + return errors.New("usage: topline query explain --tables contacts,opportunities") + } + values := url.Values{} + for _, table := range tables { + values.Add("table", table) + } + err = client.Get(ctx, "/query/api/explain-tables", values, &result) + case "sql", "execute": + sqlText := strings.TrimSpace(flags["sql"]) + if sqlText == "" && flags["file"] != "" { + b, readErr := os.ReadFile(flags["file"]) + if readErr != nil { + return readErr + } + sqlText = strings.TrimSpace(string(b)) + } + if sqlText == "" { + return errors.New("usage: topline query sql --sql 'SELECT COUNT(*) FROM contacts' [--url https://os-mcp.topline.com]") + } + err = client.PostJSON(ctx, "/query/api/execute-sql", map[string]any{"sql": sqlText}, &result) + default: + return fmt.Errorf("unknown query command %q; try topline query help", subcommand) + } + if err != nil { + return err + } + return output.WriteJSON(stdout, result, globals.MaskPII) +} + +func printQueryHelp(w io.Writer) { + _, _ = fmt.Fprintln(w, "Topline SQL/query commands") + _, _ = fmt.Fprintln(w, "") + _, _ = fmt.Fprintln(w, "Usage:") + _, _ = fmt.Fprintln(w, " topline query schema") + _, _ = fmt.Fprintln(w, " topline query catalog") + _, _ = fmt.Fprintln(w, " topline query explain --tables contacts,opportunities") + _, _ = fmt.Fprintln(w, " topline query sql --sql 'SELECT COUNT(*) AS n FROM contacts'") + _, _ = fmt.Fprintln(w, "") + _, _ = fmt.Fprintln(w, "Environment:") + _, _ = fmt.Fprintln(w, " TOPLINE_QUERY_TOKEN Connection-bound token from https://os-mcp.topline.com/connect") + _, _ = fmt.Fprintln(w, " TOPLINE_QUERY_BASE_URL Defaults to https://os-mcp.topline.com") +} + +func splitCSV(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, part := range parts { + part = strings.TrimSpace(part) + if part != "" { + out = append(out, part) + } + } + return out +} + +func firstNonEmptyQueryFlag(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return strings.TrimSpace(value) + } + } + return "" +} diff --git a/internal/commands/query_test.go b/internal/commands/query_test.go new file mode 100644 index 0000000..9c84390 --- /dev/null +++ b/internal/commands/query_test.go @@ -0,0 +1,121 @@ +package commands + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestQuerySchemaUsesHTTPQueryAPI(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + + var gotPath, gotAuth string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + gotAuth = r.Header.Get("Authorization") + if r.Method != http.MethodGet { + t.Fatalf("method = %s, want GET", r.Method) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"tables":[{"name":"opportunities","row_count":242}]}`)) + })) + defer server.Close() + t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL) + + var stdout bytes.Buffer + if err := Execute([]string{"--agent", "query", "schema"}, &stdout, io.Discard); err != nil { + t.Fatalf("Execute returned error: %v", err) + } + if gotPath != "/query/api/get-overview" { + t.Fatalf("path = %q", gotPath) + } + if gotAuth != "Bearer signed-query-token" { + t.Fatalf("Authorization header = %q", gotAuth) + } + if !strings.Contains(stdout.String(), "opportunities") || !strings.Contains(stdout.String(), "242") { + t.Fatalf("unexpected output: %s", stdout.String()) + } +} + +func TestQuerySQLPostsSQLToExecuteEndpoint(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + + var gotSQL string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/query/api/execute-sql" { + t.Fatalf("path = %q", r.URL.Path) + } + if r.Method != http.MethodPost { + t.Fatalf("method = %s, want POST", r.Method) + } + if ct := r.Header.Get("Content-Type"); !strings.Contains(ct, "application/json") { + t.Fatalf("Content-Type = %q", ct) + } + var body struct { + SQL string `json:"sql"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("decode body: %v", err) + } + gotSQL = body.SQL + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"columns":["n"],"rows":[[242]],"elapsed_ms":3,"truncated":false,"effective_limit":5000,"rewritten_sql":"SELECT COUNT(*) AS n FROM opportunities LIMIT 5000"}`)) + })) + defer server.Close() + t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL) + + var stdout bytes.Buffer + err := Execute([]string{"query", "sql", "--sql", "SELECT COUNT(*) AS n FROM opportunities"}, &stdout, io.Discard) + if err != nil { + t.Fatalf("Execute returned error: %v", err) + } + if gotSQL != "SELECT COUNT(*) AS n FROM opportunities" { + t.Fatalf("sql = %q", gotSQL) + } + if !strings.Contains(stdout.String(), "242") || !strings.Contains(stdout.String(), "rewritten_sql") { + t.Fatalf("unexpected output: %s", stdout.String()) + } +} + +func TestQueryExplainBuildsRepeatedTableParams(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token") + + var gotTables []string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/query/api/explain-tables" { + t.Fatalf("path = %q", r.URL.Path) + } + gotTables = r.URL.Query()["table"] + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"tables":[]}`)) + })) + defer server.Close() + t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL) + + var stdout bytes.Buffer + err := Execute([]string{"query", "explain", "--tables", "contacts,opportunities"}, &stdout, io.Discard) + if err != nil { + t.Fatalf("Execute returned error: %v", err) + } + if strings.Join(gotTables, ",") != "contacts,opportunities" { + t.Fatalf("tables = %#v", gotTables) + } +} + +func TestQueryRejectsRawPITToken(t *testing.T) { + t.Setenv("TOPLINE_QUERY_TOKEN", "pit-example-token") + t.Setenv("TOPLINE_QUERY_BASE_URL", "http://127.0.0.1:1") + + var stdout bytes.Buffer + err := Execute([]string{"query", "schema"}, &stdout, io.Discard) + if err == nil { + t.Fatal("expected raw PIT token error") + } + if !strings.Contains(err.Error(), "connection-bound") || !strings.Contains(err.Error(), "not a raw PIT") { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/internal/topline/query_client.go b/internal/topline/query_client.go new file mode 100644 index 0000000..2f9f451 --- /dev/null +++ b/internal/topline/query_client.go @@ -0,0 +1,168 @@ +package topline + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + "time" +) + +const DefaultQueryBaseURL = "https://os-mcp.topline.com" + +type QueryConfig struct { + BaseURL string + Token string +} + +type QueryClient struct { + cfg QueryConfig + httpClient *http.Client +} + +type QueryAPIError struct { + StatusCode int + Body any + Message string +} + +func (e *QueryAPIError) Error() string { + return fmt.Sprintf("Topline query API error %d: %s", e.StatusCode, e.Message) +} + +func LoadQueryConfig() (QueryConfig, error) { + cfg := QueryConfig{ + BaseURL: strings.TrimSpace(os.Getenv("TOPLINE_QUERY_BASE_URL")), + Token: firstNonEmptyEnv("TOPLINE_QUERY_TOKEN", "TOPLINE_MCP_ACCESS_TOKEN", "TOPLINE_MCP_TOKEN"), + } + if cfg.BaseURL == "" { + cfg.BaseURL = DefaultQueryBaseURL + } + if cfg.Token == "" { + return cfg, errors.New("TOPLINE_QUERY_TOKEN is required for SQL/query commands; generate a connection-bound token at https://os-mcp.topline.com/connect or set TOPLINE_MCP_ACCESS_TOKEN") + } + if strings.HasPrefix(cfg.Token, "pit-") { + return cfg, errors.New("TOPLINE_QUERY_TOKEN must be a connection-bound MCP/query token, not a raw PIT; generate one at https://os-mcp.topline.com/connect") + } + return cfg, nil +} + +func NewQueryClient(cfg QueryConfig) *QueryClient { + return &QueryClient{cfg: cfg, httpClient: &http.Client{Timeout: 60 * time.Second}} +} + +func (c *QueryClient) Get(ctx context.Context, path string, values url.Values, out any) error { + return c.do(ctx, http.MethodGet, path, values, nil, out) +} + +func (c *QueryClient) PostJSON(ctx context.Context, path string, body any, out any) error { + return c.do(ctx, http.MethodPost, path, nil, body, out) +} + +func (c *QueryClient) do(ctx context.Context, method, path string, values url.Values, body any, out any) error { + endpoint, err := c.url(path, values) + if err != nil { + return err + } + var payload []byte + if body != nil { + payload, err = json.Marshal(body) + if err != nil { + return err + } + } + + var reader io.Reader + if payload != nil { + reader = bytes.NewReader(payload) + } + req, err := http.NewRequestWithContext(ctx, method, endpoint, reader) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+c.cfg.Token) + req.Header.Set("Accept", "application/json") + if payload != nil { + req.Header.Set("Content-Type", "application/json") + } + res, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + raw, err := io.ReadAll(res.Body) + if err != nil { + return err + } + var parsed any + if len(raw) > 0 { + if err := json.Unmarshal(raw, &parsed); err != nil { + parsed = string(raw) + } + } + if res.StatusCode >= 200 && res.StatusCode < 300 { + if out == nil || len(raw) == 0 { + return nil + } + return json.Unmarshal(raw, out) + } + return shapeQueryAPIError(res.StatusCode, parsed) +} + +func (c *QueryClient) url(path string, values url.Values) (string, error) { + base := strings.TrimRight(c.cfg.BaseURL, "/") + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + u, err := url.Parse(base + path) + if err != nil { + return "", err + } + if len(values) > 0 { + q := u.Query() + for key, vals := range values { + for _, v := range vals { + if v != "" { + q.Add(key, v) + } + } + } + u.RawQuery = q.Encode() + } + return u.String(), nil +} + +func firstNonEmptyEnv(keys ...string) string { + for _, key := range keys { + if v := strings.TrimSpace(os.Getenv(key)); v != "" { + return v + } + } + return "" +} + +func shapeQueryAPIError(status int, body any) error { + msg := fmt.Sprintf("HTTP %d", status) + if m, ok := body.(map[string]any); ok { + if s, ok := m["error"].(string); ok && s != "" { + msg = s + } else if s, ok := m["message"].(string); ok && s != "" { + msg = s + } + } else if s, ok := body.(string); ok && strings.TrimSpace(s) != "" { + msg = strings.TrimSpace(s) + } + msg = strings.ReplaceAll(msg, "Bearer ", "Bearer [REDACTED] ") + if status == http.StatusUnauthorized { + msg = "Authentication failed for Topline query API. Use a connection-bound TOPLINE_QUERY_TOKEN, not TOPLINE_PIT." + } else if status == http.StatusForbidden { + msg = "Forbidden by Topline query API. The token is missing access to this location or SQL surface." + } + return &QueryAPIError{StatusCode: status, Body: body, Message: msg} +} diff --git a/skills/claude-code/SKILL.md b/skills/claude-code/SKILL.md index 9248c9f..63a3a9a 100644 --- a/skills/claude-code/SKILL.md +++ b/skills/claude-code/SKILL.md @@ -29,9 +29,12 @@ Useful switches: ```bash topline setup-check +topline --agent query schema +topline --agent query explain --tables opportunities,pipeline_stages,messages,contacts +topline --agent query sql --sql 'SELECT status, COUNT(*) AS n FROM opportunities GROUP BY status ORDER BY n DESC' topline opportunities pipelines topline opportunities search --pipeline-id PIPE --status open --limit 100 topline sync init --db topline.db ``` -Prefer `--agent` for token-efficient, PII-masked output. Never print PIT/API token values. +Use hosted SQL query commands for broad analytics when `TOPLINE_QUERY_TOKEN` is configured and warehouse freshness is acceptable. `TOPLINE_QUERY_TOKEN` must be a connection-bound token from `https://os-mcp.topline.com/connect`, not a raw PIT. Prefer `--agent` for token-efficient, PII-masked output. Never print PIT/API/query token values. diff --git a/skills/hermes/SKILL.md b/skills/hermes/SKILL.md index 75bcb42..ee9eb2c 100644 --- a/skills/hermes/SKILL.md +++ b/skills/hermes/SKILL.md @@ -13,6 +13,7 @@ Use `topline` when the user asks for sales pipeline reporting, CRM hygiene, deal - `TOPLINE_PIT` - `TOPLINE_LOCATION_ID` - `TOPLINE_BRAND_NAME` optional +- `TOPLINE_QUERY_TOKEN` optional, for hosted MCP warehouse SQL/query commands. Must be a connection-bound token from `https://os-mcp.topline.com/connect`, not a raw PIT. Never print full PIT values. Mask secrets and unnecessary PII by default in summaries. @@ -60,6 +61,9 @@ Use these flags when needed: ## Other preferred commands ```bash +topline --agent query schema +topline --agent query explain --tables opportunities,pipeline_stages,messages,contacts +topline --agent query sql --sql 'SELECT status, COUNT(*) AS n FROM opportunities GROUP BY status ORDER BY n DESC' topline opportunities pipelines topline opportunities search --pipeline-id PIPELINE_ID --status open --limit 100 topline conversations search --contact-id CONTACT_ID --status all --limit 10 @@ -69,4 +73,4 @@ topline sync init --db topline.db ## Reporting rule of thumb -Use raw MCP or `topline raw request` for one-off edge cases. Use the CLI for compound read/reporting workflows. Keep activity separate from movement: conversation activity can happen without opportunity stage/status changes. +Use hosted SQL query commands for broad analytics when `TOPLINE_QUERY_TOKEN` is configured and warehouse freshness is acceptable. Use raw MCP or `topline raw request` for one-off edge cases. Use the CLI for compound read/reporting workflows. Keep activity separate from movement: conversation activity can happen without opportunity stage/status changes.