Skip to content

Commit 612ecb3

Browse files
authored
Merge pull request #1781 from helixml/feature/events
events
2 parents 26abc84 + e44fd63 commit 612ecb3

File tree

4 files changed

+785
-28
lines changed

4 files changed

+785
-28
lines changed

api/pkg/store/store_events.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package store
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"strings"
8+
"time"
9+
10+
"github.com/helixml/helix/api/pkg/pubsub"
11+
"github.com/helixml/helix/api/pkg/types"
12+
)
13+
14+
type StoreEventOperation string
15+
16+
const (
17+
StoreEventOperationCreated StoreEventOperation = "created"
18+
StoreEventOperationUpdated StoreEventOperation = "updated"
19+
StoreEventOperationDeleted StoreEventOperation = "deleted"
20+
)
21+
22+
type StoreEventResourceType string
23+
24+
const (
25+
StoreEventResourceTypeSpecTask StoreEventResourceType = "spec_task"
26+
StoreEventResourceTypeSession StoreEventResourceType = "session"
27+
)
28+
29+
type StoreEvent struct {
30+
Operation StoreEventOperation `json:"operation"`
31+
ResourceType StoreEventResourceType `json:"resource_type"`
32+
ResourceID string `json:"resource_id,omitempty"`
33+
OrganizationID string `json:"organization_id,omitempty"`
34+
ProjectID string `json:"project_id,omitempty"`
35+
OccurredAt time.Time `json:"occurred_at"`
36+
Resource json.RawMessage `json:"resource"`
37+
}
38+
39+
func (e *StoreEvent) UnmarshalResource(dst any) error {
40+
if len(e.Resource) == 0 {
41+
return fmt.Errorf("event resource is empty")
42+
}
43+
44+
if err := json.Unmarshal(e.Resource, dst); err != nil {
45+
return fmt.Errorf("failed to unmarshal event resource: %w", err)
46+
}
47+
48+
return nil
49+
}
50+
51+
type StoreEventSubscriptionFilter struct {
52+
ResourceType StoreEventResourceType
53+
ResourceID string
54+
OrganizationID string
55+
ProjectID string
56+
}
57+
58+
func (f *StoreEventSubscriptionFilter) Matches(event *StoreEvent) bool {
59+
if f == nil {
60+
return true
61+
}
62+
63+
if f.ResourceType != "" && event.ResourceType != f.ResourceType {
64+
return false
65+
}
66+
67+
if f.ResourceID != "" && event.ResourceID != f.ResourceID {
68+
return false
69+
}
70+
71+
if f.OrganizationID != "" && event.OrganizationID != f.OrganizationID {
72+
return false
73+
}
74+
75+
if f.ProjectID != "" && event.ProjectID != f.ProjectID {
76+
return false
77+
}
78+
79+
return true
80+
}
81+
82+
func (s *PostgresStore) publishStoreEvent(ctx context.Context, operation StoreEventOperation, resource any) error {
83+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
84+
defer cancel()
85+
86+
event, err := buildStoreEvent(operation, resource)
87+
if err != nil {
88+
return err
89+
}
90+
91+
payload, err := json.Marshal(event)
92+
if err != nil {
93+
return fmt.Errorf("failed to marshal store event: %w", err)
94+
}
95+
96+
return s.pubsub.Publish(ctx, newStoreEventSubject(event.ResourceType, event.OrganizationID, event.ProjectID), payload)
97+
}
98+
99+
func (s *PostgresStore) subscribeStoreEvents(ctx context.Context, filter *StoreEventSubscriptionFilter, handler func(event *StoreEvent) error) (pubsub.Subscription, error) {
100+
subject := newStoreEventSubscriptionSubject(filter)
101+
102+
return s.pubsub.Subscribe(ctx, subject, func(payload []byte) error {
103+
var event StoreEvent
104+
if err := json.Unmarshal(payload, &event); err != nil {
105+
return fmt.Errorf("failed to unmarshal store event: %w", err)
106+
}
107+
108+
if !filter.Matches(&event) {
109+
return nil
110+
}
111+
112+
return handler(&event)
113+
})
114+
}
115+
116+
func buildStoreEvent(operation StoreEventOperation, resource any) (*StoreEvent, error) {
117+
resourceType, resourceID, organizationID, projectID, err := extractStoreEventMetadata(resource)
118+
if err != nil {
119+
return nil, err
120+
}
121+
122+
resourcePayload, err := json.Marshal(resource)
123+
if err != nil {
124+
return nil, fmt.Errorf("failed to marshal resource payload: %w", err)
125+
}
126+
127+
return &StoreEvent{
128+
Operation: operation,
129+
ResourceType: resourceType,
130+
ResourceID: resourceID,
131+
OrganizationID: organizationID,
132+
ProjectID: projectID,
133+
OccurredAt: time.Now().UTC(),
134+
Resource: resourcePayload,
135+
}, nil
136+
}
137+
138+
func extractStoreEventMetadata(resource any) (resourceType StoreEventResourceType, resourceID, organizationID, projectID string, err error) {
139+
switch r := resource.(type) {
140+
case *types.SpecTask:
141+
if r == nil {
142+
return "", "", "", "", fmt.Errorf("resource is nil spec task")
143+
}
144+
return StoreEventResourceTypeSpecTask, r.ID, r.OrganizationID, r.ProjectID, nil
145+
case types.SpecTask:
146+
return StoreEventResourceTypeSpecTask, r.ID, r.OrganizationID, r.ProjectID, nil
147+
case *types.Session:
148+
if r == nil {
149+
return "", "", "", "", fmt.Errorf("resource is nil session")
150+
}
151+
return StoreEventResourceTypeSession, r.ID, r.OrganizationID, r.ProjectID, nil
152+
case types.Session:
153+
return StoreEventResourceTypeSession, r.ID, r.OrganizationID, r.ProjectID, nil
154+
default:
155+
return "", "", "", "", fmt.Errorf("unsupported resource type for store event: %T", resource)
156+
}
157+
}
158+
159+
func newStoreEventSubscriptionSubject(filter *StoreEventSubscriptionFilter) string {
160+
resourceToken := "*"
161+
if filter != nil && filter.ResourceType != "" {
162+
resourceToken = string(filter.ResourceType)
163+
}
164+
165+
orgToken := "*"
166+
if filter != nil && filter.OrganizationID != "" {
167+
orgToken = subjectToken(filter.OrganizationID)
168+
}
169+
170+
projectToken := "*"
171+
if filter != nil && filter.ProjectID != "" {
172+
projectToken = subjectToken(filter.ProjectID)
173+
}
174+
175+
return fmt.Sprintf("store.events.%s.%s.%s", resourceToken, orgToken, projectToken)
176+
}
177+
178+
func newStoreEventSubject(resourceType StoreEventResourceType, organizationID, projectID string) string {
179+
return fmt.Sprintf(
180+
"store.events.%s.%s.%s",
181+
resourceType,
182+
subjectToken(organizationID),
183+
subjectToken(projectID),
184+
)
185+
}
186+
187+
func subjectToken(value string) string {
188+
if value == "" {
189+
return "_"
190+
}
191+
192+
return strings.ReplaceAll(value, ".", "_")
193+
}

0 commit comments

Comments
 (0)