interop: update client for xds testing support #4108

Merged: 5 commits, Jan 9, 2021
156 changes: 135 additions & 21 deletions interop/xds/client/client.go
@@ -38,6 +38,10 @@ import (
_ "google.golang.org/grpc/xds"
)

func init() {
rpcCfgs.Store([]*rpcConfig{{typ: unaryCall}})
}

type statsWatcherKey struct {
startID int32
endID int32
@@ -73,21 +77,84 @@ func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse {
}
}

type accumulatedStats struct {
mu sync.Mutex
numRpcsStartedByMethod map[string]int32
numRpcsSucceededByMethod map[string]int32
numRpcsFailedByMethod map[string]int32
}

// copyStatsMap makes a copy of the map, and also replaces the RPC type string
// to the proto string. E.g. "UnaryCall" -> "UNARY_CALL".
func copyStatsMap(originalMap map[string]int32) (newMap map[string]int32) {
newMap = make(map[string]int32)
for k, v := range originalMap {
var kk string
switch k {
case unaryCall:
kk = testpb.ClientConfigureRequest_UNARY_CALL.String()
case emptyCall:
kk = testpb.ClientConfigureRequest_EMPTY_CALL.String()
default:
logger.Warningf("unrecognized rpc type: %s", k)
}
if kk == "" {
continue
}
newMap[kk] = v
}
return newMap
}

func (as *accumulatedStats) buildResp() *testpb.LoadBalancerAccumulatedStatsResponse {
as.mu.Lock()
defer as.mu.Unlock()
return &testpb.LoadBalancerAccumulatedStatsResponse{
NumRpcsStartedByMethod: copyStatsMap(as.numRpcsStartedByMethod),
NumRpcsSucceededByMethod: copyStatsMap(as.numRpcsSucceededByMethod),
NumRpcsFailedByMethod: copyStatsMap(as.numRpcsFailedByMethod),
}
}

func (as *accumulatedStats) startRPC(rpcType string) {
as.mu.Lock()
defer as.mu.Unlock()
as.numRpcsStartedByMethod[rpcType]++
}

func (as *accumulatedStats) finishRPC(rpcType string, failed bool) {
as.mu.Lock()
defer as.mu.Unlock()
if failed {
as.numRpcsFailedByMethod[rpcType]++
return
}
as.numRpcsSucceededByMethod[rpcType]++
}

var (
failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success")
numChannels = flag.Int("num_channels", 1, "Num of channels")
printResponse = flag.Bool("print_response", false, "Write RPC response to stdout")
qps = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
-rpc = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall")
-rpcMetadata = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2")
rpc = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
rpcMetadata = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
rpcTimeout = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout")
server = flag.String("server", "localhost:8080", "Address of server to connect to")
statsPort = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service")

rpcCfgs atomic.Value

mu sync.Mutex
currentRequestID int32
watchers = make(map[statsWatcherKey]*statsWatcher)

accStats = accumulatedStats{
numRpcsStartedByMethod: make(map[string]int32),
numRpcsSucceededByMethod: make(map[string]int32),
numRpcsFailedByMethod: make(map[string]int32),
}

// 0 or 1 representing an RPC has succeeded. Use hasRPCSucceeded and
// setRPCSucceeded to access in a safe manner.
rpcSucceeded uint32
@@ -163,6 +230,47 @@ func (s *statsService) GetClientStats(ctx context.Context, in *testpb.LoadBalanc
}
}

func (s *statsService) GetClientAccumulatedStats(ctx context.Context, in *testpb.LoadBalancerAccumulatedStatsRequest) (*testpb.LoadBalancerAccumulatedStatsResponse, error) {
return accStats.buildResp(), nil
}

type configureService struct {
testpb.UnimplementedXdsUpdateClientConfigureServiceServer
}

func (s *configureService) Configure(ctx context.Context, in *testpb.ClientConfigureRequest) (*testpb.ClientConfigureResponse, error) {
rpcsToMD := make(map[testpb.ClientConfigureRequest_RpcType][]string)
for _, typ := range in.GetTypes() {
rpcsToMD[typ] = nil
}
for _, md := range in.GetMetadata() {
typ := md.GetType()
strs, ok := rpcsToMD[typ]
if !ok {
continue
}
rpcsToMD[typ] = append(strs, md.GetKey(), md.GetValue())
}
cfgs := make([]*rpcConfig, 0, len(rpcsToMD))
for typ, md := range rpcsToMD {
var rpcType string
switch typ {
case testpb.ClientConfigureRequest_UNARY_CALL:
rpcType = unaryCall
case testpb.ClientConfigureRequest_EMPTY_CALL:
rpcType = emptyCall
default:
return nil, fmt.Errorf("unsupported RPC type: %v", typ)
}
cfgs = append(cfgs, &rpcConfig{
typ: rpcType,
md: metadata.Pairs(md...),
})
}
rpcCfgs.Store(cfgs)
return &testpb.ClientConfigureResponse{}, nil
}

const (
unaryCall string = "UnaryCall"
emptyCall string = "EmptyCall"
@@ -218,7 +326,7 @@ func parseRPCMetadata(rpcMetadataStr string, rpcs []string) []*rpcConfig {

func main() {
flag.Parse()
-rpcCfgs := parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc))
rpcCfgs.Store(parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc)))

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *statsPort))
if err != nil {
@@ -227,6 +335,7 @@ func main() {
s := grpc.NewServer()
defer s.Stop()
testpb.RegisterLoadBalancerStatsServiceServer(s, &statsService{})
testpb.RegisterXdsUpdateClientConfigureServiceServer(s, &configureService{})
go s.Serve(lis)

clients := make([]testpb.TestServiceClient, *numChannels)
@@ -240,7 +349,7 @@ func main() {
}
ticker := time.NewTicker(time.Second / time.Duration(*qps**numChannels))
defer ticker.Stop()
-sendRPCs(clients, rpcCfgs, ticker)
sendRPCs(clients, ticker)
}

func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInfo, error) {
@@ -257,6 +366,7 @@ func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInf
header metadata.MD
err error
)
accStats.startRPC(cfg.typ)
switch cfg.typ {
case unaryCall:
var resp *testpb.SimpleResponse
@@ -270,8 +380,10 @@ func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInf
_, err = c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p), grpc.Header(&header))
}
if err != nil {
accStats.finishRPC(cfg.typ, true)
return nil, nil, err
}
accStats.finishRPC(cfg.typ, false)

hosts := header["hostname"]
if len(hosts) > 0 {
@@ -280,26 +392,28 @@ func makeOneRPC(c testpb.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInf
return &p, &info, err
}

-func sendRPCs(clients []testpb.TestServiceClient, cfgs []*rpcConfig, ticker *time.Ticker) {
func sendRPCs(clients []testpb.TestServiceClient, ticker *time.Ticker) {
var i int
for range ticker.C {
-go func(i int) {

Contributor:
Why did you move this code out of the goroutine? This won't work if the ticker fires faster than the code execution time.

Contributor Author:
My understanding is that the server is being updated so that RPCs will hang long enough for a count of in-flight RPCs (versus RPCs failed due to circuit breaking) to be taken. The old goroutine loops through each type of RPC (empty or unary). Now the loop comes first and the go call is inside it, so each RPC gets its own goroutine and a hanging RPC won't stop the RPC of the other type from being sent. I will update the ticker, though, so the timing is correct.

Contributor:
But why not just add the goroutine? Why do you need to remove the old goroutine?

Contributor Author (@GarrettGutierrez1, Dec 17, 2020):
The goroutine needs to run once per element of cfgs (i.e. per type of RPC to perform). I didn't remove the old goroutine, just moved it inside the for _, cfg := range cfgs {...} loop, since previously it ran on a per-channel basis. I also had to pull the logic that builds savedWatchers out, since that still needs to happen per channel. Otherwise the goroutine after the change should be the same as the one before.

Contributor:
If I didn't miss anything, we should keep the per-tick RPC. And this is what I think should happen:

  • for each tick, start go func(i) to
    • build savedWatchers
    • get the list of configs
    • for each config
      • start go func(i, cfg) to run the RPC

Member:
FWIW this LGTM. As far as I can tell, we don't really need the outer goroutine, since the operations that are no longer inside the goroutine are nonblocking and pretty lightweight.

Contributor:
If I didn't miss anything, this one is not fixed.

Contributor:
That's true. I was just thinking there's no need for this "diff". And it feels safer the old way.

Member:
> That's true. I was just thinking there's no need for this "diff". And it feels safer the old way.

Some kind of change here was necessary. The old code used one goroutine to send RPCs for all configs (serially). Those RPCs were not expected to block unless there was a problem. Now some RPCs are expected to block indefinitely, so it's important for each RPC to happen in its own goroutine. This diff moves the go statement to wrap the RPC itself, where it could block, instead of the whole ticker invocation. This seems fine to me. The diffs that aren't real diffs because of indentation changes are unfortunate but not problematic.

Let's get the manual tests done ASAP so we can submit this!
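
To make the shape being discussed concrete, here is a standalone sketch (not part of this diff) of the pattern the new sendRPCs uses: each RPC runs on its own goroutine, so an RPC type that the test server deliberately holds open (as in the circuit-breaking tests) cannot starve the other type. fakeRPC, the config list, and the durations are stand-ins invented for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// fakeRPC stands in for makeOneRPC: "EmptyCall" simulates an RPC the server
// holds open indefinitely, while "UnaryCall" completes quickly.
func fakeRPC(typ string) {
	if typ == "EmptyCall" {
		select {} // block forever, like an RPC kept open by the server
	}
	time.Sleep(10 * time.Millisecond)
}

func main() {
	var completed int64
	cfgs := []string{"EmptyCall", "UnaryCall"}

	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	deadline := time.After(500 * time.Millisecond)

	for {
		select {
		case <-deadline:
			// UnaryCalls keep completing even though every EmptyCall hangs,
			// because each RPC runs on its own goroutine.
			fmt.Println("completed UnaryCalls:", atomic.LoadInt64(&completed))
			return
		case <-ticker.C:
			for _, cfg := range cfgs {
				go func(cfg string) {
					fakeRPC(cfg)
					if cfg == "UnaryCall" {
						atomic.AddInt64(&completed, 1)
					}
				}(cfg)
			}
		}
	}
}
```

The counter keeps growing even though every simulated EmptyCall goroutine is parked, which is exactly the property the per-RPC goroutine buys.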

-// Get and increment request ID, and save a list of watchers that
-// are interested in this RPC.
-mu.Lock()
-savedRequestID := currentRequestID
-currentRequestID++
-savedWatchers := []*statsWatcher{}
-for key, value := range watchers {
-if key.startID <= savedRequestID && savedRequestID < key.endID {
-savedWatchers = append(savedWatchers, value)
-}
-}
-mu.Unlock()
// Get and increment request ID, and save a list of watchers that are
// interested in this RPC.
mu.Lock()
savedRequestID := currentRequestID
currentRequestID++
savedWatchers := []*statsWatcher{}
for key, value := range watchers {
if key.startID <= savedRequestID && savedRequestID < key.endID {
savedWatchers = append(savedWatchers, value)
}
}
mu.Unlock()

c := clients[i]
// Get the RPC metadata configurations from the Configure RPC.
cfgs := rpcCfgs.Load().([]*rpcConfig)

for _, cfg := range cfgs {
-c := clients[i]
-for _, cfg := range cfgs {
go func(cfg *rpcConfig) {
p, info, err := makeOneRPC(c, cfg)

for _, watcher := range savedWatchers {
@@ -325,8 +439,8 @@ func sendRPCs(clients []testpb.TestServiceClient, cfgs []*rpcConfig, ticker *tim
fmt.Printf("RPC %q, failed with %v\n", cfg.typ, err)
}
}
-}
-}(i)
}(cfg)
}
i = (i + 1) % len(clients)
}
}
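
For reference, a minimal sketch of how a test driver might exercise the two services this change adds on the stats port, replacing the deprecated -rpc and -metadata flags. It assumes the generated grpc_testing package (imported here as testpb, presumably google.golang.org/grpc/interop/grpc_testing) exposes the usual New*Client constructors; the address, metadata key, and metadata value are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"

	testpb "google.golang.org/grpc/interop/grpc_testing"
)

func main() {
	// Dial the interop client's stats/configure port (8081 by default).
	conn, err := grpc.Dial("localhost:8081", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("failed to dial interop client: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Switch the running client to EmptyCall RPCs with one metadata pair.
	cc := testpb.NewXdsUpdateClientConfigureServiceClient(conn)
	if _, err := cc.Configure(ctx, &testpb.ClientConfigureRequest{
		Types: []testpb.ClientConfigureRequest_RpcType{testpb.ClientConfigureRequest_EMPTY_CALL},
		Metadata: []*testpb.ClientConfigureRequest_Metadata{{
			Type:  testpb.ClientConfigureRequest_EMPTY_CALL,
			Key:   "some-key",   // placeholder metadata key
			Value: "some-value", // placeholder metadata value
		}},
	}); err != nil {
		log.Fatalf("Configure failed: %v", err)
	}

	// Let the client send a few RPCs with the new config, then read the
	// accumulated per-method counters.
	time.Sleep(2 * time.Second)
	sc := testpb.NewLoadBalancerStatsServiceClient(conn)
	acc, err := sc.GetClientAccumulatedStats(ctx, &testpb.LoadBalancerAccumulatedStatsRequest{})
	if err != nil {
		log.Fatalf("GetClientAccumulatedStats failed: %v", err)
	}
	fmt.Println("started:  ", acc.GetNumRpcsStartedByMethod())
	fmt.Println("succeeded:", acc.GetNumRpcsSucceededByMethod())
	fmt.Println("failed:   ", acc.GetNumRpcsFailedByMethod())
}
```

The returned maps are keyed by the proto enum names ("EMPTY_CALL", "UNARY_CALL"), as produced by copyStatsMap above.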