diff --git a/internal/sync/pagination_test.go b/internal/sync/pagination_test.go new file mode 100644 index 0000000..0d23b7c --- /dev/null +++ b/internal/sync/pagination_test.go @@ -0,0 +1,180 @@ +package sync + +import ( + "context" + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "path/filepath" + "sync" + "testing" + + "github.com/Topline-com/os-cli/internal/topline" +) + +// TestSync_NumericCursorPagination is a regression guard for the production bug +// where GHL returned meta.startAfter as a JSON number (epoch ms) and the syncer +// silently dropped it (stringField only matched string), causing every "page 2" +// request to replay the page-1 cursor and the loop to break after one upsert. +// +// The fake here returns startAfter as a number, exactly like GHL prod, and +// asserts that the syncer keeps paging until the server stops handing back a +// cursor — pulling all rows down into SQLite. If this test ever fails again +// with "got 1 opp, expected 3", the regression is back. +func TestSync_NumericCursorPagination(t *testing.T) { + // Each entity returns three pages; cursor is a numeric epoch-ms value. + type oppPage struct { + ID string + Cursor float64 + HasNext bool + NextID string + NextCur float64 + } + oppPages := []oppPage{ + {ID: "O1", HasNext: true, NextID: "O1", NextCur: 1700000001000}, + {ID: "O2", HasNext: true, NextID: "O2", NextCur: 1700000002000}, + {ID: "O3", HasNext: false}, + } + contactPages := []oppPage{ + {ID: "C1", HasNext: true, NextID: "C1", NextCur: 1700000001000}, + {ID: "C2", HasNext: true, NextID: "C2", NextCur: 1700000002000}, + {ID: "C3", HasNext: false}, + } + convoPages := []oppPage{ + {ID: "V1", HasNext: true, NextID: "V1", NextCur: 1700000001000}, + {ID: "V2", HasNext: true, NextID: "V2", NextCur: 1700000002000}, + {ID: "V3", HasNext: false}, + } + + var mu sync.Mutex + oppIdx, contactIdx, convoIdx := 0, 0, 0 + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + mu.Lock() + defer mu.Unlock() + switch r.URL.Path { + case "/opportunities/pipelines": + _ = json.NewEncoder(w).Encode(map[string]any{"pipelines": []any{}}) + case "/opportunities/search": + if oppIdx >= len(oppPages) { + _ = json.NewEncoder(w).Encode(map[string]any{"opportunities": []any{}}) + return + } + p := oppPages[oppIdx] + oppIdx++ + body := map[string]any{ + "opportunities": []any{map[string]any{ + "id": p.ID, + "name": p.ID, + "status": "open", + "pipelineId": "PIPE1", + "pipelineStageId": "STG1", + "contactId": "X", + "monetaryValue": 100.0, + }}, + } + if p.HasNext { + body["meta"] = map[string]any{ + "startAfterId": p.NextID, + "startAfter": p.NextCur, // numeric — the actual prod shape + } + } + _ = json.NewEncoder(w).Encode(body) + case "/contacts/search": + if contactIdx >= len(contactPages) { + _ = json.NewEncoder(w).Encode(map[string]any{"contacts": []any{}}) + return + } + p := contactPages[contactIdx] + contactIdx++ + body := map[string]any{ + "contacts": []any{map[string]any{ + "id": p.ID, + "firstName": p.ID, + "email": p.ID + "@example.com", + }}, + } + if p.HasNext { + body["meta"] = map[string]any{ + "startAfterId": p.NextID, + "startAfter": p.NextCur, + } + } + _ = json.NewEncoder(w).Encode(body) + case "/conversations/search": + if convoIdx >= len(convoPages) { + _ = json.NewEncoder(w).Encode(map[string]any{"conversations": []any{}}) + return + } + p := convoPages[convoIdx] + convoIdx++ + body := map[string]any{ + "conversations": []any{map[string]any{ + "id": p.ID, + "contactId": "X", + "lastMessageDate": "2026-05-13T00:00:00Z", + }}, + } + if p.HasNext { + body["meta"] = map[string]any{ + "startAfterId": p.NextID, + "startAfter": p.NextCur, + } + } + _ = json.NewEncoder(w).Encode(body) + default: + _ = json.NewEncoder(w).Encode(map[string]any{"messages": map[string]any{"messages": []any{}}}) + } + })) + defer ts.Close() + + client := topline.NewClient(topline.Config{BaseURL: ts.URL, PIT: "test", LocationID: "LOC"}) + dbPath := filepath.Join(t.TempDir(), "t.db") + if _, err := SyncAll(context.Background(), client, "LOC", dbPath); err != nil { + t.Fatalf("SyncAll: %v", err) + } + + db, err := sql.Open("sqlite", dbPath) + if err != nil { + t.Fatalf("open: %v", err) + } + defer db.Close() + for _, c := range []struct { + table string + want int + }{ + {"opportunities", 3}, + {"contacts", 3}, + {"conversations", 3}, + } { + var n int + if err := db.QueryRow(`SELECT COUNT(*) FROM ` + c.table).Scan(&n); err != nil { + t.Fatalf("count %s: %v", c.table, err) + } + if n != c.want { + t.Fatalf("%s: expected %d rows (pagination must follow numeric cursor), got %d", c.table, c.want, n) + } + } +} + +func TestAnyToString_HandlesNumericCursor(t *testing.T) { + cases := []struct { + in any + want string + }{ + {1700000001000.0, "1700000001000"}, + {"abc", "abc"}, + {nil, ""}, + {int(42), "42"}, + {int64(1700000001000), "1700000001000"}, + {true, "true"}, + } + for _, c := range cases { + got := anyToString(c.in) + if got != c.want { + t.Fatalf("anyToString(%v) = %q, want %q", c.in, got, c.want) + } + } +} diff --git a/internal/sync/sync.go b/internal/sync/sync.go index d9ae02a..a318913 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "strconv" "strings" "time" @@ -219,11 +220,16 @@ func syncOpportunities(ctx context.Context, client *topline.Client, db *sql.DB, if err := tx.Commit(); err != nil { return r, err } - // Pagination: GHL search endpoints expose startAfter (epoch ms) + - // startAfterId via meta.{startAfter,startAfterId} OR nextPageUrl. + // Pagination: GHL search endpoints expose startAfter (epoch ms, encoded + // as a JSON number) + startAfterId via meta. anyToString handles the + // numeric case — stringField alone returns "" for float64 values and + // silently drops the cursor, causing infinite same-page replays. meta, _ := raw["meta"].(map[string]any) nextID := stringField(meta, "startAfterId", "start_after_id") - nextAfter := stringField(meta, "startAfter", "start_after") + nextAfter := anyToString(meta["startAfter"]) + if nextAfter == "" { + nextAfter = anyToString(meta["start_after"]) + } if nextID == "" || (nextID == startAfterID && nextAfter == startAfter) { break } @@ -306,11 +312,16 @@ func syncContacts(ctx context.Context, client *topline.Client, db *sql.DB, locat } // Cursor: GHL contacts/search returns meta.startAfter (epoch ms) + // meta.startAfterId on the next page. Feed those back as a - // searchAfter array. + // searchAfter array. Trust the cursor — not the page length — as the + // stop signal; a "short" page can still be followed by a full one when + // GHL filters server-side after the fetch. meta, _ := raw["meta"].(map[string]any) nextID := stringField(meta, "startAfterId", "start_after_id") nextAfter := meta["startAfter"] - if nextID == "" || len(list) < pageLimit { + if nextAfter == nil { + nextAfter = meta["start_after"] + } + if nextID == "" { break } next := []any{nextAfter, nextID} @@ -394,7 +405,10 @@ func syncConversations(ctx context.Context, client *topline.Client, db *sql.DB, } meta, _ := raw["meta"].(map[string]any) nextID := stringField(meta, "startAfterId", "start_after_id") - nextAfter := stringField(meta, "startAfter", "start_after") + nextAfter := anyToString(meta["startAfter"]) + if nextAfter == "" { + nextAfter = anyToString(meta["start_after"]) + } if nextID == "" || (nextID == startAfterID && nextAfter == startAfter) { break } @@ -545,6 +559,33 @@ func stringField(m map[string]any, keys ...string) string { return "" } +// anyToString converts a JSON-decoded value to its string form, handling the +// numeric case that stringField silently drops. GHL pagination cursors arrive +// as numbers (epoch ms) inside meta.startAfter — formatting them as a string +// without scientific notation is required so we can pass them back in the +// query string for the next page. +func anyToString(v any) string { + if v == nil { + return "" + } + switch x := v.(type) { + case string: + return x + case float64: + return strconv.FormatFloat(x, 'f', -1, 64) + case int: + return strconv.Itoa(x) + case int64: + return strconv.FormatInt(x, 10) + case json.Number: + return string(x) + case bool: + return strconv.FormatBool(x) + default: + return fmt.Sprint(x) + } +} + func floatField(m map[string]any, keys ...string) float64 { for _, k := range keys { switch v := m[k].(type) { diff --git a/skills/claude-code/SKILL.md b/skills/claude-code/SKILL.md index 3a62144..3a96bd6 100644 --- a/skills/claude-code/SKILL.md +++ b/skills/claude-code/SKILL.md @@ -1,6 +1,7 @@ --- name: topline-os-cli description: Use the Topline OS CLI for SQL-first CRM analytics, pipeline audits, deal briefs, and agent-safe sales operations. Default to the composite `topline --agent query audit|snapshot|freshness` commands for standard analytics; use REST-backed commands for live drilldowns and approved writes. Triggers on Topline OS, CRM pipeline, opportunity, deal, sales activity, and `topline` CLI questions. +version: 1.6.1 --- # Topline OS CLI Skill @@ -97,3 +98,9 @@ topline sync init --db topline.db ## Output rules Prefer `--agent` for token-efficient, PII-masked output. Never print PIT or query token values. Avoid markdown tables in chat replies; use bullets. Keep activity separate from movement: conversation activity can happen without opportunity stage/status changes. + +## Common pitfalls + +- **Treating current pipeline as historical origin.** The `opportunities` warehouse table exposes current pipeline/stage state; a won Qualified opportunity is not proof the deal originated in Triage. For Flex conversion questions, answer current-state first (e.g. current pipeline = `Sales - Flex - Qualified`, status = won, created/closed in window), then add a lineage confidence caveat unless a history/audit table or activity event records the move. +- **Counting automated workflow touches as rep effort.** For manual outreach audits, exclude workflow/app automation — in the hosted warehouse, `raw_payload.source = 'app'` on `messages` is the automation exclusion signal. Break out calls/email/SMS separately and report contact counts. +- **Mislabeling SQL/native disagreements as "sync lag".** Only call it lag when `_synced_at` proves lag. Missing UNION branches or coverage gaps are `os-mcp` bugs, not lag — disclose and stop. diff --git a/skills/hermes/SKILL.md b/skills/hermes/SKILL.md index 44d4642..21fd983 100644 --- a/skills/hermes/SKILL.md +++ b/skills/hermes/SKILL.md @@ -1,7 +1,7 @@ --- name: topline-os-cli description: Use the Topline OS CLI for SQL-first CRM analytics, pipeline audits, token-efficient reads, deal briefs, and agent-safe sales operations. Default to the composite `topline --agent query audit|snapshot|freshness` commands for standard analytics; use REST-backed commands for live drilldowns and approved writes. -version: 1.6.0 +version: 1.6.1 --- # Topline OS CLI @@ -170,6 +170,8 @@ topline raw request GET /opportunities/search --query '{"pipelineId":"PIPELINE_I 12. **Editing this skill (or `topline-os-crm-audits`) mid-audit via `skill_manage`.** The contract is read-only during execution. If the skill is wrong, finish the current run honestly (or stop and disclose the gap), then propose the edit in a follow-up turn. 13. **Prompt-rule tightening instead of primitive design.** If repeated runs keep finding new over-calling shapes (REST fan-out → python wrappers → over-decomposed SQL → bash heredocs), stop adding rules and move the workflow into a composite command/view. The standard pipeline audit is now `query doctor` → `query audit` → answer. 14. **Doing math on the audit JSON after the fact.** Real failure mode: agent runs `query doctor` + `query audit` cleanly, then opens a `python3 - <<'PY' vals=[...] PY` heredoc (or `jq` / `awk` / bash arithmetic) to compute averages/totals over the audit's `activity.by_stage`, `deals`, or `snapshot` rollups before answering. The audit response already carries those rollups — `activity.total_messages`, `activity.by_stage[*]`, `deals.open_count`, `deals.open_value_total`, `snapshot.avg_days_in_stage`, `movement.advances`/`movement.regresses`/`movement.stalls`. If a question genuinely needs a number not in the payload (e.g. p95 instead of avg), express it as `topline --agent query sql --sql ...` and disclose that it is non-standard analytics. Computing in Python/jq/bash on the audit JSON is the same anti-pattern as wrapping the CLI in Python — it just moves the violation one step past the CLI boundary. +15. **Treating current pipeline as historical origin.** The `opportunities` warehouse table exposes current pipeline/stage state; it does not, by itself, prove that a won Qualified opportunity started in Triage. For Flex conversion questions, answer in two layers: (a) direct/current-state query (current pipeline = `Sales - Flex - Qualified`, status = won, created/closed in window); (b) lineage confidence caveat unless a history/audit table or activity event explicitly records the pipeline move. Do not report "originated in Triage" as proven just because the deal is now in Qualified. See `references/flex-crm-lineage-and-manual-outreach.md`. +16. **Counting automated workflow touches as rep effort.** For manual outreach/activity audits, exclude workflow/app automation. In the hosted warehouse, use message/activity metadata such as `raw_payload.source = 'app'` as the automation exclusion signal when available, then break out calls/email/SMS separately and report contact counts. ## Reporting rule of thumb diff --git a/skills/hermes/references/flex-crm-lineage-and-manual-outreach.md b/skills/hermes/references/flex-crm-lineage-and-manual-outreach.md new file mode 100644 index 0000000..4b6be37 --- /dev/null +++ b/skills/hermes/references/flex-crm-lineage-and-manual-outreach.md @@ -0,0 +1,75 @@ +# Flex CRM lineage and manual outreach audit notes + +Use this reference when Alex asks about Flex triage → qualified conversion, won deals, rep responsiveness, or manual outreach counts. + +## Definitions + +- **Manual outreach**: rep-authored outbound calls, emails, or SMS. Exclude workflow/app automation; in the warehouse, `raw_payload.source = 'app'` is the key automation signal observed in `messages`. +- **Response SLA**: for lead-response audits, measure the first outbound call within 4 hours of opportunity creation unless Alex specifies another SLA. +- **Owner attribution**: prefer `opportunities.assigned_to`; fall back to `contacts.assigned_to` when opportunity owner is missing. +- **Current month / QTD**: use the live date tool first, then build explicit date boundaries. + +## Pipeline lineage caveat + +The hosted warehouse `opportunities` table represents current opportunity state. A deal currently in `Sales - Flex - Qualified` with status `won` can be a good operational proxy for a triage-converted won deal, but it is not proof that the opportunity originally started in `Sales - Flex - Triage` unless a history/audit surface records the move. + +Observed pipeline IDs: + +- `Sales - Flex - Triage`: `bna6e9DoPgRchNsjeYS3` +- `Sales - Flex - Qualified`: `CLUy1QapsrEeBiNrmQiL` + +When asked “how many Triage leads won,” do not only query current Triage; also query current Qualified wins in the window, then state the limitation: + +- Current Triage + won: direct same-pipeline wins. +- Current Qualified + won + created/closed in window: operational proxy for moved-forward wins. +- True origin lineage: requires opportunity-history/audit table or explicit activity event; if unavailable, say so plainly. + +## Useful SQL patterns + +Manual outreach by rep/month: + +```sql +SELECT + assigned_user_name, + COUNT(*) AS manual_touches, + COUNT(DISTINCT contact_id) AS contacts_touched, + SUM(CASE WHEN channel = 'call' THEN 1 ELSE 0 END) AS calls, + SUM(CASE WHEN channel = 'email' THEN 1 ELSE 0 END) AS emails, + SUM(CASE WHEN channel = 'sms' THEN 1 ELSE 0 END) AS sms +FROM messages +WHERE direction = 'outbound' + AND created_at >= 'YYYY-MM-01' + AND created_at < 'YYYY-MM-NEXT-01' + -- adapt to the warehouse JSON/text dialect; exclude automation where raw_payload.source = 'app' + AND COALESCE(JSON_EXTRACT(raw_payload, '$.source'), '') <> 'app' +GROUP BY assigned_user_name; +``` + +Flex Qualified won proxy: + +```sql +SELECT + contact_name, + company_name, + value, + source, + created_at, + closed_at +FROM opportunities +WHERE pipeline_id = 'CLUy1QapsrEeBiNrmQiL' + AND status = 'won' + AND created_at >= 'YYYY-01-01' +ORDER BY closed_at; +``` + +Before relying on history, check whether the warehouse exposes a movement surface: + +```sql +SELECT name +FROM sqlite_master +WHERE LOWER(name) LIKE '%history%' + OR LOWER(name) LIKE '%audit%' + OR LOWER(name) LIKE '%movement%'; +``` + +If no history surface exists and activity rows do not include stage/pipeline move payloads, qualify the answer as current-state/proxy analysis, not proven origin lineage.