Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ Optional for tests/proxies:
export TOPLINE_BASE_URL="https://services.leadconnectorhq.com"
```

SQL/query commands use the hosted `os-mcp` warehouse HTTP API, not the raw
LeadConnector REST API. That surface requires a connection-bound token minted by
the remote MCP, because raw PITs are intentionally rejected for read-only SQL:

```bash
export TOPLINE_QUERY_TOKEN="signed_connection_token_from_/connect"
export TOPLINE_QUERY_BASE_URL="https://os-mcp.topline.com" # optional default
```

Generate `TOPLINE_QUERY_TOKEN` at <https://os-mcp.topline.com/connect> using the
same PIT + Location ID. Keep it out of command history and commits.

## Quick start

```bash
Expand Down Expand Up @@ -79,6 +91,27 @@ so agents do not need a second lookup just to name the touched deals. If
answer; run a fallback conversation/message join. Use `--skip-activity` when you
only need the open count/value/stage breakdown.

Warehouse SQL/query API:

```bash
topline --agent query schema
topline --agent query explain --tables opportunities,pipeline_stages,messages
topline --agent query sql --sql '
SELECT status, COUNT(*) AS n, SUM(monetary_value) AS value
FROM opportunities
GROUP BY status
ORDER BY n DESC
'
```

`query` delegates to the hosted `Topline-com/os-mcp` SQL surface
(`/query/api/*`): schema/catalog discovery, table explanation, and safe
`SELECT` / `WITH ... SELECT` execution. The worker enforces the same read-only
SQL gate as MCP (`DDL`, `DML`, `PRAGMA`, `ATTACH`, multi-statement SQL, and
hidden tables are rejected) and caps results at 5,000 rows. Use this for
Streamlined-style analytics questions where a relational scan beats paginated
REST fan-out.

Local SQLite foundation:

```bash
Expand Down Expand Up @@ -124,6 +157,7 @@ Parity command scaffolding exists for the public MCP action surface:
Agent-native foundations included now:

- `pipeline audit` with recent conversation scan + parallel message/task joins
- `query schema|catalog|explain|sql` against the hosted MCP warehouse HTTP API
- `sync init`
- `--agent`
- `--mask-pii`
Expand Down
24 changes: 24 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,30 @@ sales question directly. If `activityJoinIncluded` is missing or false, use a
fallback conversation/message join before reporting zero activity.
Use `--skip-activity` for a fast snapshot-only count/value/stage breakdown.

## Warehouse SQL analytics

```bash
export TOPLINE_QUERY_TOKEN="signed_connection_token_from_/connect"

topline --agent query schema

topline --agent query explain --tables opportunities,pipeline_stages,messages

topline --agent query sql --sql '
SELECT p.name AS pipeline, COUNT(*) AS open_deals, SUM(o.monetary_value) AS value
FROM opportunities o
JOIN pipelines p ON p.id = o.pipeline_id
WHERE o.status = "open"
GROUP BY p.name
ORDER BY value DESC
'
```

Use `query` for broad analytics and audits when SQL over the synced warehouse is
better than paginating live REST endpoints. The hosted MCP query API enforces
read-only SQLite: one `SELECT` / `WITH ... SELECT`, exposed tables only, 5,000-row
cap.

## Search open opportunities

```bash
Expand Down
5 changes: 5 additions & 0 deletions internal/commands/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func Execute(args []string, stdout, stderr io.Writer) error {
if len(rest) >= 2 && rest[0] == "pipeline" && rest[1] == "audit" {
return runPipelineAudit(rest[2:], stdout, globals)
}
if len(rest) >= 1 && rest[0] == "query" {
return runQueryCommand(rest[1:], stdout, globals)
}
if len(rest) >= 2 && rest[0] == "sync" && rest[1] == "init" {
flags, err := parseFlags(rest[2:])
if err != nil {
Expand Down Expand Up @@ -230,9 +233,11 @@ func printHelp(w io.Writer) {
_, _ = fmt.Fprintln(w, "\nUsage:")
_, _ = fmt.Fprintln(w, " topline <command> [--flag value]")
_, _ = fmt.Fprintln(w, " topline --agent pipeline audit --pipeline-id PIPE --since this-week-et")
_, _ = fmt.Fprintln(w, " topline --agent query sql --sql 'SELECT COUNT(*) AS n FROM opportunities'")
_, _ = fmt.Fprintln(w, " topline raw request GET /contacts/ --query '{\"limit\":1}'")
_, _ = fmt.Fprintln(w, "\nAgent-native commands:")
_, _ = fmt.Fprintln(w, " pipeline audit --pipeline-id PIPE --since YYYY-MM-DD|this-week-et [--concurrency 8] [--skip-activity]")
_, _ = fmt.Fprintln(w, " query schema | catalog | explain --tables a,b | sql --sql SELECT...")
_, _ = fmt.Fprintln(w, " sync init --db topline.db")
_, _ = fmt.Fprintln(w, "\nParity commands:")
for _, cmd := range commands {
Expand Down
108 changes: 108 additions & 0 deletions internal/commands/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package commands

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"

"github.com/Topline-com/os-cli/internal/output"
"github.com/Topline-com/os-cli/internal/topline"
)

func runQueryCommand(args []string, stdout io.Writer, globals globalOptions) error {
if len(args) == 0 || args[0] == "help" || args[0] == "--help" || args[0] == "-h" {
printQueryHelp(stdout)
return nil
}

subcommand := args[0]
flags, err := parseFlags(args[1:])
if err != nil {
return err
}
cfg, err := topline.LoadQueryConfig()
if err != nil {
return err
}
if flags["url"] != "" {
cfg.BaseURL = flags["url"]
}
client := topline.NewQueryClient(cfg)
ctx := context.Background()

var result any
switch subcommand {
case "schema", "describe", "overview":
err = client.Get(ctx, "/query/api/get-overview", nil, &result)
case "catalog":
err = client.Get(ctx, "/query/api/catalog", nil, &result)
case "explain", "tables":
tables := splitCSV(firstNonEmptyQueryFlag(flags["tables"], flags["table"]))
if len(tables) == 0 {
return errors.New("usage: topline query explain --tables contacts,opportunities")
}
values := url.Values{}
for _, table := range tables {
values.Add("table", table)
}
err = client.Get(ctx, "/query/api/explain-tables", values, &result)
case "sql", "execute":
sqlText := strings.TrimSpace(flags["sql"])
if sqlText == "" && flags["file"] != "" {
b, readErr := os.ReadFile(flags["file"])
if readErr != nil {
return readErr
}
sqlText = strings.TrimSpace(string(b))
}
if sqlText == "" {
return errors.New("usage: topline query sql --sql 'SELECT COUNT(*) FROM contacts' [--url https://os-mcp.topline.com]")
}
err = client.PostJSON(ctx, "/query/api/execute-sql", map[string]any{"sql": sqlText}, &result)
default:
return fmt.Errorf("unknown query command %q; try topline query help", subcommand)
}
if err != nil {
return err
}
return output.WriteJSON(stdout, result, globals.MaskPII)
}

func printQueryHelp(w io.Writer) {
_, _ = fmt.Fprintln(w, "Topline SQL/query commands")
_, _ = fmt.Fprintln(w, "")
_, _ = fmt.Fprintln(w, "Usage:")
_, _ = fmt.Fprintln(w, " topline query schema")
_, _ = fmt.Fprintln(w, " topline query catalog")
_, _ = fmt.Fprintln(w, " topline query explain --tables contacts,opportunities")
_, _ = fmt.Fprintln(w, " topline query sql --sql 'SELECT COUNT(*) AS n FROM contacts'")
_, _ = fmt.Fprintln(w, "")
_, _ = fmt.Fprintln(w, "Environment:")
_, _ = fmt.Fprintln(w, " TOPLINE_QUERY_TOKEN Connection-bound token from https://os-mcp.topline.com/connect")
_, _ = fmt.Fprintln(w, " TOPLINE_QUERY_BASE_URL Defaults to https://os-mcp.topline.com")
}

func splitCSV(s string) []string {
parts := strings.Split(s, ",")
out := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
out = append(out, part)
}
}
return out
}

func firstNonEmptyQueryFlag(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {
return strings.TrimSpace(value)
}
}
return ""
}
121 changes: 121 additions & 0 deletions internal/commands/query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package commands

import (
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
)

func TestQuerySchemaUsesHTTPQueryAPI(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")

var gotPath, gotAuth string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotPath = r.URL.Path
gotAuth = r.Header.Get("Authorization")
if r.Method != http.MethodGet {
t.Fatalf("method = %s, want GET", r.Method)
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"tables":[{"name":"opportunities","row_count":242}]}`))
}))
defer server.Close()
t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL)

var stdout bytes.Buffer
if err := Execute([]string{"--agent", "query", "schema"}, &stdout, io.Discard); err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if gotPath != "/query/api/get-overview" {
t.Fatalf("path = %q", gotPath)
}
if gotAuth != "Bearer signed-query-token" {
t.Fatalf("Authorization header = %q", gotAuth)
}
if !strings.Contains(stdout.String(), "opportunities") || !strings.Contains(stdout.String(), "242") {
t.Fatalf("unexpected output: %s", stdout.String())
}
}

func TestQuerySQLPostsSQLToExecuteEndpoint(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")

var gotSQL string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/query/api/execute-sql" {
t.Fatalf("path = %q", r.URL.Path)
}
if r.Method != http.MethodPost {
t.Fatalf("method = %s, want POST", r.Method)
}
if ct := r.Header.Get("Content-Type"); !strings.Contains(ct, "application/json") {
t.Fatalf("Content-Type = %q", ct)
}
var body struct {
SQL string `json:"sql"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Fatalf("decode body: %v", err)
}
gotSQL = body.SQL
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"columns":["n"],"rows":[[242]],"elapsed_ms":3,"truncated":false,"effective_limit":5000,"rewritten_sql":"SELECT COUNT(*) AS n FROM opportunities LIMIT 5000"}`))
}))
defer server.Close()
t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL)

var stdout bytes.Buffer
err := Execute([]string{"query", "sql", "--sql", "SELECT COUNT(*) AS n FROM opportunities"}, &stdout, io.Discard)
if err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if gotSQL != "SELECT COUNT(*) AS n FROM opportunities" {
t.Fatalf("sql = %q", gotSQL)
}
if !strings.Contains(stdout.String(), "242") || !strings.Contains(stdout.String(), "rewritten_sql") {
t.Fatalf("unexpected output: %s", stdout.String())
}
}

func TestQueryExplainBuildsRepeatedTableParams(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "signed-query-token")

var gotTables []string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/query/api/explain-tables" {
t.Fatalf("path = %q", r.URL.Path)
}
gotTables = r.URL.Query()["table"]
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"tables":[]}`))
}))
defer server.Close()
t.Setenv("TOPLINE_QUERY_BASE_URL", server.URL)

var stdout bytes.Buffer
err := Execute([]string{"query", "explain", "--tables", "contacts,opportunities"}, &stdout, io.Discard)
if err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if strings.Join(gotTables, ",") != "contacts,opportunities" {
t.Fatalf("tables = %#v", gotTables)
}
}

func TestQueryRejectsRawPITToken(t *testing.T) {
t.Setenv("TOPLINE_QUERY_TOKEN", "pit-example-token")
t.Setenv("TOPLINE_QUERY_BASE_URL", "http://127.0.0.1:1")

var stdout bytes.Buffer
err := Execute([]string{"query", "schema"}, &stdout, io.Discard)
if err == nil {
t.Fatal("expected raw PIT token error")
}
if !strings.Contains(err.Error(), "connection-bound") || !strings.Contains(err.Error(), "not a raw PIT") {
t.Fatalf("unexpected error: %v", err)
}
}
Loading
Loading