Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 9 additions & 38 deletions backend/client/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ import (

"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
at "mosaic-client.com/gen/audio_transcription"
cb "mosaic-client.com/gen/conversation_briefing"
fd "mosaic-client.com/gen/face_detection"
"mosaic-client.com/internal/handler"
"mosaic-client.com/internal/middleware"
"mosaic-client.com/internal/observability"
)

type Config struct {
Expand All @@ -31,37 +29,15 @@ func main() {
log.Fatalf("failed to load config values: %v", err)
}

var handler slog.Handler
if cfg.ProdMode {
handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
} else {
handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})
}
logger := slog.New(handler).With("service", "client")
logger := observability.StructuredLogger(cfg.ProdMode)

logger.Info("Starting Mosaic backend server...")

audioConn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Error("Unable to start audio gRPC client")
os.Exit(1)
}
gwClient := handler.NewClient("https://api.verturus.com", handler.DefaultRetryConfig)

faceConn, err := grpc.NewClient("localhost:40040", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Error("Unable to start audio gRPC client")
os.Exit(1)
}

briefingConn, err := grpc.NewClient("localhost:30030", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Error("Unable to start briefing generation gRPC client")
os.Exit(1)
}

atClient := at.NewAudioTranscriptionServiceClient(audioConn)
fdClient := fd.NewFaceDetectionServiceClient(faceConn)
cbClient := cb.NewConversationBriefingServiceClient(briefingConn)
atClient := at.NewAudioTranscriptionServiceClient(gwClient)
fdClient := fd.NewFaceDetectionServiceClient(gwClient)
cbClient := cb.NewConversationBriefingServiceClient(gwClient)

go websocketServer(cfg, logger, atClient, fdClient, cbClient)

Expand All @@ -74,14 +50,9 @@ func main() {
<-sigChan
logger.Info("Shutting down gracefully...")

err = audioConn.Close()
if err != nil {
logger.Warn("audio grpc connection not closed properly", "err", err)
}

err = faceConn.Close()
err = gwClient.Close()
if err != nil {
logger.Warn("face detection grpc connection not closed properly", "err", err)
logger.Warn("grpc-web connection not closed properly", "err", err)
}
logger.Debug("Closed gRPC connection")
}
Expand All @@ -107,7 +78,7 @@ func websocketServer(

server := http.Server{
Addr: fmt.Sprintf(":%s", cfg.ServerPort),
Handler: middleware.Logging(router),
Handler: observability.HTTPLogger(router),
}

logger.Debug("[client] Server running on", "port", cfg.ServerPort)
Expand Down
7 changes: 2 additions & 5 deletions backend/client/cmd/makefile
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
test_all: unit integration

PKGS := ../internal/handler/... ../internal/middleware/... ../internal/service/...
PKGS := ../internal/handler/... ../internal/observability/... ../internal/service/...

coverage:
go test -tags=unit,integration ${PKGS} -v -coverprofile=coverage.out -covermode=atomic

format:
go fmt .
go fmt ../internal/middleware/... .
go fmt ../internal/handler/... .
go fmt ../internal/service/... .
go fmt ${PKGS}

integration:
go test -tags integration ${PKGS}
Expand Down
201 changes: 201 additions & 0 deletions backend/client/internal/handler/grpc_web_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package handler

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

type RetryConfig struct {
MaxAttempts int
InitialBackoff time.Duration
MaxBackoff time.Duration
BackoffMultiplier float64
RetryableCodes []codes.Code
}

var DefaultRetryConfig = RetryConfig{
MaxAttempts: 4,
InitialBackoff: time.Second,
MaxBackoff: 10 * time.Second,
BackoffMultiplier: 2.0,
RetryableCodes: []codes.Code{codes.Unavailable},
}

var NoRetry = RetryConfig{MaxAttempts: 1}

type Client struct {
baseURL string
httpClient *http.Client
retry RetryConfig
}

func NewClient(baseURL string, retry RetryConfig) *Client {
return &Client{
baseURL: strings.TrimRight(baseURL, "/"),
httpClient: &http.Client{},
retry: retry,
}
}

func (c *Client) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error {
backoff := c.retry.InitialBackoff
var lastErr error

for attempt := 0; attempt < c.retry.MaxAttempts; attempt++ {
if attempt > 0 {
select {
case <-ctx.Done():
return status.FromContextError(ctx.Err()).Err()
case <-time.After(backoff):
}
backoff = time.Duration(float64(backoff) * c.retry.BackoffMultiplier)
if backoff > c.retry.MaxBackoff {
backoff = c.retry.MaxBackoff
}
}

lastErr = c.invoke(ctx, method, args, reply)
if lastErr == nil {
return nil
}
if !c.isRetryable(lastErr) {
return lastErr
}
}
return lastErr
}

func (c *Client) isRetryable(err error) bool {
s, ok := status.FromError(err)
if !ok {
return false
}
for _, code := range c.retry.RetryableCodes {
if s.Code() == code {
return true
}
}
return false
}

func (c *Client) invoke(ctx context.Context, method string, args, reply interface{}) error {
reqMsg, ok := args.(proto.Message)
if !ok {
return status.Error(codes.Internal, "args must be proto.Message")
}
reqBytes, err := proto.Marshal(reqMsg)
if err != nil {
return status.Errorf(codes.Internal, "marshal request: %v", err)
}

frame := make([]byte, 5+len(reqBytes))
frame[0] = 0
binary.BigEndian.PutUint32(frame[1:5], uint32(len(reqBytes)))
copy(frame[5:], reqBytes)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+method, bytes.NewReader(frame))
if err != nil {
return status.Errorf(codes.Internal, "create request: %v", err)
}
req.Header.Set("Content-Type", "application/grpc-web+proto")
req.Header.Set("X-Grpc-Web", "1")

resp, err := c.httpClient.Do(req)
if err != nil {
return status.Errorf(codes.Unavailable, "http request: %v", err)
}
defer func() {
err := resp.Body.Close()
if err != nil {
fmt.Printf("error closing resp body: %v", err)
}
}()

if resp.StatusCode != http.StatusOK {
return status.Errorf(codes.Internal, "http status: %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return status.Errorf(codes.Internal, "read body: %v", err)
}

var grpcStatus codes.Code
var grpcMessage string
dataRead := false
pos := 0

for pos < len(body) {
if pos+5 > len(body) {
return status.Error(codes.Internal, "truncated frame header")
}
frameType := body[pos]
frameLen := int(binary.BigEndian.Uint32(body[pos+1 : pos+5]))
pos += 5

if pos+frameLen > len(body) {
return status.Error(codes.Internal, "truncated frame body")
}
frameData := body[pos : pos+frameLen]
pos += frameLen

switch frameType {
case 0x00:
replyMsg, ok := reply.(proto.Message)
if !ok {
return status.Error(codes.Internal, "reply must be proto.Message")
}
if err := proto.Unmarshal(frameData, replyMsg); err != nil {
return status.Errorf(codes.Internal, "unmarshal response: %v", err)
}
dataRead = true
case 0x80:
for _, line := range strings.Split(string(frameData), "\r\n") {
k, v, ok := strings.Cut(line, ":")
if !ok {
continue
}
switch strings.TrimSpace(k) {
case "grpc-status":
var code int
_, err := fmt.Sscanf(strings.TrimSpace(v), "%d", &code)
if err != nil {
return status.Error(codes.Internal, "error scanning")
}
grpcStatus = codes.Code(code)
case "grpc-message":
grpcMessage, _ = url.PathUnescape(strings.TrimSpace(v))
}
}
}
}

if !dataRead && grpcStatus == codes.OK {
return status.Error(codes.Internal, "no data frame in response")
}
if grpcStatus != codes.OK {
return status.Error(grpcStatus, grpcMessage)
}
return nil
}

func (c *Client) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, status.Error(codes.Unimplemented, "streaming not supported")
}

func (c *Client) Close() error {
c.httpClient.CloseIdleConnections()
return nil
}
Loading
Loading