diff --git a/README.md b/README.md index d6f317248d..c457f80c5d 100644 --- a/README.md +++ b/README.md @@ -95,4 +95,6 @@ $ ./cortex.sh install cli - **Log streaming:** Cortex streams logs from your deployed models to your CLI. +- **Prediction Monitoring:** Cortex can monitor network metrics and track predictions. + - **CPU / GPU support:** Cortex can run inference on CPU or GPU infrastructure. diff --git a/cli/cmd/get.go b/cli/cmd/get.go index 9326a410d3..70df5933a7 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -26,6 +26,7 @@ import ( "github.com/cortexlabs/yaml" "github.com/spf13/cobra" + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/console" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/json" @@ -126,7 +127,7 @@ func allDeploymentsStr() (string, error) { } if len(resourcesRes.Deployments) == 0 { - return "no deployments found", nil + return console.Bold("\nno deployments found"), nil } rows := make([][]interface{}, len(resourcesRes.Deployments)) @@ -464,15 +465,25 @@ func describeAPI(name string, resourcesRes *schema.GetResourcesResponse, flagVer apiEndpoint := urls.Join(resourcesRes.APIsBaseURL, anyAPIStatus.Path) out := "\n" + console.Bold("url: ") + apiEndpoint + "\n" - out += fmt.Sprintf("%s curl -X POST -H \"Content-Type: application/json\" %s -d @samples.json\n\n", console.Bold("curl:"), apiEndpoint) - out += fmt.Sprintf(console.Bold("updated at:")+" %s\n", libtime.LocalTimestamp(updatedAt)) + out += fmt.Sprintf("%s curl -X POST -H \"Content-Type: application/json\" %s -d @samples.json\n", console.Bold("curl:"), apiEndpoint) + out += fmt.Sprintf(console.Bold("updated at:")+" %s\n\n", libtime.LocalTimestamp(updatedAt)) - out += "\n" - t := table.Table{ + statusTable := table.Table{ Headers: headers, Rows: [][]interface{}{row}, } - out += table.MustFormat(t) + + var predictionMetrics string + apiMetrics, err := getAPIMetrics(ctx.App.Name, api.Name) + if err != nil || apiMetrics == 
nil { + predictionMetrics = "\n\nmetrics are not available yet" + } else { + statusTable = appendNetworkMetrics(statusTable, apiMetrics) + predictionMetrics = "\n\n" + predictionMetricsTable(apiMetrics, api) + } + + out += table.MustFormat(statusTable) + out += predictionMetrics if !flagVerbose { return out, nil @@ -507,6 +518,145 @@ func describeAPI(name string, resourcesRes *schema.GetResourcesResponse, flagVer return out, nil } +func getAPIMetrics(appName, apiName string) (*schema.APIMetrics, error) { + params := map[string]string{"appName": appName, "apiName": apiName} + httpResponse, err := HTTPGet("/metrics", params) + if err != nil { + return nil, err + } + + var apiMetrics schema.APIMetrics + err = json.Unmarshal(httpResponse, &apiMetrics) + if err != nil { + return nil, err + } + + return &apiMetrics, nil +} + +func appendNetworkMetrics(apiTable table.Table, apiMetrics *schema.APIMetrics) table.Table { + latency := "-" + if apiMetrics.NetworkStats.Latency != nil { + latency = fmt.Sprintf("%.9g", *apiMetrics.NetworkStats.Latency) + } + + headers := []table.Header{ + {Title: "avg latency"}, + {Title: "2XX", Hidden: apiMetrics.NetworkStats.Code2XX == 0}, + {Title: "4XX", Hidden: apiMetrics.NetworkStats.Code4XX == 0}, + {Title: "5XX", Hidden: apiMetrics.NetworkStats.Code5XX == 0}, + } + + row := []interface{}{ + latency, + apiMetrics.NetworkStats.Code2XX, + apiMetrics.NetworkStats.Code4XX, + apiMetrics.NetworkStats.Code5XX, + } + + apiTable.Headers = append(apiTable.Headers, headers...) + apiTable.Rows[0] = append(apiTable.Rows[0], row...) 
+ + return apiTable +} + +func predictionMetricsTable(apiMetrics *schema.APIMetrics, api *context.API) string { + if api.Tracker == nil { + return "a tracker has not been configured to record predictions" + } + + if api.Tracker.ModelType == userconfig.ClassificationModelType { + return classificationMetricsTable(apiMetrics) + } + return regressionMetricsTable(apiMetrics) +} + +func regressionMetricsTable(apiMetrics *schema.APIMetrics) string { + minStr := "-" + if apiMetrics.RegressionStats.Min != nil { + minStr = fmt.Sprintf("%.9g", *apiMetrics.RegressionStats.Min) + } + + maxStr := "-" + if apiMetrics.RegressionStats.Max != nil { + maxStr = fmt.Sprintf("%.9g", *apiMetrics.RegressionStats.Max) + } + + avgStr := "-" + if apiMetrics.RegressionStats.Avg != nil { + avgStr = fmt.Sprintf("%.9g", *apiMetrics.RegressionStats.Avg) + } + + t := table.Table{ + Headers: []table.Header{ + {Title: "min", MaxWidth: 10}, + {Title: "max", MaxWidth: 10}, + {Title: "avg", MaxWidth: 10}, + }, + Rows: [][]interface{}{{minStr, maxStr, avgStr}}, + } + + return table.MustFormat(t) +} + +func classificationMetricsTable(apiMetrics *schema.APIMetrics) string { + classList := make([]string, len(apiMetrics.ClassDistribution)) + + i := 0 + for inputName := range apiMetrics.ClassDistribution { + classList[i] = inputName + i++ + } + sort.Strings(classList) + + if len(classList) > 0 && len(classList) < 4 { + row := []interface{}{} + headers := []table.Header{} + + for _, className := range classList { + headers = append(headers, table.Header{Title: s.TruncateEllipses(className, 20), MaxWidth: 20}) + row = append(row, apiMetrics.ClassDistribution[className]) + } + + t := table.Table{ + Headers: headers, + Rows: [][]interface{}{row}, + } + + return table.MustFormat(t) + } + + rows := make([][]interface{}, len(classList)) + for rowNum, className := range classList { + rows[rowNum] = []interface{}{ + className, + apiMetrics.ClassDistribution[className], + } + } + + if len(classList) == 0 { + rows = 
append(rows, []interface{}{ + "-", + "-", + }) + } + + t := table.Table{ + Headers: []table.Header{ + {Title: "class", MaxWidth: 40}, + {Title: "count", MaxWidth: 20}, + }, + Rows: rows, + } + + out := table.MustFormat(t) + + if len(classList) == consts.MaxClassesPerRequest { + out += fmt.Sprintf("\n\nlisting at most %d classes, the complete list can be found in your cloudwatch dashboard", consts.MaxClassesPerRequest) + } + return out +} + func describeModelInput(groupStatus *resource.APIGroupStatus, apiEndpoint string) string { if groupStatus.Available() == 0 { return "waiting for api to be ready" diff --git a/docs/deployments/apis.md b/docs/deployments/apis.md index 1fd213fcd1..3ac3bf7e2d 100644 --- a/docs/deployments/apis.md +++ b/docs/deployments/apis.md @@ -10,6 +10,9 @@ Serve models at scale. model: # path to an exported model (e.g. s3://my-bucket/model.zip) model_format: # model format, must be "tensorflow" or "onnx" (default: "onnx" if model path ends with .onnx, "tensorflow" if model path ends with .zip) request_handler: # path to the request handler implementation file, relative to the cortex root + tracker: + key: # json key to track in the response payload + model_type: # model type, must be "classification" or "regression" compute: min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) diff --git a/manager/manifests/istio-metrics.yaml b/manager/manifests/istio-metrics.yaml index def8b358f2..e994d9c76a 100644 --- a/manager/manifests/istio-metrics.yaml +++ b/manager/manifests/istio-metrics.yaml @@ -22,7 +22,7 @@ spec: params: value: response.duration dimensions: - REQUEST_PATH: request.url_path | "unknown" + RequestPath: request.url_path | "unknown" --- apiVersion: config.istio.io/v1alpha2 kind: handler diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 7e3dcd8bc0..a31e6d0d64 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -58,4 +58,6 @@ var ( MetadataDir = "metadata" 
TelemetryURL = "https://telemetry.cortexlabs.dev" + + MaxClassesPerRequest = 75 // cloudwatch.GeMetricData can get up to 100 metrics per request, avoid multiple requests and have room for other stats ) diff --git a/pkg/lib/aws/aws.go b/pkg/lib/aws/aws.go index 34fed976eb..4e1fd6012e 100644 --- a/pkg/lib/aws/aws.go +++ b/pkg/lib/aws/aws.go @@ -19,6 +19,7 @@ package aws import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/sts" @@ -33,6 +34,7 @@ type Client struct { s3Client *s3.S3 stsClient *sts.STS cloudWatchLogsClient *cloudwatchlogs.CloudWatchLogs + CloudWatchMetrics *cloudwatch.CloudWatch awsAccountID string HashedAccountID string } @@ -48,6 +50,7 @@ func New(region, bucket string) *Client { Region: region, s3Client: s3.New(sess), stsClient: sts.New(sess), + CloudWatchMetrics: cloudwatch.New(sess), cloudWatchLogsClient: cloudwatchlogs.New(sess), } response, err := awsClient.stsClient.GetCallerIdentity(nil) diff --git a/pkg/lib/slices/float64_ptr.go b/pkg/lib/slices/float64_ptr.go new file mode 100644 index 0000000000..360d9813b1 --- /dev/null +++ b/pkg/lib/slices/float64_ptr.go @@ -0,0 +1,86 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package slices + +import ( + "github.com/cortexlabs/cortex/pkg/lib/errors" +) + +// For adding integers stored in floats +func Float64PtrSumInt(floats ...*float64) int { + sum := 0 + for _, num := range floats { + if num != nil { + sum += int(*num) + } + } + return sum +} + +func Float64PtrMin(floats ...*float64) *float64 { + var min *float64 + + for _, num := range floats { + switch { + case num != nil && min != nil && *num < *min: + min = num + case num != nil && min == nil: + min = num + } + } + return min +} + +func Float64PtrMax(floats ...*float64) *float64 { + var max *float64 + + for _, num := range floats { + switch { + case num != nil && max != nil && *num > *max: + max = num + case num != nil && max == nil: + max = num + } + } + return max +} + +func Float64PtrAvg(values []*float64, weights []*float64) (*float64, error) { + if len(values) != len(weights) { + return nil, errors.New("length of values is not equal to length of weights") + } + + totalWeight := 0.0 + for i, valPtr := range values { + if valPtr != nil && weights[i] != nil && *weights[i] > 0 { + totalWeight += *weights[i] + } + } + + if totalWeight == 0.0 { + return nil, nil + } + + avg := 0.0 + for i, valPtr := range values { + if valPtr != nil && weights[i] != nil && *weights[i] > 0 { + avg += (*valPtr) * (*weights[i]) / float64(totalWeight) + } + } + + return &avg, nil +} diff --git a/pkg/lib/slices/float64_ptr_test.go b/pkg/lib/slices/float64_ptr_test.go new file mode 100644 index 0000000000..08ad02c7a4 --- /dev/null +++ b/pkg/lib/slices/float64_ptr_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package slices + +import ( + "testing" + + "github.com/cortexlabs/cortex/pkg/lib/pointer" + "github.com/stretchr/testify/require" +) + +var float64NilPtr = (*float64)(nil) + +func TestFloat64PtrSumInt(t *testing.T) { + require.Equal(t, 0, Float64PtrSumInt(nil)) + require.Equal(t, 1, Float64PtrSumInt(pointer.Float64(1))) + require.Equal(t, 2, Float64PtrSumInt(pointer.Float64(1), pointer.Float64(1.5))) +} + +func TestFloat64PtrMin(t *testing.T) { + require.Equal(t, float64NilPtr, Float64PtrMin()) + require.Equal(t, float64NilPtr, Float64PtrMin(nil)) + require.Equal(t, pointer.Float64(1), Float64PtrMin(pointer.Float64(1))) + require.Equal(t, pointer.Float64(-1), Float64PtrMin(float64NilPtr, pointer.Float64(1), pointer.Float64(-1))) +} + +func TestFloat64PtrMax(t *testing.T) { + require.Equal(t, float64NilPtr, Float64PtrMax()) + require.Equal(t, float64NilPtr, Float64PtrMax(nil)) + require.Equal(t, pointer.Float64(1), Float64PtrMax(pointer.Float64(1))) + require.Equal(t, pointer.Float64(1.5), Float64PtrMax(pointer.Float64(1), pointer.Float64(1.5), float64NilPtr)) +} + +func TestFloat64PtrAvg(t *testing.T) { + var err error + var avg *float64 + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(1)}, []*float64{pointer.Float64(10)}) + require.Equal(t, pointer.Float64(1), avg) + require.NoError(t, err) + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(10)}, []*float64{pointer.Float64(1)}) + require.Equal(t, pointer.Float64(10), avg) + require.NoError(t, err) + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(1), pointer.Float64(4), float64NilPtr}, 
[]*float64{pointer.Float64(2), pointer.Float64(1), pointer.Float64(1)}) + require.Equal(t, pointer.Float64(2), avg) + require.NoError(t, err) + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(1), pointer.Float64(4), pointer.Float64(1)}, []*float64{pointer.Float64(2), pointer.Float64(1), float64NilPtr}) + require.Equal(t, pointer.Float64(2), avg) + require.NoError(t, err) + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(1), pointer.Float64(4), float64NilPtr}, []*float64{pointer.Float64(2), pointer.Float64(1), float64NilPtr}) + require.Equal(t, pointer.Float64(2), avg) + require.NoError(t, err) + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(1)}, []*float64{pointer.Float64(2), pointer.Float64(1)}) + require.Equal(t, float64NilPtr, avg) + require.Error(t, err) + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(2)}, []*float64{pointer.Float64(0)}) + require.Equal(t, float64NilPtr, avg) + require.NoError(t, err) + + avg, err = Float64PtrAvg([]*float64{pointer.Float64(0)}, []*float64{pointer.Float64(2)}) + require.Equal(t, pointer.Float64(0), avg) + require.NoError(t, err) + + avg, err = Float64PtrAvg([]*float64{nil}, []*float64{pointer.Float64(2)}) + require.Equal(t, float64NilPtr, avg) + require.NoError(t, err) + +} diff --git a/pkg/lib/strings/stringify.go b/pkg/lib/strings/stringify.go index f11762af75..bc5f3170bc 100644 --- a/pkg/lib/strings/stringify.go +++ b/pkg/lib/strings/stringify.go @@ -373,3 +373,12 @@ func Indent(str string, indent string) string { } return out[:len(out)-1] } + +func TruncateEllipses(str string, maxLength int) string { + ellipses := "..." 
+ if len(str) > maxLength { + str = str[:maxLength-len(ellipses)] + str += ellipses + } + return str +} diff --git a/pkg/lib/table/table.go b/pkg/lib/table/table.go index 82a307876d..97ea2cbb0c 100644 --- a/pkg/lib/table/table.go +++ b/pkg/lib/table/table.go @@ -35,7 +35,7 @@ type Table struct { type Header struct { Title string - MaxWidth int // Max width of the text (not including spacing). Items that are longer will be truncated to less than MaxWidth to fit the elipses. If 0 is provided, it defaults to no max. + MaxWidth int // Max width of the text (not including spacing). Items that are longer will be truncated to less than MaxWidth to fit the ellipses. If 0 is provided, it defaults to no max. MinWidth int // Min width of the text (not including spacing) Hidden bool } diff --git a/pkg/operator/api/schema/metrics.go b/pkg/operator/api/schema/metrics.go new file mode 100644 index 0000000000..cb212d0873 --- /dev/null +++ b/pkg/operator/api/schema/metrics.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package schema + +import ( + "github.com/cortexlabs/cortex/pkg/lib/pointer" + "github.com/cortexlabs/cortex/pkg/lib/slices" +) + +type RegressionStats struct { + Min *float64 `json:"min"` + Max *float64 `json:"max"` + Avg *float64 `json:"avg"` + SampleCount int `json:"sample_count"` +} + +func (left RegressionStats) Merge(right RegressionStats) RegressionStats { + totalSampleCount := left.SampleCount + right.SampleCount + + return RegressionStats{ + Min: slices.Float64PtrMin(left.Min, right.Min), + Max: slices.Float64PtrMax(left.Max, right.Max), + Avg: mergeAvg(left.Avg, left.SampleCount, right.Avg, right.SampleCount), + SampleCount: totalSampleCount, + } +} + +type NetworkStats struct { + Latency *float64 `json:"latency"` + Code2XX int `json:"code_2xx"` + Code4XX int `json:"code_4xx"` + Code5XX int `json:"code_5xx"` + Total int `json:"total"` +} + +func (left NetworkStats) Merge(right NetworkStats) NetworkStats { + return NetworkStats{ + Latency: mergeAvg(left.Latency, left.Total, right.Latency, right.Total), + Code2XX: left.Code2XX + right.Code2XX, + Code4XX: left.Code4XX + right.Code4XX, + Code5XX: left.Code5XX + right.Code5XX, + Total: left.Total + right.Total, + } +} + +type APIMetrics struct { + NetworkStats *NetworkStats `json:"network_stats"` + ClassDistribution map[string]int `json:"class_distribution"` + RegressionStats *RegressionStats `json:"regression_stats"` +} + +func (left APIMetrics) Merge(right APIMetrics) APIMetrics { + mergedClassDistribution := left.ClassDistribution + + if right.ClassDistribution != nil { + if left.ClassDistribution == nil { + mergedClassDistribution = right.ClassDistribution + } else { + for className, count := range right.ClassDistribution { + mergedClassDistribution[className] += count + } + } + } + + var mergedNetworkStats *NetworkStats + switch { + case left.NetworkStats != nil && right.NetworkStats != nil: + merged := (*left.NetworkStats).Merge(*right.NetworkStats) + mergedNetworkStats = &merged + case 
left.NetworkStats != nil: + mergedNetworkStats = left.NetworkStats + case right.NetworkStats != nil: + mergedNetworkStats = right.NetworkStats + } + + var mergedRegressionStats *RegressionStats + switch { + case left.RegressionStats != nil && right.RegressionStats != nil: + merged := (*left.RegressionStats).Merge(*right.RegressionStats) + mergedRegressionStats = &merged + case left.RegressionStats != nil: + mergedRegressionStats = left.RegressionStats + case right.RegressionStats != nil: + mergedRegressionStats = right.RegressionStats + } + + return APIMetrics{ + NetworkStats: mergedNetworkStats, + RegressionStats: mergedRegressionStats, + ClassDistribution: mergedClassDistribution, + } +} + +func mergeAvg(left *float64, leftCount int, right *float64, rightCount int) *float64 { + leftCountFloat64Ptr := pointer.Float64(float64(leftCount)) + rightCountFloat64Ptr := pointer.Float64(float64(rightCount)) + + avg, _ := slices.Float64PtrAvg([]*float64{left, right}, []*float64{leftCountFloat64Ptr, rightCountFloat64Ptr}) + return avg +} diff --git a/pkg/operator/api/schema/metrics_test.go b/pkg/operator/api/schema/metrics_test.go new file mode 100644 index 0000000000..ebb64c0c03 --- /dev/null +++ b/pkg/operator/api/schema/metrics_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package schema + +import ( + "testing" + + "github.com/cortexlabs/cortex/pkg/lib/pointer" + "github.com/stretchr/testify/require" +) + +func TestMergeAvg(t *testing.T) { + floatNilPtr := (*float64)(nil) + + require.Equal(t, floatNilPtr, mergeAvg(nil, 0, nil, 0)) + require.Equal(t, floatNilPtr, mergeAvg(nil, 1, nil, 0)) + + require.Equal(t, float64(1), *mergeAvg(pointer.Float64(1), 1, nil, 1)) + require.Equal(t, pointer.Float64(1), mergeAvg(nil, 1, pointer.Float64(1), 1)) + + require.Equal(t, floatNilPtr, mergeAvg(pointer.Float64(1), 0, nil, 1)) + require.Equal(t, floatNilPtr, mergeAvg(nil, 1, pointer.Float64(1), 0)) + + require.Equal(t, float64(1.25), *mergeAvg(pointer.Float64(1.25), 5, nil, 0)) + require.Equal(t, float64(1.25), *mergeAvg(nil, 0, pointer.Float64(1.25), 5)) + + require.Equal(t, float64(1.25), *mergeAvg(pointer.Float64(1), 3, pointer.Float64(2), 1)) + require.Equal(t, float64(1.25), *mergeAvg(pointer.Float64(2), 1, pointer.Float64(1), 3)) +} + +func TestRegressionStatsMerge(t *testing.T) { + require.Equal(t, RegressionStats{}, RegressionStats{}.Merge(RegressionStats{})) + require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{Min: pointer.Float64(1)}.Merge(RegressionStats{})) + require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{}.Merge(RegressionStats{Min: pointer.Float64(1)})) + require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{Min: pointer.Float64(2)}.Merge(RegressionStats{Min: pointer.Float64(1)})) + require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{Min: pointer.Float64(1)}.Merge(RegressionStats{Min: pointer.Float64(2)})) + + require.Equal(t, RegressionStats{Max: pointer.Float64(1)}, RegressionStats{Max: pointer.Float64(1)}.Merge(RegressionStats{})) + require.Equal(t, RegressionStats{Max: pointer.Float64(1)}, RegressionStats{}.Merge(RegressionStats{Max: pointer.Float64(1)})) + require.Equal(t, RegressionStats{Max: pointer.Float64(2)}, 
RegressionStats{Max: pointer.Float64(2)}.Merge(RegressionStats{Max: pointer.Float64(1)})) + require.Equal(t, RegressionStats{Max: pointer.Float64(2)}, RegressionStats{Max: pointer.Float64(1)}.Merge(RegressionStats{Max: pointer.Float64(2)})) + + left := RegressionStats{ + Max: pointer.Float64(5), + Min: pointer.Float64(2), + Avg: pointer.Float64(3.5), + SampleCount: 4, + } + + right := RegressionStats{ + Max: pointer.Float64(6), + Min: pointer.Float64(1), + Avg: pointer.Float64(3.5), + SampleCount: 6, + } + + merged := RegressionStats{ + Max: pointer.Float64(6), + Min: pointer.Float64(1), + Avg: pointer.Float64(3.5), + SampleCount: 10, + } + + require.Equal(t, merged, left.Merge(right)) + require.Equal(t, merged, right.Merge(left)) +} + +func TestNetworkStatsMerge(t *testing.T) { + require.Equal(t, NetworkStats{}, NetworkStats{}.Merge(NetworkStats{})) + + right := NetworkStats{ + Code2XX: 3, + Code4XX: 4, + Code5XX: 5, + Latency: pointer.Float64(30), + Total: 12, + } + + left := NetworkStats{ + Code2XX: 1, + Code4XX: 3, + Code5XX: 4, + Latency: pointer.Float64(5), + Total: 8, + } + + merged := NetworkStats{ + Code2XX: 4, + Code4XX: 7, + Code5XX: 9, + Latency: pointer.Float64(20), + Total: 20, + } + + require.Equal(t, merged, left.Merge(right)) + require.Equal(t, merged, right.Merge(left)) +} + +func TestAPIMetricsMerge(t *testing.T) { + require.Equal(t, APIMetrics{}, APIMetrics{}.Merge(APIMetrics{})) + + classDistribution := map[string]int{ + "class_a": 1, + "class_b": 2, + "class_c": 4, + } + + networkStats := NetworkStats{ + Code2XX: 3, + Code4XX: 4, + Code5XX: 5, + Latency: pointer.Float64(30), + Total: 12, + } + + regressionStats := RegressionStats{ + Max: pointer.Float64(6), + Min: pointer.Float64(1), + Avg: pointer.Float64(3.5), + SampleCount: 6, + } + + mergedNetworkStats := networkStats.Merge(networkStats) + mergedRegressionStats := regressionStats.Merge(regressionStats) + + mergedClassDistribution := map[string]int{ + "class_a": 2, + "class_b": 4, + 
"class_c": 8, + } + + require.Equal(t, APIMetrics{ClassDistribution: classDistribution}, APIMetrics{ClassDistribution: classDistribution}.Merge(APIMetrics{})) + require.Equal(t, APIMetrics{ClassDistribution: classDistribution}, APIMetrics{}.Merge(APIMetrics{ClassDistribution: classDistribution})) + + mergedAPIMetrics := APIMetrics{ + ClassDistribution: mergedClassDistribution, + NetworkStats: &mergedNetworkStats, + RegressionStats: &mergedRegressionStats, + } + + apiMetrics := APIMetrics{ + ClassDistribution: classDistribution, + NetworkStats: &networkStats, + RegressionStats: ®ressionStats, + } + + require.Equal(t, mergedAPIMetrics, apiMetrics.Merge(apiMetrics)) +} diff --git a/pkg/operator/api/userconfig/apis.go b/pkg/operator/api/userconfig/apis.go index af0ca915c1..10b78900da 100644 --- a/pkg/operator/api/userconfig/apis.go +++ b/pkg/operator/api/userconfig/apis.go @@ -35,11 +35,17 @@ type API struct { ResourceFields Model string `json:"model" yaml:"model"` ModelFormat ModelFormat `json:"model_format" yaml:"model_format"` + Tracker *Tracker `json:"tracker" yaml:"tracker"` RequestHandler *string `json:"request_handler" yaml:"request_handler"` Compute *APICompute `json:"compute" yaml:"compute"` Tags Tags `json:"tags" yaml:"tags"` } +type Tracker struct { + Key string `json:"key" yaml:"key"` + ModelType ModelType `json:"model_type" yaml:"model_type"` +} + var apiValidation = &cr.StructValidation{ StructFieldValidations: []*cr.StructFieldValidation{ { @@ -56,6 +62,31 @@ var apiValidation = &cr.StructValidation{ AllowCortexResources: true, }, }, + { + StructField: "Tracker", + StructValidation: &cr.StructValidation{ + DefaultNil: true, + StructFieldValidations: []*cr.StructFieldValidation{ + { + StructField: "Key", + StringValidation: &cr.StringValidation{ + Required: true, + }, + }, + { + StructField: "ModelType", + StringValidation: &cr.StringValidation{ + Required: false, + AllowEmpty: true, + AllowedValues: ModelTypeStrings(), + }, + Parser: func(str string) 
(interface{}, error) { + return ModelTypeFromString(str), nil + }, + }, + }, + }, + }, { StructField: "RequestHandler", StringPtrValidation: &cr.StringPtrValidation{}, diff --git a/pkg/operator/api/userconfig/model_type.go b/pkg/operator/api/userconfig/model_type.go new file mode 100644 index 0000000000..800daf008e --- /dev/null +++ b/pkg/operator/api/userconfig/model_type.go @@ -0,0 +1,78 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userconfig + +type ModelType int + +const ( + UnknownModelType ModelType = iota + ClassificationModelType + RegressionModelType +) + +var modelTypes = []string{ + "unknown", + "classification", + "regression", +} + +func ModelTypeFromString(s string) ModelType { + for i := 0; i < len(modelTypes); i++ { + if s == modelTypes[i] { + return ModelType(i) + } + } + return UnknownModelType +} + +func ModelTypeStrings() []string { + return modelTypes[1:] +} + +func (t ModelType) String() string { + return modelTypes[t] +} + +// MarshalText satisfies TextMarshaler +func (t ModelType) MarshalText() ([]byte, error) { + return []byte(t.String()), nil +} + +// UnmarshalText satisfies TextUnmarshaler +func (t *ModelType) UnmarshalText(text []byte) error { + enum := string(text) + for i := 0; i < len(modelTypes); i++ { + if enum == modelTypes[i] { + *t = ModelType(i) + return nil + } + } + + *t = UnknownModelType + return nil +} + +// UnmarshalBinary satisfies BinaryUnmarshaler +// Needed for msgpack +func (t 
*ModelType) UnmarshalBinary(data []byte) error { + return t.UnmarshalText(data) +} + +// MarshalBinary satisfies BinaryMarshaler +func (t ModelType) MarshalBinary() ([]byte, error) { + return []byte(t.String()), nil +} diff --git a/pkg/operator/context/apis.go b/pkg/operator/context/apis.go index 76ea5c7671..35a3bb7c8a 100644 --- a/pkg/operator/context/apis.go +++ b/pkg/operator/context/apis.go @@ -25,6 +25,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/hash" "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" + s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/operator/api/context" "github.com/cortexlabs/cortex/pkg/operator/api/resource" "github.com/cortexlabs/cortex/pkg/operator/api/userconfig" @@ -47,6 +48,7 @@ func getAPIs(config *userconfig.Config, var buf bytes.Buffer var requestHandlerImplKey *string buf.WriteString(apiConfig.Name) + buf.WriteString(s.Obj(apiConfig.Tracker)) buf.WriteString(apiConfig.ModelFormat.String()) if apiConfig.RequestHandler != nil { diff --git a/pkg/operator/endpoints/metrics.go b/pkg/operator/endpoints/metrics.go new file mode 100644 index 0000000000..b2e060aa24 --- /dev/null +++ b/pkg/operator/endpoints/metrics.go @@ -0,0 +1,45 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package endpoints + +import ( + "net/http" + + "github.com/cortexlabs/cortex/pkg/operator/workloads" +) + +func GetMetrics(w http.ResponseWriter, r *http.Request) { + appName, err := getRequiredQueryParam("appName", r) + if err != nil { + RespondError(w, err) + return + } + + apiName, err := getRequiredQueryParam("apiName", r) + if err != nil { + RespondError(w, err) + return + } + + apiMetrics, err := workloads.GetMetrics(appName, apiName) + if err != nil { + RespondError(w, err) + return + } + + Respond(w, apiMetrics) +} diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index ae99110baa..3724930db3 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -59,6 +59,7 @@ func main() { router.HandleFunc("/deploy", endpoints.Deploy).Methods("POST") router.HandleFunc("/delete", endpoints.Delete).Methods("POST") router.HandleFunc("/deployments", endpoints.GetDeployments).Methods("GET") + router.HandleFunc("/metrics", endpoints.GetMetrics).Methods("GET") router.HandleFunc("/resources", endpoints.GetResources).Methods("GET") router.HandleFunc("/aggregate/{id}", endpoints.GetAggregate).Methods("GET") router.HandleFunc("/logs/read", endpoints.ReadLogs) diff --git a/pkg/operator/workloads/api_saved_status.go b/pkg/operator/workloads/api_saved_status.go index 73ae3f0404..94a88444d9 100644 --- a/pkg/operator/workloads/api_saved_status.go +++ b/pkg/operator/workloads/api_saved_status.go @@ -68,7 +68,6 @@ func getAPISavedStatus(resourceID string, workloadID string, appName string) (*r var savedStatus resource.APISavedStatus err := config.AWS.ReadJSONFromS3(&savedStatus, key) if aws.IsNoSuchKeyErr(err) { - cacheNilAPISavedStatus(resourceID, workloadID, appName) return nil, nil } if err != nil { diff --git a/pkg/operator/workloads/api_saved_status_cache.go b/pkg/operator/workloads/api_saved_status_cache.go index 9dc042bfe8..79cb7e7d66 100644 --- a/pkg/operator/workloads/api_saved_status_cache.go +++ 
b/pkg/operator/workloads/api_saved_status_cache.go @@ -57,12 +57,6 @@ func cacheAPISavedStatus(savedStatus *resource.APISavedStatus) { savedStatus.AppName, savedStatus.Copy(), apiStatusCache.m) } -func cacheNilAPISavedStatus(resourceID string, workloadID string, appName string) { - apiStatusCache.Lock() - defer apiStatusCache.Unlock() - setAPISavedStatusMap(resourceID, workloadID, appName, nil, apiStatusCache.m) -} - func apiSavedStatusesToMap( savedStatuses []*resource.APISavedStatus, ) map[string]map[string]map[string]*resource.APISavedStatus { @@ -83,7 +77,6 @@ func getStaleAPISavedStatuses( apiStatusCache.RLock() defer apiStatusCache.RUnlock() - var staleSavedStatuses []*resource.APISavedStatus for appName := range apiStatusCache.m { for resourceID := range apiStatusCache.m[appName] { diff --git a/pkg/operator/workloads/errors.go b/pkg/operator/workloads/errors.go index d051a01ecb..9f489376e3 100644 --- a/pkg/operator/workloads/errors.go +++ b/pkg/operator/workloads/errors.go @@ -24,6 +24,7 @@ const ( ErrCortexInstallationBroken ErrLoadBalancerInitializing ErrNotFound + ErrAPIInitializing ) var errorKinds = []string{ @@ -32,9 +33,10 @@ var errorKinds = []string{ "err_cortex_installation_broken", "err_load_balancer_initializing", "err_not_found", + "err_api_initializing", } -var _ = [1]int{}[int(ErrNotFound)-(len(errorKinds)-1)] // Ensure list length matches +var _ = [1]int{}[int(ErrAPIInitializing)-(len(errorKinds)-1)] // Ensure list length matches func (t ErrorKind) String() string { return errorKinds[t] @@ -106,3 +108,10 @@ func ErrorNotFound() error { message: "not found", } } + +func ErrorAPIInitializing() error { + return Error{ + Kind: ErrAPIInitializing, + message: "api is still initializing", + } +} diff --git a/pkg/operator/workloads/metrics.go b/pkg/operator/workloads/metrics.go new file mode 100644 index 0000000000..2ae14174fe --- /dev/null +++ b/pkg/operator/workloads/metrics.go @@ -0,0 +1,460 @@ +/* +Copyright 2019 Cortex Labs, Inc. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "fmt" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudwatch" + + "github.com/cortexlabs/cortex/pkg/consts" + "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/parallel" + "github.com/cortexlabs/cortex/pkg/lib/slices" + "github.com/cortexlabs/cortex/pkg/operator/api/context" + "github.com/cortexlabs/cortex/pkg/operator/api/schema" + "github.com/cortexlabs/cortex/pkg/operator/api/userconfig" + "github.com/cortexlabs/cortex/pkg/operator/config" +) + +func GetMetrics(appName, apiName string) (*schema.APIMetrics, error) { + ctx := CurrentContext(appName) + api := ctx.APIs[apiName] + + apiSavedStatus, err := getAPISavedStatus(api.ID, api.WorkloadID, appName) + if err != nil { + return nil, err + } + + if apiSavedStatus == nil { + return nil, errors.Wrap(ErrorAPIInitializing(), api.Name) + } + + if apiSavedStatus.Start == nil { + return nil, errors.Wrap(ErrorAPIInitializing(), api.Name) + } + + apiStartTime := apiSavedStatus.Start.Truncate(time.Second) + + // Get realtime metrics for the seconds elapsed in the latest minute + realTimeEnd := time.Now().Truncate(time.Second) + realTimeStart := realTimeEnd.Truncate(time.Minute) + + realTimeMetrics := schema.APIMetrics{} + batchMetrics := schema.APIMetrics{} + + requestList := []func() error{} + if realTimeStart.Before(realTimeEnd) { + requestList = append(requestList, 
getAPIMetricsFunc(appName, api, 1, &realTimeStart, &realTimeEnd, &realTimeMetrics)) + } + + if apiStartTime.Before(realTimeStart) { + batchEnd := realTimeStart + twoWeeksAgo := batchEnd.Add(-14 * 24 * time.Hour) + + var batchStart time.Time + if twoWeeksAgo.Before(*apiSavedStatus.Start) { + batchStart = *apiSavedStatus.Start + } else { + batchStart = twoWeeksAgo + } + requestList = append(requestList, getAPIMetricsFunc(appName, api, 60*60, &batchStart, &batchEnd, &batchMetrics)) + } + + if len(requestList) != 0 { + err = parallel.RunFirstErr(requestList...) + if err != nil { + return nil, err + } + } + + mergedMetrics := realTimeMetrics.Merge(batchMetrics) + return &mergedMetrics, nil +} + +func getAPIMetricsFunc(appName string, api *context.API, period int64, startTime *time.Time, endTime *time.Time, apiMetrics *schema.APIMetrics) func() error { + return func() error { + metricDataResults, err := queryMetrics(appName, api, period, startTime, endTime) + if err != nil { + return err + } + networkStats, err := extractNetworkMetrics(metricDataResults) + if err != nil { + return err + } + apiMetrics.NetworkStats = networkStats + + if api.Tracker != nil { + if api.Tracker.ModelType == userconfig.ClassificationModelType { + apiMetrics.ClassDistribution = extractClassificationMetrics(metricDataResults) + } else { + regressionStats, err := extractRegressionMetrics(metricDataResults) + if err != nil { + return err + } + apiMetrics.RegressionStats = regressionStats + } + } + return nil + } +} + +func queryMetrics(appName string, api *context.API, period int64, startTime *time.Time, endTime *time.Time) ([]*cloudwatch.MetricDataResult, error) { + networkDataQueries := getNetworkStatsDef(appName, api, period) + latencyMetrics := getLatencyMetricsDef(api.Path, period) + allMetrics := append(latencyMetrics, networkDataQueries...) 
+ + if api.Tracker != nil { + if api.Tracker.ModelType == userconfig.ClassificationModelType { + classMetrics, err := getClassesMetricDef(appName, api, period) + if err != nil { + return nil, err + } + allMetrics = append(allMetrics, classMetrics...) + } else { + regressionMetrics := getRegressionMetricDef(appName, api, period) + allMetrics = append(allMetrics, regressionMetrics...) + } + } + + metricsDataQuery := cloudwatch.GetMetricDataInput{ + EndTime: endTime, + StartTime: startTime, + MetricDataQueries: allMetrics, + } + output, err := config.AWS.CloudWatchMetrics.GetMetricData(&metricsDataQuery) + if err != nil { + return nil, err + } + return output.MetricDataResults, nil +} + +func extractNetworkMetrics(metricsDataResults []*cloudwatch.MetricDataResult) (*schema.NetworkStats, error) { + var networkStats schema.NetworkStats + var requestCounts []*float64 + var latencyAvgs []*float64 + + for _, metricData := range metricsDataResults { + if metricData.Values == nil { + continue + } + + switch { + case *metricData.Label == "2XX": + networkStats.Code2XX = slices.Float64PtrSumInt(metricData.Values...) + case *metricData.Label == "4XX": + networkStats.Code4XX = slices.Float64PtrSumInt(metricData.Values...) + case *metricData.Label == "5XX": + networkStats.Code5XX = slices.Float64PtrSumInt(metricData.Values...) 
+ case *metricData.Label == "Latency": + latencyAvgs = metricData.Values + case *metricData.Label == "RequestCount": + requestCounts = metricData.Values + } + } + + avg, err := slices.Float64PtrAvg(latencyAvgs, requestCounts) + if err != nil { + return nil, err + } + networkStats.Latency = avg + + networkStats.Total = networkStats.Code2XX + networkStats.Code4XX + networkStats.Code5XX + return &networkStats, nil +} + +func extractClassificationMetrics(metricsDataResults []*cloudwatch.MetricDataResult) map[string]int { + classDistribution := map[string]int{} + for _, metricData := range metricsDataResults { + if metricData.Values == nil { + continue + } + + if strings.HasPrefix(*metricData.Label, "class_") { + className := (*metricData.Label)[len("class_"):] + classDistribution[className] = slices.Float64PtrSumInt(metricData.Values...) + } + } + return classDistribution +} + +func extractRegressionMetrics(metricsDataResults []*cloudwatch.MetricDataResult) (*schema.RegressionStats, error) { + var regressionStats schema.RegressionStats + var predictionAvgs []*float64 + var requestCounts []*float64 + + for _, metricData := range metricsDataResults { + if metricData.Values == nil { + continue + } + + switch { + case *metricData.Label == "Min": + regressionStats.Min = slices.Float64PtrMin(metricData.Values...) + case *metricData.Label == "Max": + regressionStats.Max = slices.Float64PtrMax(metricData.Values...) + case *metricData.Label == "SampleCount": + regressionStats.SampleCount = slices.Float64PtrSumInt(metricData.Values...) 
+ requestCounts = metricData.Values + case *metricData.Label == "Avg": + predictionAvgs = metricData.Values + } + } + + avg, err := slices.Float64PtrAvg(predictionAvgs, requestCounts) + if err != nil { + return nil, err + } + regressionStats.Avg = avg + + return &regressionStats, nil +} + +func getAPIDimensions(appName string, api *context.API) []*cloudwatch.Dimension { + return []*cloudwatch.Dimension{ + { + Name: aws.String("AppName"), + Value: aws.String(appName), + }, + { + Name: aws.String("APIName"), + Value: aws.String(api.Name), + }, + { + Name: aws.String("APIID"), + Value: aws.String(api.ID), + }, + } +} + +func getLatencyMetricsDef(routeName string, period int64) []*cloudwatch.MetricDataQuery { + networkDataQueries := []*cloudwatch.MetricDataQuery{ + { + Id: aws.String("latency"), + Label: aws.String("Latency"), + MetricStat: &cloudwatch.MetricStat{ + Metric: &cloudwatch.Metric{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("response-time.instance.cortex"), + Dimensions: []*cloudwatch.Dimension{ + { + Name: aws.String("RequestPath"), + Value: aws.String(routeName), + }, + }, + }, + Stat: aws.String("Average"), + Period: aws.Int64(period), + }, + }, + { + Id: aws.String("request_count"), + Label: aws.String("RequestCount"), + MetricStat: &cloudwatch.MetricStat{ + Metric: &cloudwatch.Metric{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("response-time.instance.cortex"), + Dimensions: []*cloudwatch.Dimension{ + { + Name: aws.String("RequestPath"), + Value: aws.String(routeName), + }, + }, + }, + Stat: aws.String("SampleCount"), + Period: aws.Int64(period), + }, + }, + } + + return networkDataQueries +} + +func getRegressionMetricDef(appName string, api *context.API, period int64) []*cloudwatch.MetricDataQuery { + metric := &cloudwatch.Metric{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("Prediction"), + Dimensions: getAPIDimensions(appName, api), + } + + regressionMetric := 
[]*cloudwatch.MetricDataQuery{ + { + Id: aws.String("min"), + Label: aws.String("Min"), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Stat: aws.String("Minimum"), + Period: aws.Int64(period), + }, + }, + { + Id: aws.String("max"), + Label: aws.String("Max"), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Stat: aws.String("Maximum"), + Period: aws.Int64(period), + }, + }, + { + Id: aws.String("sample_count"), + Label: aws.String("SampleCount"), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Stat: aws.String("SampleCount"), + Period: aws.Int64(period), + }, + }, + { + Id: aws.String("avg"), + Label: aws.String("Avg"), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Stat: aws.String("Average"), + Period: aws.Int64(period), + }, + }, + } + + return regressionMetric +} + +func getNetworkStatsDef(appName string, api *context.API, period int64) []*cloudwatch.MetricDataQuery { + dimensions := getAPIDimensions(appName, api) + + status200 := append(dimensions, &cloudwatch.Dimension{ + Name: aws.String("Code"), + Value: aws.String("2XX"), + }) + status400 := append(dimensions, &cloudwatch.Dimension{ + Name: aws.String("Code"), + Value: aws.String("4XX"), + }) + status500 := append(dimensions, &cloudwatch.Dimension{ + Name: aws.String("Code"), + Value: aws.String("5XX"), + }) + + networkDataQueries := []*cloudwatch.MetricDataQuery{ + { + Id: aws.String("datapoints_2XX"), + Label: aws.String("2XX"), + MetricStat: &cloudwatch.MetricStat{ + Metric: &cloudwatch.Metric{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("StatusCode"), + Dimensions: status200, + }, + Stat: aws.String("Sum"), + Period: aws.Int64(period), + }, + }, + { + Id: aws.String("datapoints_4XX"), + Label: aws.String("4XX"), + MetricStat: &cloudwatch.MetricStat{ + Metric: &cloudwatch.Metric{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("StatusCode"), + Dimensions: status400, + }, + Stat: aws.String("Sum"), + Period: 
aws.Int64(period), + }, + }, + { + Id: aws.String("datapoints_5XX"), + Label: aws.String("5XX"), + MetricStat: &cloudwatch.MetricStat{ + Metric: &cloudwatch.Metric{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("StatusCode"), + Dimensions: status500, + }, + Stat: aws.String("Sum"), + Period: aws.Int64(period), + }, + }, + } + + return networkDataQueries +} + +func getClassesMetricDef(appName string, api *context.API, period int64) ([]*cloudwatch.MetricDataQuery, error) { + listMetricsInput := &cloudwatch.ListMetricsInput{ + Namespace: aws.String(config.Cortex.LogGroup), + MetricName: aws.String("Prediction"), + Dimensions: []*cloudwatch.DimensionFilter{ + { + Name: aws.String("AppName"), + Value: aws.String(appName), + }, + { + Name: aws.String("APIName"), + Value: aws.String(api.Name), + }, + { + Name: aws.String("APIID"), + Value: aws.String(api.ID), + }, + }, + } + + listMetricsOutput, err := config.AWS.CloudWatchMetrics.ListMetrics(listMetricsInput) + if err != nil { + return nil, err + } + + if listMetricsOutput.Metrics == nil { + return nil, nil + } + + classMetricQueries := []*cloudwatch.MetricDataQuery{} + + classCount := 0 + for i, metric := range listMetricsOutput.Metrics { + if classCount >= consts.MaxClassesPerRequest { + break + } + + var className string + for _, dim := range metric.Dimensions { + if *dim.Name == "Class" { + className = *dim.Value + } + } + + if len(className) == 0 { + continue + } + + classMetricQueries = append(classMetricQueries, &cloudwatch.MetricDataQuery{ + Id: aws.String(fmt.Sprintf("id_%d", i)), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Stat: aws.String("Sum"), + Period: aws.Int64(period), + }, + Label: aws.String("class_" + className), + }) + classCount++ + } + return classMetricQueries, nil +} diff --git a/pkg/workloads/cortex/lib/api_utils.py b/pkg/workloads/cortex/lib/api_utils.py new file mode 100644 index 0000000000..c25f61ff2f --- /dev/null +++ 
b/pkg/workloads/cortex/lib/api_utils.py @@ -0,0 +1,113 @@ +# Copyright 2019 Cortex Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from cortex.lib.exceptions import UserException, CortexException +from cortex.lib.log import get_logger + +logger = get_logger() + + +def api_metric_dimensions(ctx, api_name): + api = ctx.apis[api_name] + return [ + {"Name": "AppName", "Value": ctx.app["name"]}, + {"Name": "APIName", "Value": api["name"]}, + {"Name": "APIID", "Value": api["id"]}, + ] + + +def status_code_metric(dimensions, status_code): + status_code_series = int(status_code / 100) + status_code_dimensions = dimensions + [ + {"Name": "Code", "Value": "{}XX".format(status_code_series)} + ] + return [{"MetricName": "StatusCode", "Dimensions": status_code_dimensions, "Value": 1}] + + +def predictions_per_request_metric(dimensions, prediction_count): + return [ + {"MetricName": "PredictionsPerRequest", "Dimensions": dimensions, "Value": prediction_count} + ] + + +def prediction_metrics(dimensions, api, predictions): + metric_list = [] + tracker = api.get("tracker") + for prediction in predictions: + predicted_value = prediction.get(tracker["key"]) + if predicted_value is None: + logger.warn( + "failed to track key '{}': not found in response payload".format(tracker["key"]) + ) + return [] + + if tracker["model_type"] == "classification": + if type(predicted_value) == str or type(predicted_value) == int: + dimensions_with_class = dimensions + [ + {"Name": 
"Class", "Value": str(predicted_value)} + ] + metric = { + "MetricName": "Prediction", + "Dimensions": dimensions_with_class, + "Unit": "Count", + "Value": 1, + } + + metric_list.append(metric) + else: + logger.warn( + "failed to track key '{}': expected type 'str' or 'int' but encountered '{}'".format( + tracker["key"], type(predicted_value) + ) + ) + return [] + else: + if type(predicted_value) == float or type(predicted_value) == int: # allow ints + metric = { + "MetricName": "Prediction", + "Dimensions": dimensions, + "Value": float(predicted_value), + } + metric_list.append(metric) + else: + logger.warn( + "failed to track key '{}': expected type 'float' or 'int' but encountered '{}'".format( + tracker["key"], type(predicted_value) + ) + ) + return [] + return metric_list + + +def post_request_metrics(ctx, api, response, predictions): + try: + api_name = api["name"] + + api_dimensions = api_metric_dimensions(ctx, api_name) + metrics_list = [] + metrics_list += status_code_metric(api_dimensions, response.status_code) + + if predictions is not None: + metrics_list += predictions_per_request_metric(api_dimensions, len(predictions)) + + if api.get("tracker") is not None: + metrics_list += prediction_metrics(api_dimensions, api, predictions) + ctx.publish_metrics(metrics_list) + + except CortexException as e: + e.wrap("error") + logger.warn(str(e), exc_info=True) + except Exception as e: + logger.warn(str(e), exc_info=True) diff --git a/pkg/workloads/cortex/lib/context.py b/pkg/workloads/cortex/lib/context.py index 275a640457..7c253bd22a 100644 --- a/pkg/workloads/cortex/lib/context.py +++ b/pkg/workloads/cortex/lib/context.py @@ -19,6 +19,8 @@ import importlib from datetime import datetime from copy import deepcopy +import boto3 + from botocore.exceptions import ClientError from cortex import consts @@ -82,6 +84,7 @@ def __init__(self, **kwargs): self.apis = self.ctx["apis"] or {} self.training_datasets = {k: v["dataset"] for k, v in self.models.items()} 
self.api_version = self.cortex_config["api_version"] + self.monitoring = None if "local_storage_path" in kwargs: self.storage = LocalStorage(base_dir=kwargs["local_storage_path"]) @@ -91,6 +94,7 @@ def __init__(self, **kwargs): region=self.cortex_config["region"], client_config={}, ) + self.monitoring = boto3.client("cloudwatch", region_name=self.cortex_config["region"]) if self.api_version != consts.CORTEX_VERSION: raise ValueError( @@ -621,6 +625,18 @@ def populate_values(self, input, input_schema, preserve_column_refs): ) return cast_compound_type(input, input_schema["_type"]) + def publish_metrics(self, metrics): + if self.monitoring is None: + raise CortexException("monitoring client not initialized") # unexpected + + response = self.monitoring.put_metric_data( + MetricData=metrics, Namespace=self.cortex_config["log_group"] + ) + + if int(response["ResponseMetadata"]["HTTPStatusCode"] / 100) != 2: + logger.warn(response) + raise Exception("failed to publish metrics") + def input_schema_from_type_schema(type_schema): return { diff --git a/pkg/workloads/cortex/onnx_serve/api.py b/pkg/workloads/cortex/onnx_serve/api.py index 1e10830172..b8183465c8 100644 --- a/pkg/workloads/cortex/onnx_serve/api.py +++ b/pkg/workloads/cortex/onnx_serve/api.py @@ -16,21 +16,21 @@ import os import json import argparse -import traceback -import time -from flask import Flask, request, jsonify + +from flask import Flask, request, jsonify, g from flask_api import status from waitress import serve import onnxruntime as rt import numpy as np +import json_tricks +import boto3 from cortex.lib.storage import S3 +from cortex.lib import api_utils from cortex import consts from cortex.lib import util, package, Context from cortex.lib.log import get_logger from cortex.lib.exceptions import CortexException, UserRuntimeException, UserException -import logging -import json_tricks logger = get_logger() logger.propagate = False # prevent double logging (flask modifies root logger) @@ -66,6 +66,24 @@ 
} +@app.after_request +def after_request(response): + api = local_cache["api"] + ctx = local_cache["ctx"] + + if request.path != "/{}/{}".format(ctx.app["name"], api["name"]): + return response + + logger.info("[%s] %s", util.now_timestamp_rfc_3339(), response.status) + + predictions = None + if "predictions" in g: + predictions = g.predictions + api_utils.post_request_metrics(ctx, api, response, predictions) + + return response + + def prediction_failed(reason): message = "prediction failed: " + reason logger.error(message) @@ -141,6 +159,7 @@ def predict(app_name, api_name): sess = local_cache["sess"] api = local_cache["api"] + ctx = local_cache["ctx"] request_handler = local_cache.get("request_handler") input_metadata = local_cache["input_metadata"] output_metadata = local_cache["output_metadata"] @@ -173,7 +192,6 @@ def predict(app_name, api_name): if request_handler is not None and util.has_function(request_handler, "post_inference"): result = request_handler.post_inference(result, output_metadata) - prediction = {"prediction": result} except CortexException as e: e.wrap("error", "sample {}".format(i + 1)) logger.error(str(e)) @@ -187,8 +205,8 @@ def predict(app_name, api_name): ) return prediction_failed(str(e)) - predictions.append(prediction) - + predictions.append(result) + g.predictions = predictions response["predictions"] = predictions response["resource_id"] = api["id"] @@ -207,14 +225,6 @@ def get_signature(app_name, api_name): return jsonify(response) -@app.after_request -def after_request(response): - if request.full_path.startswith("/healthz"): - return response - logger.info("[%s] %s", util.now_timestamp_rfc_3339(), response.status) - return response - - @app.errorhandler(Exception) def exceptions(e): logger.exception(e) diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py index aaa4f4ff81..c3783ea6db 100644 --- a/pkg/workloads/cortex/tf_api/api.py +++ b/pkg/workloads/cortex/tf_api/api.py @@ -16,10 +16,10 @@ import 
os import json import argparse -import tensorflow as tf -import traceback import time -from flask import Flask, request, jsonify + +import tensorflow as tf +from flask import Flask, request, jsonify, g from flask_api import status from waitress import serve import grpc @@ -29,7 +29,7 @@ from google.protobuf import json_format from cortex import consts -from cortex.lib import util, tf_lib, package, Context +from cortex.lib import util, tf_lib, package, Context, api_utils from cortex.lib.log import get_logger from cortex.lib.storage import S3, LocalStorage from cortex.lib.exceptions import CortexException, UserRuntimeException, UserException @@ -94,6 +94,24 @@ } +@app.after_request +def after_request(response): + api = local_cache["api"] + ctx = local_cache["ctx"] + + if request.path != "/{}/{}".format(ctx.app["name"], api["name"]): + return response + + logger.info("[%s] %s", util.now_timestamp_rfc_3339(), response.status) + + predictions = None + if "predictions" in g: + predictions = g.predictions + api_utils.post_request_metrics(ctx, api, response, predictions) + + return response + + def transform_sample(sample): ctx = local_cache["ctx"] model = local_cache["model"] @@ -390,7 +408,7 @@ def predict(deployment_name, api_name): return prediction_failed(str(e)) predictions.append(result) - + g.predictions = predictions response["predictions"] = predictions response["resource_id"] = api["id"] @@ -475,14 +493,6 @@ def validate_model_dir(model_dir): ) -@app.after_request -def after_request(response): - if request.full_path.startswith("/healthz"): - return response - logger.info("[%s] %s", util.now_timestamp_rfc_3339(), response.status) - return response - - @app.errorhandler(Exception) def exceptions(e): logger.exception(e)