Skip to content

Commit 85e55dc

Browse files
interop: update client for xds testing support (#4108)
1 parent 6a318bb commit 85e55dc

File tree

1 file changed

+135
-21
lines changed

1 file changed

+135
-21
lines changed

interop/xds/client/client.go

Lines changed: 135 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ import (
3838
_ "google.golang.org/grpc/xds"
3939
)
4040

41+
func init() {
42+
rpcCfgs.Store([]*rpcConfig{{typ: unaryCall}})
43+
}
44+
4145
type statsWatcherKey struct {
4246
startID int32
4347
endID int32
@@ -73,21 +77,84 @@ func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse {
7377
}
7478
}
7579

80+
type accumulatedStats struct {
81+
mu sync.Mutex
82+
numRpcsStartedByMethod map[string]int32
83+
numRpcsSucceededByMethod map[string]int32
84+
numRpcsFailedByMethod map[string]int32
85+
}
86+
87+
// copyStatsMap makes a copy of the map, and also replaces the RPC type string
88+
// to the proto string. E.g. "UnaryCall" -> "UNARY_CALL".
89+
func copyStatsMap(originalMap map[string]int32) (newMap map[string]int32) {
90+
newMap = make(map[string]int32)
91+
for k, v := range originalMap {
92+
var kk string
93+
switch k {
94+
case unaryCall:
95+
kk = testpb.ClientConfigureRequest_UNARY_CALL.String()
96+
case emptyCall:
97+
kk = testpb.ClientConfigureRequest_EMPTY_CALL.String()
98+
default:
99+
logger.Warningf("unrecognized rpc type: %s", k)
100+
}
101+
if kk == "" {
102+
continue
103+
}
104+
newMap[kk] = v
105+
}
106+
return newMap
107+
}
108+
109+
func (as *accumulatedStats) buildResp() *testpb.LoadBalancerAccumulatedStatsResponse {
110+
as.mu.Lock()
111+
defer as.mu.Unlock()
112+
return &testpb.LoadBalancerAccumulatedStatsResponse{
113+
NumRpcsStartedByMethod: copyStatsMap(as.numRpcsStartedByMethod),
114+
NumRpcsSucceededByMethod: copyStatsMap(as.numRpcsSucceededByMethod),
115+
NumRpcsFailedByMethod: copyStatsMap(as.numRpcsFailedByMethod),
116+
}
117+
}
118+
119+
func (as *accumulatedStats) startRPC(rpcType string) {
120+
as.mu.Lock()
121+
defer as.mu.Unlock()
122+
as.numRpcsStartedByMethod[rpcType]++
123+
}
124+
125+
func (as *accumulatedStats) finishRPC(rpcType string, failed bool) {
126+
as.mu.Lock()
127+
defer as.mu.Unlock()
128+
if failed {
129+
as.numRpcsFailedByMethod[rpcType]++
130+
return
131+
}
132+
as.numRpcsSucceededByMethod[rpcType]++
133+
}
134+
76135
var (
77136
failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success")
78137
numChannels = flag.Int("num_channels", 1, "Num of channels")
79138
printResponse = flag.Bool("print_response", false, "Write RPC response to stdout")
80139
qps = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
81-
rpc = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall")
82-
rpcMetadata = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2")
140+
rpc = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
141+
rpcMetadata = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
83142
rpcTimeout = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout")
84143
server = flag.String("server", "localhost:8080", "Address of server to connect to")
85144
statsPort = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service")
86145

146+
rpcCfgs atomic.Value
147+
87148
mu sync.Mutex
88149
currentRequestID int32
89150
watchers = make(map[statsWatcherKey]*statsWatcher)
90151

152+
accStats = accumulatedStats{
153+
numRpcsStartedByMethod: make(map[string]int32),
154+
numRpcsSucceededByMethod: make(map[string]int32),
155+
numRpcsFailedByMethod: make(map[string]int32),
156+
}
157+
91158
// 0 or 1 representing an RPC has succeeded. Use hasRPCSucceeded and
92159
// setRPCSucceeded to access in a safe manner.
93160
rpcSucceeded uint32
@@ -163,6 +230,47 @@ func (s *statsService) GetClientStats(ctx context.Context, in *testpb.LoadBalanc
163230
}
164231
}
165232

233+
func (s *statsService) GetClientAccumulatedStats(ctx context.Context, in *testpb.LoadBalancerAccumulatedStatsRequest) (*testpb.LoadBalancerAccumulatedStatsResponse, error) {
234+
return accStats.buildResp(), nil
235+
}
236+
237+
type configureService struct {
238+
testpb.UnimplementedXdsUpdateClientConfigureServiceServer
239+
}
240+
241+
func (s *configureService) Configure(ctx context.Context, in *testpb.ClientConfigureRequest) (*testpb.ClientConfigureResponse, error) {
242+
rpcsToMD := make(map[testpb.ClientConfigureRequest_RpcType][]string)
243+
for _, typ := range in.GetTypes() {
244+
rpcsToMD[typ] = nil
245+
}
246+
for _, md := range in.GetMetadata() {
247+
typ := md.GetType()
248+
strs, ok := rpcsToMD[typ]
249+
if !ok {
250+
continue
251+
}
252+
rpcsToMD[typ] = append(strs, md.GetKey(), md.GetValue())
253+
}
254+
cfgs := make([]*rpcConfig, 0, len(rpcsToMD))
255+
for typ, md := range rpcsToMD {
256+
var rpcType string
257+
switch typ {
258+
case testpb.ClientConfigureRequest_UNARY_CALL:
259+
rpcType = unaryCall
260+
case testpb.ClientConfigureRequest_EMPTY_CALL:
261+
rpcType = emptyCall
262+
default:
263+
return nil, fmt.Errorf("unsupported RPC type: %v", typ)
264+
}
265+
cfgs = append(cfgs, &rpcConfig{
266+
typ: rpcType,
267+
md: metadata.Pairs(md...),
268+
})
269+
}
270+
rpcCfgs.Store(cfgs)
271+
return &testpb.ClientConfigureResponse{}, nil
272+
}
273+
166274
const (
167275
unaryCall string = "UnaryCall"
168276
emptyCall string = "EmptyCall"
@@ -218,7 +326,7 @@ func parseRPCMetadata(rpcMetadataStr string, rpcs []string) []*rpcConfig {
218326

219327
func main() {
220328
flag.Parse()
221-
rpcCfgs := parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc))
329+
rpcCfgs.Store(parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc)))
222330

223331
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *statsPort))
224332
if err != nil {
@@ -227,6 +335,7 @@ func main() {
227335
s := grpc.NewServer()
228336
defer s.Stop()
229337
testpb.RegisterLoadBalancerStatsServiceServer(s, &statsService{})
338+
testpb.RegisterXdsUpdateClientConfigureServiceServer(s, &configureService{})
230339
go s.Serve(lis)
231340

232341
clients := make([]testpb.TestServiceClient, *numChannels)
@@ -240,7 +349,7 @@ func main() {
240349
}
241350
ticker := time.NewTicker(time.Second / time.Duration(*qps**numChannels))
242351
defer ticker.Stop()
243-
sendRPCs(clients, rpcCfgs, ticker)
352+
sendRPCs(clients, ticker)
244353
}
245354

246355
func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInfo, error) {
@@ -257,6 +366,7 @@ func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInf
257366
header metadata.MD
258367
err error
259368
)
369+
accStats.startRPC(cfg.typ)
260370
switch cfg.typ {
261371
case unaryCall:
262372
var resp *testpb.SimpleResponse
@@ -270,8 +380,10 @@ func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInf
270380
_, err = c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p), grpc.Header(&header))
271381
}
272382
if err != nil {
383+
accStats.finishRPC(cfg.typ, true)
273384
return nil, nil, err
274385
}
386+
accStats.finishRPC(cfg.typ, false)
275387

276388
hosts := header["hostname"]
277389
if len(hosts) > 0 {
@@ -280,26 +392,28 @@ func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInf
280392
return &p, &info, err
281393
}
282394

283-
func sendRPCs(clients []testpb.TestServiceClient, cfgs []*rpcConfig, ticker *time.Ticker) {
395+
func sendRPCs(clients []testpb.TestServiceClient, ticker *time.Ticker) {
284396
var i int
285397
for range ticker.C {
286-
go func(i int) {
287-
// Get and increment request ID, and save a list of watchers that
288-
// are interested in this RPC.
289-
mu.Lock()
290-
savedRequestID := currentRequestID
291-
currentRequestID++
292-
savedWatchers := []*statsWatcher{}
293-
for key, value := range watchers {
294-
if key.startID <= savedRequestID && savedRequestID < key.endID {
295-
savedWatchers = append(savedWatchers, value)
296-
}
398+
// Get and increment request ID, and save a list of watchers that are
399+
// interested in this RPC.
400+
mu.Lock()
401+
savedRequestID := currentRequestID
402+
currentRequestID++
403+
savedWatchers := []*statsWatcher{}
404+
for key, value := range watchers {
405+
if key.startID <= savedRequestID && savedRequestID < key.endID {
406+
savedWatchers = append(savedWatchers, value)
297407
}
298-
mu.Unlock()
408+
}
409+
mu.Unlock()
299410

300-
c := clients[i]
411+
// Get the RPC metadata configurations from the Configure RPC.
412+
cfgs := rpcCfgs.Load().([]*rpcConfig)
301413

302-
for _, cfg := range cfgs {
414+
c := clients[i]
415+
for _, cfg := range cfgs {
416+
go func(cfg *rpcConfig) {
303417
p, info, err := makeOneRPC(c, cfg)
304418

305419
for _, watcher := range savedWatchers {
@@ -325,8 +439,8 @@ func sendRPCs(clients []testpb.TestServiceClient, cfgs []*rpcConfig, ticker *tim
325439
fmt.Printf("RPC %q, failed with %v\n", cfg.typ, err)
326440
}
327441
}
328-
}
329-
}(i)
442+
}(cfg)
443+
}
330444
i = (i + 1) % len(clients)
331445
}
332446
}

0 commit comments

Comments
 (0)