Skip to content

Commit 93abfe2

Browse files
committed
add gRPC plugin supports for unary, server stream,
client stream, and bidirectional stream.
1 parent 1360193 commit 93abfe2

25 files changed

+3304
-0
lines changed
14.5 MB
Binary file not shown.

cmd/tools/grpc_test_server/main.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"io"
6+
"log"
7+
"net"
8+
"time"
9+
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/health"
12+
healthpb "google.golang.org/grpc/health/grpc_health_v1"
13+
"google.golang.org/grpc/reflection"
14+
15+
structpb "google.golang.org/protobuf/types/known/structpb"
16+
)
17+
18+
// Manual registration for bidi service using google.protobuf.Struct
19+
func chatStreamHandler(_ interface{}, stream grpc.ServerStream) error {
20+
for {
21+
in := new(structpb.Struct)
22+
if err := stream.RecvMsg(in); err != nil {
23+
if err == io.EOF {
24+
return nil
25+
}
26+
return err
27+
}
28+
// Enrich response with server metadata and timestamp so client logs are visible
29+
enriched := map[string]interface{}{}
30+
for k, v := range in.AsMap() {
31+
enriched[k] = v
32+
}
33+
enriched["server"] = "grpc_test_server"
34+
enriched["ts"] = time.Now().Format(time.RFC3339Nano)
35+
enriched["note"] = "bidi echo"
36+
out, _ := structpb.NewStruct(enriched)
37+
if err := stream.SendMsg(out); err != nil {
38+
return err
39+
}
40+
}
41+
}
42+
43+
var chatServiceDesc = grpc.ServiceDesc{
44+
ServiceName: "chat.Chat",
45+
HandlerType: (*interface{})(nil),
46+
Streams: []grpc.StreamDesc{{
47+
StreamName: "Stream",
48+
Handler: chatStreamHandler,
49+
ServerStreams: true,
50+
ClientStreams: true,
51+
}},
52+
}
53+
54+
// Server-stream-only echo service using Struct
55+
func echoStreamHandler(_ interface{}, stream grpc.ServerStream) error {
56+
in := new(structpb.Struct)
57+
if err := stream.RecvMsg(in); err != nil {
58+
if err == io.EOF {
59+
return nil
60+
}
61+
return err
62+
}
63+
// emit a few echoes and finish
64+
for i := 0; i < 3; i++ {
65+
if err := stream.SendMsg(in); err != nil {
66+
return err
67+
}
68+
time.Sleep(200 * time.Millisecond)
69+
}
70+
return nil
71+
}
72+
73+
var echoServiceDesc = grpc.ServiceDesc{
74+
ServiceName: "echo.Echo",
75+
HandlerType: (*interface{})(nil),
76+
Streams: []grpc.StreamDesc{{
77+
StreamName: "Stream",
78+
Handler: echoStreamHandler,
79+
ServerStreams: true,
80+
ClientStreams: false,
81+
}},
82+
}
83+
84+
// Client-stream-only ingest service: counts messages and returns a final result
85+
func ingestStreamHandler(_ interface{}, stream grpc.ServerStream) error {
86+
count := 0
87+
for {
88+
in := new(structpb.Struct)
89+
if err := stream.RecvMsg(in); err != nil {
90+
if err == io.EOF {
91+
break
92+
}
93+
return err
94+
}
95+
count++
96+
}
97+
// respond once with count
98+
out, _ := structpb.NewStruct(map[string]interface{}{"count": count})
99+
return stream.SendMsg(out)
100+
}
101+
102+
var ingestServiceDesc = grpc.ServiceDesc{
103+
ServiceName: "ingest.Ingest",
104+
HandlerType: (*interface{})(nil),
105+
Streams: []grpc.StreamDesc{{
106+
StreamName: "Stream",
107+
Handler: ingestStreamHandler,
108+
ServerStreams: false,
109+
ClientStreams: true,
110+
}},
111+
}
112+
113+
func main() {
114+
lis, err := net.Listen("tcp", ":50051")
115+
if err != nil {
116+
log.Fatalf("listen: %v", err)
117+
}
118+
s := grpc.NewServer()
119+
120+
// Health service
121+
hs := health.NewServer()
122+
// Set default service to SERVING
123+
hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
124+
125+
healthpb.RegisterHealthServer(s, hs)
126+
127+
// Reflection for dynamic clients
128+
reflection.Register(s)
129+
130+
// Register manual chat service
131+
s.RegisterService(&chatServiceDesc, nil)
132+
// Register echo server-stream-only service
133+
s.RegisterService(&echoServiceDesc, nil)
134+
// Register ingest client-stream-only service
135+
s.RegisterService(&ingestServiceDesc, nil)
136+
137+
log.Println("grpc test server listening on :50051")
138+
if err := s.Serve(lis); err != nil {
139+
log.Fatalf("serve: %v", err)
140+
}
141+
}
142+
143+
// Ensure unused import of context isn't optimized out in newer toolchains
144+
var _ = context.Background
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Generate Go bindings (requires protoc and protoc-gen-go, protoc-gen-go-grpc):
2+
3+
protoc --go_out=. --go-grpc_out=. chat.proto
4+
5+
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
syntax = "proto3";
2+
package chat;
3+
4+
import "google/protobuf/struct.proto";
5+
6+
service Chat {
7+
rpc Stream(stream google.protobuf.Struct) returns (stream google.protobuf.Struct);
8+
}
9+
10+
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
syntax = "proto3";
2+
package echo;
3+
4+
import "google/protobuf/struct.proto";
5+
6+
service Echo {
7+
rpc Stream(google.protobuf.Struct) returns (stream google.protobuf.Struct);
8+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
syntax = "proto3";
2+
package google.protobuf;
3+
4+
option go_package = "google.golang.org/protobuf/types/known/structpb";
5+
6+
message Struct {
7+
map<string, Value> fields = 1;
8+
}
9+
10+
message Value {
11+
oneof kind {
12+
NullValue null_value = 1;
13+
double number_value = 2;
14+
string string_value = 3;
15+
bool bool_value = 4;
16+
Struct struct_value = 5;
17+
ListValue list_value = 6;
18+
}
19+
}
20+
21+
enum NullValue {
22+
NULL_VALUE = 0;
23+
}
24+
25+
message ListValue {
26+
repeated Value values = 1;
27+
}
28+
29+
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
syntax = "proto3";
2+
3+
package grpc.health.v1;
4+
5+
// Standard gRPC health checking protocol
6+
service Health {
7+
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
8+
}
9+
10+
message HealthCheckRequest {
11+
string service = 1;
12+
}
13+
14+
enum HealthCheckResponse_ServingStatus {
15+
UNKNOWN = 0;
16+
SERVING = 1;
17+
NOT_SERVING = 2;
18+
SERVICE_UNKNOWN = 3;
19+
}
20+
21+
message HealthCheckResponse {
22+
HealthCheckResponse_ServingStatus status = 1;
23+
}
24+
25+
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
syntax = "proto3";
2+
package ingest;
3+
4+
import "google/protobuf/struct.proto";
5+
6+
service Ingest {
7+
rpc Stream(stream google.protobuf.Struct) returns (google.protobuf.Struct);
8+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
input:
2+
label: ""
3+
generate:
4+
interval: 1s
5+
mapping: |
6+
root = {
7+
"session_id": "demo",
8+
"message": "hello"
9+
}
10+
11+
pipeline:
12+
processors: []
13+
14+
output:
15+
label: ""
16+
grpc_client:
17+
# minimal required fields
18+
address: "127.0.0.1:50051"
19+
method: "/chat.Chat/Stream"
20+
rpc_type: "bidi"
21+
proto_files:
22+
- chat.proto
23+
- google/protobuf/struct.proto
24+
include_paths:
25+
- cmd/tools/grpc_test_server/pb
26+
# optional timeouts
27+
call_timeout: "30s"
28+
connect_timeout: "2s"
29+
# optional retries
30+
retry_max_attempts: 3
31+
retry_initial_backoff: "200ms"
32+
retry_max_backoff: "2s"
33+
retry_backoff_multiplier: 2
34+
# optional auth headers
35+
# bearer_token: "your-token-here"
36+
# auth_headers:
37+
# x-api-key: "demo"
38+
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
input:
2+
label: ""
3+
generate:
4+
count: 3
5+
interval: 200ms
6+
mapping: |
7+
root = {}
8+
9+
pipeline:
10+
processors: []
11+
12+
output:
13+
label: ""
14+
grpc_client:
15+
address: "127.0.0.1:50051"
16+
method: "/ingest.Ingest/Stream"
17+
rpc_type: "client_stream"
18+
proto_files:
19+
- ingest.proto
20+
- google/protobuf/struct.proto
21+
include_paths:
22+
- cmd/tools/grpc_test_server/pb
23+
call_timeout: "30s"
24+
connect_timeout: "2s"
25+
retry_max_attempts: 2
26+
retry_initial_backoff: "100ms"
27+
retry_max_backoff: "1s"
28+
retry_backoff_multiplier: 2
29+

0 commit comments

Comments
 (0)