Skip to content

Commit 922270b

Browse files
authored
Force log order (#423)
1 parent 2ede28b commit 922270b

File tree

1 file changed

+52
-7
lines changed

1 file changed

+52
-7
lines changed

pkg/operator/workloads/logs.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,7 @@ func GetLogKeys(pod kcore.Pod) strset.Set {
188188
containerStatuses := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
189189
logKeys := strset.New()
190190
for _, status := range containerStatuses {
191-
if status.State.Terminated != nil && (status.State.Terminated.ExitCode != 0 || status.State.Terminated.StartedAt.After(time.Now().Add(-newPodCheckInterval))) {
192-
logKeys.Add(GetLogKey(pod, status).String())
193-
} else if status.State.Running != nil {
191+
if status.State.Terminated != nil || status.State.Running != nil {
194192
logKeys.Add(GetLogKey(pod, status).String())
195193
}
196194
}
@@ -235,6 +233,10 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
235233
defer outw.Close()
236234
defer outr.Close()
237235

236+
procAttr := os.ProcAttr{
237+
Files: []*os.File{inr, outw, outw},
238+
}
239+
238240
socketWriterError := make(chan error, 1)
239241
defer close(socketWriterError)
240242

@@ -286,10 +288,53 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
286288
wrotePending = false
287289
}
288290

291+
initProcesses := strset.New()
292+
tfServingProcesses := strset.New()
293+
apiProcesses := strset.New()
294+
289295
for logProcess := range processesToAdd {
290-
process, err := createKubectlProcess(StringToLogKey(logProcess), &os.ProcAttr{
291-
Files: []*os.File{inr, outw, outw},
292-
})
296+
logKey := StringToLogKey(logProcess)
297+
switch logKey.ContainerName {
298+
case downloaderInitContainerName:
299+
initProcesses.Add(logProcess)
300+
case tfServingContainerName:
301+
tfServingProcesses.Add(logProcess)
302+
case apiContainerName:
303+
apiProcesses.Add(logProcess)
304+
default:
305+
socketWriterError <- errors.New("unexpected container type encountered " + logKey.ContainerName) // unexpected
306+
return
307+
}
308+
}
309+
310+
for logProcess := range initProcesses {
311+
process, err := createKubectlProcess(StringToLogKey(logProcess), &procAttr)
312+
if err != nil {
313+
socketWriterError <- err
314+
return
315+
}
316+
processMap[logProcess] = process
317+
}
318+
319+
if len(tfServingProcesses) != 0 && len(initProcesses) != 0 {
320+
time.Sleep(100 * time.Millisecond)
321+
}
322+
323+
for logProcess := range tfServingProcesses {
324+
process, err := createKubectlProcess(StringToLogKey(logProcess), &procAttr)
325+
if err != nil {
326+
socketWriterError <- err
327+
return
328+
}
329+
processMap[logProcess] = process
330+
}
331+
332+
if len(apiProcesses) != 0 && (len(tfServingProcesses) != 0 || len(initProcesses) != 0) {
333+
time.Sleep(100 * time.Millisecond)
334+
}
335+
336+
for logProcess := range apiProcesses {
337+
process, err := createKubectlProcess(StringToLogKey(logProcess), &procAttr)
293338
if err != nil {
294339
socketWriterError <- err
295340
return
@@ -387,7 +432,7 @@ func pumpStdout(socket *websocket.Conn, socketWriterError chan error, reader io.
387432
select {
388433
case err := <-socketWriterError:
389434
if err != nil {
390-
writeSocket(err.Error(), socket)
435+
writeSocket(err.Error()+"\n", socket)
391436
}
392437
default:
393438
}

0 commit comments

Comments
 (0)