Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 5 additions & 59 deletions manager/manifests/fluentd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ data:
pos_file /var/log/fluentd-containers.log.pos
tag *
read_from_head true
refresh_interval 1
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
Expand All @@ -81,79 +82,24 @@ data:
@id filter_kube_metadata
</filter>

<match **>
@type route
<route **>
copy
@label @by_pod
</route>
<route **>
copy
@label @by_endpoint
</route>
</match>
</label>

<label @by_pod>
<filter **>
@type record_transformer
@id filter_containers_stream_transformer
<record>
stream_name ${tag_parts[3]}
</record>
remove_keys kubernetes,docker,stream
</filter>
<match **>
@type cloudwatch_logs
region "#{ENV['AWS_REGION']}"
log_group_name "#{ENV['LOG_GROUP_NAME']}"
log_stream_name_key stream_name
remove_log_stream_name_key true
auto_create_stream true
<buffer>
flush_interval 5
chunk_limit_size 2m
queued_chunks_limit_size 32
retry_forever true
</buffer>
</match>
</label>

<label @by_endpoint>
<filter **>
@type record_transformer
enable_ruby
<record>
workload_type ${record.dig("kubernetes", "labels", "workloadType") || "unknown"}
stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")}
log ${record.dig("log").rstrip}
</record>
remove_keys docker,stream
</filter>

<filter **>
@type grep
regexp1 workload_type api
</filter>
<filter **>
@type record_transformer
enable_ruby
<record>
pod_name ${record.dig("kubernetes", "pod_name")}
container_name ${record.dig("kubernetes", "container_name")}
workload_id ${record.dig("kubernetes", "labels", "workloadID")}
stream_name ${record.dig("kubernetes", "labels", "appName")}.${record.dig("kubernetes", "labels", "apiName")}
</record>
remove_keys kubernetes,api_name,app_name,workload_type
remove_keys kubernetes,docker,stream
</filter>
<match **>
@type cloudwatch_logs
@id out_cloudwatch_logs_endpoints
region "#{ENV['AWS_REGION']}"
log_group_name "#{ENV['LOG_GROUP_NAME']}"
log_stream_name_key stream_name
remove_log_stream_name_key true
auto_create_stream true
<buffer>
flush_interval 5
flush_interval 2
chunk_limit_size 2m
queued_chunks_limit_size 32
retry_forever true
Expand Down
4 changes: 2 additions & 2 deletions pkg/lib/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Client struct {
Bucket string
S3 *s3.S3
stsClient *sts.STS
cloudWatchLogsClient *cloudwatchlogs.CloudWatchLogs
CloudWatchLogsClient *cloudwatchlogs.CloudWatchLogs
CloudWatchMetrics *cloudwatch.CloudWatch
awsAccountID string
HashedAccountID string
Expand All @@ -51,7 +51,7 @@ func New(region string, bucket string, withAccountID bool) (*Client, error) {
S3: s3.New(sess),
stsClient: sts.New(sess),
CloudWatchMetrics: cloudwatch.New(sess),
cloudWatchLogsClient: cloudwatchlogs.New(sess),
CloudWatchLogsClient: cloudwatchlogs.New(sess),
}

if withAccountID {
Expand Down
6 changes: 3 additions & 3 deletions pkg/lib/aws/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func (t ErrorKind) MarshalBinary() ([]byte, error) {
}

func IsNoSuchKeyErr(err error) bool {
return checkErrCode(err, "NoSuchKey")
return CheckErrCode(err, "NoSuchKey")
}

func IsNotFoundErr(err error) bool {
return checkErrCode(err, "NotFound")
return CheckErrCode(err, "NotFound")
}

func checkErrCode(err error, errorCode string) bool {
func CheckErrCode(err error, errorCode string) bool {
awsErr, ok := errors.Cause(err).(awserr.Error)
if !ok {
return false
Expand Down
83 changes: 0 additions & 83 deletions pkg/lib/aws/logs.go

This file was deleted.

Loading