Skip to content

Commit bcce80e

Browse files
authored
Create loggroup per api stream from log group (#466)
1 parent 5196885 commit bcce80e

File tree

10 files changed

+154
-57
lines changed

10 files changed

+154
-57
lines changed

dev/versions.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ Note: overriding horizontal-pod-autoscaler-sync-period on EKS is currently not s
151151

152152
1. Find the latest release on [Dockerhub](https://hub.docker.com/r/fluent/fluentd-kubernetes-daemonset/)
153153
1. Update the base image version in `images/fluentd/Dockerfile`
154+
1. Update record-modifier in `images/fluentd/Dockerfile` to the latest version [here](https://github.com/repeatedly/fluent-plugin-record-modifier/blob/master/VERSION)
154155
1. Update `fluentd.yaml` as necessary (make sure to maintain all Cortex environment variables)
155156

156157
## Statsd

docs/cluster/uninstall.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,5 @@ aws s3 ls
4343
aws s3 rb --force s3://<bucket-name>
4444

4545
# Delete the log group
46-
aws logs delete-log-group --log-group-name cortex --region us-west-2
46+
aws logs describe-log-groups --log-group-name-prefix=<log_group_name> --query 'logGroups[*].[logGroupName]' --output text | xargs -I {} aws logs delete-log-group --log-group-name {}
4747
```

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8
1919
github.com/ugorji/go/codec v1.1.7
2020
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca
21+
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951
2122
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
2223
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
2324
k8s.io/client-go v0.0.0-20190620085101-78d2af792bab

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
130130
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
131131
gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o=
132132
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
133+
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 h1:DMTcQRFbEH62YPRWwOI647s2e5mHda3oBPMHfrLs2bw=
134+
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951/go.mod h1:owOxCRGGeAx1uugABik6K9oeNu1cgxP/R9ItzLDxNWA=
133135
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
134136
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
135137
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

images/fluentd/Dockerfile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
FROM fluent/fluentd-kubernetes-daemonset:v1.7.1-debian-cloudwatch-1.0
2-
RUN fluent-gem install fluent-plugin-grep
3-
RUN fluent-gem install fluent-plugin-route
2+
RUN fluent-gem install fluent-plugin-record-modifier -v 2.0.1 --no-document

manager/manifests/fluentd.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ data:
8383
</filter>
8484
8585
<filter **>
86-
@type record_transformer
87-
enable_ruby
86+
@type record_modifier
8887
<record>
88+
group_name ${record.dig("kubernetes", "labels", "logGroupName") || ENV['LOG_GROUP_NAME']}
8989
stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")}
9090
log ${record.dig("log").rstrip}
9191
</record>
@@ -94,9 +94,10 @@ data:
9494
<match **>
9595
@type cloudwatch_logs
9696
region "#{ENV['AWS_REGION']}"
97-
log_group_name "#{ENV['LOG_GROUP_NAME']}"
97+
log_group_name_key group_name
9898
log_stream_name_key stream_name
9999
remove_log_stream_name_key true
100+
remove_log_group_name_key true
100101
auto_create_stream true
101102
<buffer>
102103
flush_interval 2

pkg/lib/time/time.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,13 @@ func LocalHourNow() string {
109109
func OlderThanSeconds(t time.Time, secs float64) bool {
110110
return time.Since(t).Seconds() > secs
111111
}
112+
113+
// MillisToTime converts a count of milliseconds since the Unix epoch into a
// time.Time. The value is split into whole seconds and a millisecond
// remainder, and the remainder is passed to time.Unix as nanoseconds.
func MillisToTime(epochMillis int64) time.Time {
	const millisPerSecond = 1000

	wholeSeconds, remainderMillis := epochMillis/millisPerSecond, epochMillis%millisPerSecond
	remainderNanos := remainderMillis * int64(time.Millisecond)

	return time.Unix(wholeSeconds, remainderNanos)
}
118+
119+
// ToMillis returns t as the number of milliseconds elapsed since the Unix
// epoch (January 1, 1970 UTC), discarding any sub-millisecond remainder.
//
// The value is assembled from whole seconds plus the nanosecond offset within
// the second instead of dividing t.UnixNano(): UnixNano is undefined when the
// instant does not fit in an int64 of nanoseconds (roughly outside the years
// 1678-2262), which includes the zero time.Time.
func ToMillis(t time.Time) int64 {
	return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)
}

pkg/operator/api/context/context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ func (ctx *Context) VisibleResourceByNameAndType(name string, resourceTypeStr st
209209
return nil, resource.ErrorInvalidType(resourceTypeStr)
210210
}
211211

212+
func (ctx *Context) LogGroupName(apiName string) string {
213+
name := ctx.CortexConfig.LogGroup + "." + ctx.App.Name + "." + apiName
214+
return name
215+
}
216+
212217
func (ctx *Context) Validate() error {
213218
return nil
214219
}

pkg/operator/workloads/api_workload.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ func tfAPISpec(
307307
"resourceID": ctx.APIs[api.Name].ID,
308308
"workloadID": workloadID,
309309
"userFacing": "true",
310+
"logGroupName": ctx.LogGroupName(api.Name),
310311
},
311312
Annotations: map[string]string{
312313
"traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0",
@@ -482,6 +483,7 @@ func onnxAPISpec(
482483
"resourceID": ctx.APIs[api.Name].ID,
483484
"workloadID": workloadID,
484485
"userFacing": "true",
486+
"logGroupName": ctx.LogGroupName(api.Name),
485487
},
486488
Annotations: map[string]string{
487489
"traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0",

pkg/operator/workloads/logs.go

Lines changed: 127 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ import (
2323
"github.com/aws/aws-sdk-go/aws"
2424
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
2525
"github.com/gorilla/websocket"
26+
"gopkg.in/karalabe/cookiejar.v2/collections/deque"
2627

2728
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
29+
"github.com/cortexlabs/cortex/pkg/lib/errors"
2830
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
2931
s "github.com/cortexlabs/cortex/pkg/lib/strings"
32+
libtime "github.com/cortexlabs/cortex/pkg/lib/time"
3033
"github.com/cortexlabs/cortex/pkg/operator/api/context"
3134
"github.com/cortexlabs/cortex/pkg/operator/api/resource"
3235
"github.com/cortexlabs/cortex/pkg/operator/config"
@@ -37,10 +40,40 @@ const (
3740
socketCloseGracePeriod = 10 * time.Second
3841
socketMaxMessageSize = 8192
3942

43+
maxCacheSize = 10000
4044
maxLogLinesPerRequest = 500
41-
pollPeriod = 250 // milliseconds
45+
maxStreamsPerRequest = 50
46+
pollPeriod = 250 * time.Millisecond
47+
streamRefreshPeriod = 2 * time.Second
4248
)
4349

50+
type eventCache struct {
51+
size int
52+
seen strset.Set
53+
eventQueue *deque.Deque
54+
}
55+
56+
func newEventCache(cacheSize int) eventCache {
57+
return eventCache{
58+
size: cacheSize,
59+
seen: strset.New(),
60+
eventQueue: deque.New(),
61+
}
62+
}
63+
64+
func (c *eventCache) Has(eventID string) bool {
65+
return c.seen.Has(eventID)
66+
}
67+
68+
func (c *eventCache) Add(eventID string) {
69+
if c.eventQueue.Size() == c.size {
70+
eventID := c.eventQueue.PopLeft().(string)
71+
c.seen.Remove(eventID)
72+
}
73+
c.seen.Add(eventID)
74+
c.eventQueue.PushRight(eventID)
75+
}
76+
4477
func ReadLogs(appName string, podLabels map[string]string, socket *websocket.Conn) {
4578
podCheckCancel := make(chan struct{})
4679
defer close(podCheckCancel)
@@ -67,14 +100,29 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
67100
timer := time.NewTimer(0)
68101
defer timer.Stop()
69102

70-
lastTimestamp := int64(0)
71-
previousEvents := strset.New()
103+
lastLogTime := time.Now()
104+
lastLogStreamUpdateTime := time.Now().Add(-1 * streamRefreshPeriod)
105+
106+
logStreamNames := strset.New()
72107

73108
var currentContextID string
74109
var prefix string
75-
var ctx *context.Context
76110
var err error
77111

112+
var ctx = CurrentContext(appName)
113+
eventCache := newEventCache(maxCacheSize)
114+
115+
if ctx == nil {
116+
writeAndCloseSocket(socket, "\ndeployment "+appName+" not found")
117+
return
118+
}
119+
120+
logGroupName, err := getLogGroupName(ctx, podLabels)
121+
if err != nil {
122+
writeAndCloseSocket(socket, err.Error()) // unexpected
123+
return
124+
}
125+
78126
for {
79127
select {
80128
case <-podCheckCancel:
@@ -83,8 +131,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
83131
ctx = CurrentContext(appName)
84132

85133
if ctx == nil {
86-
writeString(socket, "\ndeployment "+appName+" not found")
87-
closeSocket(socket)
134+
writeAndCloseSocket(socket, "\ndeployment "+appName+" not found")
88135
continue
89136
}
90137

@@ -93,105 +140,129 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
93140
if podLabels["workloadType"] == resource.APIType.String() {
94141
apiName := podLabels["apiName"]
95142
if _, ok := ctx.APIs[apiName]; !ok {
96-
writeString(socket, "\napi "+apiName+" was not found in latest deployment")
97-
closeSocket(socket)
143+
writeAndCloseSocket(socket, "\napi "+apiName+" was not found in latest deployment")
98144
continue
99145
}
100146
writeString(socket, "\na new deployment was detected, streaming logs from the latest deployment")
101147
} else {
102-
writeString(socket, "\nlogging non-api workloads is not supported") // unexpected
103-
closeSocket(socket)
148+
writeAndCloseSocket(socket, "\nlogging non-api workloads is not supported") // unexpected
104149
continue
105150
}
106151
} else {
107-
lastTimestamp = ctx.CreatedEpoch * 1000
108-
}
109-
110-
if podLabels["workloadType"] == resource.APIType.String() {
111-
podLabels["workloadID"] = ctx.APIs[podLabels["apiName"]].WorkloadID
152+
lastLogTime, _ = getPodStartTime(podLabels)
112153
}
113154

114155
currentContextID = ctx.ID
115-
116156
writeString(socket, "\nretrieving logs...")
117-
prefix = ""
118157
}
119158

120-
if len(prefix) == 0 {
121-
prefix, err = getPrefix(podLabels)
159+
if lastLogStreamUpdateTime.Add(streamRefreshPeriod).Before(time.Now()) {
160+
newLogStreamNames, err := getLogStreams(logGroupName)
122161
if err != nil {
123-
writeString(socket, err.Error())
124-
closeSocket(socket)
162+
writeAndCloseSocket(socket, "error encountered while searching for log streams: "+err.Error())
125163
continue
126164
}
165+
166+
if !logStreamNames.IsEqual(newLogStreamNames) {
167+
lastLogTime = lastLogTime.Add(-streamRefreshPeriod)
168+
logStreamNames = newLogStreamNames
169+
}
170+
lastLogStreamUpdateTime = time.Now()
127171
}
128172

129-
if len(prefix) == 0 {
173+
if len(logStreamNames) == 0 {
130174
timer.Reset(pollPeriod)
131175
continue
132176
}
133177

134-
endTime := time.Now().Unix() * 1000
135-
startTime := lastTimestamp - pollPeriod
178+
endTime := libtime.ToMillis(time.Now())
179+
136180
logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{
137-
LogGroupName: aws.String(config.Cortex.LogGroup),
138-
LogStreamNamePrefix: aws.String(prefix),
139-
StartTime: aws.Int64(startTime),
140-
EndTime: aws.Int64(endTime), // requires milliseconds
141-
Limit: aws.Int64(int64(maxLogLinesPerRequest)),
181+
LogGroupName: aws.String(logGroupName),
182+
LogStreamNames: aws.StringSlice(logStreamNames.Slice()),
183+
StartTime: aws.Int64(libtime.ToMillis(lastLogTime.Add(-pollPeriod))),
184+
EndTime: aws.Int64(endTime),
185+
Limit: aws.Int64(int64(maxLogLinesPerRequest)),
142186
})
143187

144188
if err != nil {
145-
if !awslib.CheckErrCode(err, "ResourceNotFoundException") {
146-
writeString(socket, "error encountered while fetching logs from cloudwatch: "+err.Error())
147-
closeSocket(socket)
189+
if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) {
190+
writeAndCloseSocket(socket, "error encountered while fetching logs from cloudwatch: "+err.Error())
148191
continue
149192
}
150193
}
151194

152-
newEvents := strset.New()
195+
lastLogTimestampMillis := libtime.ToMillis(lastLogTime)
153196
for _, logEvent := range logEventsOutput.Events {
154197
var log FluentdLog
155198
json.Unmarshal([]byte(*logEvent.Message), &log)
156-
157-
if !previousEvents.Has(*logEvent.EventId) {
199+
if !eventCache.Has(*logEvent.EventId) {
158200
socket.WriteMessage(websocket.TextMessage, []byte(log.Log))
159-
if *logEvent.Timestamp > lastTimestamp {
160-
lastTimestamp = *logEvent.Timestamp
201+
if *logEvent.Timestamp > lastLogTimestampMillis {
202+
lastLogTimestampMillis = *logEvent.Timestamp
161203
}
204+
eventCache.Add(*logEvent.EventId)
162205
}
163-
newEvents.Add(*logEvent.EventId)
164206
}
165207

208+
lastLogTime = libtime.MillisToTime(lastLogTimestampMillis)
166209
if len(logEventsOutput.Events) == maxLogLinesPerRequest {
167-
socket.WriteMessage(websocket.TextMessage, []byte("---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----"))
168-
lastTimestamp = endTime
210+
writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----")
211+
lastLogTime = libtime.MillisToTime(endTime)
169212
}
170213

171-
previousEvents = newEvents
172-
timer.Reset(pollPeriod * time.Millisecond)
214+
timer.Reset(pollPeriod)
173215
}
174216
}
175217
}
176218

177-
func getPrefix(searchLabels map[string]string) (string, error) {
219+
func getLogStreams(logGroupName string) (strset.Set, error) {
220+
describeLogStreamsOutput, err := config.AWS.CloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
221+
OrderBy: aws.String(cloudwatchlogs.OrderByLastEventTime),
222+
Descending: aws.Bool(true),
223+
LogGroupName: aws.String(logGroupName),
224+
Limit: aws.Int64(maxStreamsPerRequest),
225+
})
226+
if err != nil {
227+
if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) {
228+
return nil, err
229+
}
230+
return nil, nil
231+
}
232+
233+
streams := strset.New()
234+
235+
for _, stream := range describeLogStreamsOutput.LogStreams {
236+
streams.Add(*stream.LogStreamName)
237+
}
238+
return streams, nil
239+
}
240+
241+
func getPodStartTime(searchLabels map[string]string) (time.Time, error) {
178242
pods, err := config.Kubernetes.ListPodsByLabels(searchLabels)
179243
if err != nil {
180-
return "", err
244+
return time.Time{}, err
181245
}
182246

183247
if len(pods) == 0 {
184-
return "", nil
248+
return time.Now(), nil
185249
}
186250

187-
podLabels := pods[0].GetLabels()
188-
if apiName, ok := podLabels["apiName"]; ok {
189-
if podTemplateHash, ok := podLabels["pod-template-hash"]; ok {
190-
return internalAPIName(apiName, podLabels["appName"]) + "-" + podTemplateHash, nil
251+
startTime := pods[0].CreationTimestamp.Time
252+
for _, pod := range pods[1:] {
253+
if pod.CreationTimestamp.Time.Before(startTime) {
254+
startTime = pod.CreationTimestamp.Time
191255
}
192-
return "", nil // unexpected, pod template hash not set yet
193256
}
194-
return pods[0].Name, nil // unexpected, logging non api resources
257+
258+
return startTime, nil
259+
}
260+
261+
func getLogGroupName(ctx *context.Context, searchLabels map[string]string) (string, error) {
262+
if searchLabels["workloadType"] == resource.APIType.String() {
263+
return ctx.LogGroupName(searchLabels["apiName"]), nil
264+
}
265+
return "nil", errors.New("unsupported workload type") // unexpected
195266
}
196267

197268
func writeString(socket *websocket.Conn, message string) {
@@ -203,3 +274,8 @@ func closeSocket(socket *websocket.Conn) {
203274
socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
204275
time.Sleep(socketCloseGracePeriod)
205276
}
277+
278+
// writeAndCloseSocket sends message over socket with writeString and then
// shuts the connection down with closeSocket. Neither helper returns a value,
// so no error is reported to the caller.
func writeAndCloseSocket(socket *websocket.Conn, message string) {
279+
writeString(socket, message)
280+
closeSocket(socket)
281+
}

0 commit comments

Comments
 (0)