Skip to content

Commit 42d9745

Browse files
committed
add job ids as correlation ids for log messages
1 parent 43d523c commit 42d9745

File tree

7 files changed

+119
-52
lines changed

7 files changed

+119
-52
lines changed

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/BatchingProgressLogger.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class BatchingProgressLogger implements ProgressLogger {
3636
public static final long MAXIMUM_LOG_INTERVAL = (long) Math.pow(2, 13);
3737

3838
private final Log log;
39+
private final JobId jobId;
3940
private final Concurrency concurrency;
4041
private long taskVolume;
4142
private long batchSize;
@@ -58,12 +59,13 @@ static long calculateBatchSize(long taskVolume, Concurrency concurrency) {
5859
return Math.max(1, BitUtil.nextHighestPowerOfTwo(batchSize));
5960
}
6061

61-
public BatchingProgressLogger(Log log, Task task, Concurrency concurrency) {
62-
this(log, task, calculateBatchSize(task, concurrency), concurrency);
62+
public BatchingProgressLogger(Log log, JobId jobId, Task task, Concurrency concurrency) {
63+
this(log, jobId, task, calculateBatchSize(task, concurrency), concurrency);
6364
}
6465

65-
public BatchingProgressLogger(Log log, Task task, long batchSize, Concurrency concurrency) {
66+
public BatchingProgressLogger(Log log, JobId jobId, Task task, long batchSize, Concurrency concurrency) {
6667
this.log = log;
68+
this.jobId = jobId;
6769
this.taskVolume = task.getProgress().volume();
6870
this.batchSize = batchSize;
6971
this.taskName = task.description();
@@ -136,44 +138,38 @@ private synchronized void doLogPercentage(Supplier<String> msgFactory, long prog
136138
}
137139

138140
private void logProgress(int nextPercentage) {
139-
logInfo(formatWithLocale("[%s] %s %d%%", Thread.currentThread().getName(), taskName, nextPercentage));
141+
logMessage(formatWithLocale("%d%%", nextPercentage));
140142
}
141143

142144
private void logProgressWithMessage(int nextPercentage, String msg) {
143-
logInfo(
144-
formatWithLocale("[%s] %s %d%% %s", Thread.currentThread().getName(), taskName, nextPercentage, msg)
145-
);
145+
logMessage(formatWithLocale("%d%% %s", nextPercentage, msg));
146146
}
147147

148148
@Override
149149
public void logMessage(String msg) {
150-
log.info("[%s] %s %s", Thread.currentThread().getName(), taskName, msg);
150+
log.info("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, msg);
151151
}
152152

153153
@Override
154154
public void logMessage(Supplier<String> msg) {
155155
logMessage(Objects.requireNonNull(msg.get()));
156156
}
157157

158-
private void logInfo(String message) {
159-
log.info(message);
160-
}
161-
162158
@Override
163159
public void logDebug(Supplier<String> msg) {
164160
if (log.isDebugEnabled()) {
165-
log.debug("[%s] %s %s", Thread.currentThread().getName(), taskName, msg.get());
161+
log.debug("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, msg.get());
166162
}
167163
}
168164

169165
@Override
170166
public void logWarning(String message) {
171-
log.warn("[%s] %s %s", Thread.currentThread().getName(), taskName, message);
167+
log.warn("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, message);
172168
}
173169

174170
@Override
175171
public void logError(String message) {
176-
log.error("[%s] %s %s", Thread.currentThread().getName(), taskName, message);
172+
log.error("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, message);
177173
}
178174

179175
@Override

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressLogger.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.jetbrains.annotations.Nullable;
2323
import org.neo4j.gds.core.concurrency.Concurrency;
2424
import org.neo4j.gds.core.utils.progress.BatchingProgressLogger;
25+
import org.neo4j.gds.core.utils.progress.JobId;
2526
import org.neo4j.gds.core.utils.progress.ProgressLogger;
2627
import org.neo4j.gds.logging.Log;
2728

@@ -32,14 +33,14 @@ public class TaskProgressLogger extends BatchingProgressLogger {
3233
private final Task baseTask;
3334
private final TaskVisitor loggingLeafTaskVisitor;
3435

35-
TaskProgressLogger(Log log, Task baseTask, Concurrency concurrency) {
36-
super(log, baseTask, concurrency);
36+
TaskProgressLogger(Log log, JobId jobId, Task baseTask, Concurrency concurrency) {
37+
super(log, jobId, baseTask, concurrency);
3738
this.baseTask = baseTask;
3839
this.loggingLeafTaskVisitor = new LoggingLeafTaskVisitor(this);
39-
4040
}
41-
TaskProgressLogger(Log log, Task baseTask, Concurrency concurrency, TaskVisitor leafTaskVisitor) {
42-
super(log, baseTask, concurrency);
41+
42+
TaskProgressLogger(Log log, JobId jobId, Task baseTask, Concurrency concurrency, TaskVisitor leafTaskVisitor) {
43+
super(log, jobId, baseTask, concurrency);
4344
this.baseTask = baseTask;
4445
this.loggingLeafTaskVisitor = leafTaskVisitor;
4546
}

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public TaskProgressTracker(
6969
TaskRegistryFactory taskRegistryFactory,
7070
UserLogRegistryFactory userLogRegistryFactory
7171
) {
72-
this(baseTask, jobId, taskRegistryFactory, new TaskProgressLogger(log, baseTask, concurrency), userLogRegistryFactory);
72+
this(baseTask, jobId, taskRegistryFactory, new TaskProgressLogger(log, jobId, baseTask, concurrency), userLogRegistryFactory);
7373
}
7474

75-
protected TaskProgressTracker(
75+
public TaskProgressTracker(
7676
Task baseTask,
7777
JobId jobId,
7878
TaskRegistryFactory taskRegistryFactory,

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskTreeProgressTracker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public TaskTreeProgressTracker(
4141
taskRegistryFactory,
4242
new TaskProgressLogger(
4343
log,
44+
jobId,
4445
baseTask,
4546
concurrency,
4647
new PassThroughTaskVisitor()

progress-tracking/src/test/java/org/neo4j/gds/core/utils/BatchingProgressLoggerTest.java renamed to progress-tracking/src/test/java/org/neo4j/gds/core/utils/progress/BatchingProgressLoggerTest.java

Lines changed: 82 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* You should have received a copy of the GNU General Public License
1818
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1919
*/
20-
package org.neo4j.gds.core.utils;
20+
package org.neo4j.gds.core.utils.progress;
2121

2222
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
2323
import org.eclipse.collections.impl.utility.ListIterate;
@@ -27,7 +27,7 @@
2727
import org.neo4j.gds.compat.TestLog;
2828
import org.neo4j.gds.core.concurrency.Concurrency;
2929
import org.neo4j.gds.core.concurrency.RunWithConcurrency;
30-
import org.neo4j.gds.core.utils.progress.BatchingProgressLogger;
30+
import org.neo4j.gds.core.utils.progress.tasks.LeafTask;
3131
import org.neo4j.gds.core.utils.progress.tasks.Tasks;
3232
import org.neo4j.gds.logging.GdsTestLog;
3333
import org.neo4j.gds.logging.Log;
@@ -46,6 +46,9 @@
4646
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4747
import static org.assertj.core.api.Assertions.fail;
4848
import static org.junit.jupiter.api.Assertions.assertEquals;
49+
import static org.mockito.Mockito.mock;
50+
import static org.mockito.Mockito.verify;
51+
import static org.mockito.Mockito.when;
4952
import static org.neo4j.gds.assertj.Extractors.removingThreadId;
5053
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
5154

@@ -60,6 +63,7 @@ void mustLogProgressOnlyAfterBatchSizeInvocations() {
6063
var concurrency = new Concurrency(1);
6164
var logger = new BatchingProgressLogger(
6265
log,
66+
new JobId("some job id"),
6367
Tasks.leaf("foo", taskVolume),
6468
batchSize,
6569
concurrency
@@ -71,13 +75,13 @@ void mustLogProgressOnlyAfterBatchSizeInvocations() {
7175
}
7276

7377
var threadName = Thread.currentThread().getName();
74-
var messageTemplate = "[%s] foo %d%% %d";
78+
var messageTemplate = "[%s] [%s] foo %d%% %d";
7579
var expectedMessages = List.of(
76-
formatWithLocale(messageTemplate, threadName, 1 * batchSize * 100 / taskVolume, 1 * batchSize - 1),
77-
formatWithLocale(messageTemplate, threadName, 2 * batchSize * 100 / taskVolume, 2 * batchSize - 1),
78-
formatWithLocale(messageTemplate, threadName, 3 * batchSize * 100 / taskVolume, 3 * batchSize - 1),
79-
formatWithLocale(messageTemplate, threadName, 4 * batchSize * 100 / taskVolume, 4 * batchSize - 1),
80-
formatWithLocale(messageTemplate, threadName, 5 * batchSize * 100 / taskVolume, 5 * batchSize - 1)
80+
formatWithLocale(messageTemplate, "some job id", threadName, 1 * batchSize * 100 / taskVolume, 1 * batchSize - 1),
81+
formatWithLocale(messageTemplate, "some job id", threadName, 2 * batchSize * 100 / taskVolume, 2 * batchSize - 1),
82+
formatWithLocale(messageTemplate, "some job id", threadName, 3 * batchSize * 100 / taskVolume, 3 * batchSize - 1),
83+
formatWithLocale(messageTemplate, "some job id", threadName, 4 * batchSize * 100 / taskVolume, 4 * batchSize - 1),
84+
formatWithLocale(messageTemplate, "some job id", threadName, 5 * batchSize * 100 / taskVolume, 5 * batchSize - 1)
8185
);
8286

8387
var messages = log.getMessages("info");
@@ -93,6 +97,7 @@ void mustLogProgressOnlyAfterHittingOrExceedingBatchSize() {
9397
var concurrency = new Concurrency(1);
9498
var logger = new BatchingProgressLogger(
9599
log,
100+
new JobId("a job id"),
96101
Tasks.leaf("foo", taskVolume),
97102
batchSize,
98103
concurrency
@@ -103,7 +108,7 @@ void mustLogProgressOnlyAfterHittingOrExceedingBatchSize() {
103108
}
104109

105110
var threadName = Thread.currentThread().getName();
106-
var messageTemplate = "[%s] foo %d%%";
111+
var messageTemplate = "[%s] [%s] foo %d%%";
107112

108113
var progressSteps = IntStream
109114
.iterate(0, i -> i < taskVolume, i -> i + progressStep)
@@ -113,7 +118,7 @@ void mustLogProgressOnlyAfterHittingOrExceedingBatchSize() {
113118

114119
var expectedMessages = loggedProgressSteps.stream()
115120
.skip(1)
116-
.map(i -> formatWithLocale(messageTemplate, threadName, i * 100 / taskVolume))
121+
.map(i -> formatWithLocale(messageTemplate, "a job id", threadName, i * 100 / taskVolume))
117122
.collect(Collectors.toList());
118123

119124
var messages = log.getMessages("info");
@@ -150,7 +155,7 @@ void shouldLogAfterResetWhereACallCountHigherThanBatchSizeIsLeftBehind() {
150155
var taskVolume = 1337;
151156

152157
var log = new GdsTestLog();
153-
var logger = new BatchingProgressLogger(log, Tasks.leaf("Test", taskVolume), concurrency); // batchSize is 13
158+
var logger = new BatchingProgressLogger(log, new JobId(), Tasks.leaf("Test", taskVolume), concurrency); // batchSize is 13
154159
logger.reset(taskVolume);
155160
logger.logProgress(20); // callCount is 20, call count after logging == 20 - 13 = 7
156161
assertThat(log.getMessages(TestLog.INFO))
@@ -167,7 +172,7 @@ void shouldLogAfterResetWhereACallCountHigherThanBatchSizeIsLeftBehind() {
167172
void log100Percent() {
168173
var log = new GdsTestLog();
169174
var concurrency = new Concurrency(1);
170-
var testProgressLogger = new BatchingProgressLogger(log, Tasks.leaf("Test"), concurrency);
175+
var testProgressLogger = new BatchingProgressLogger(log, new JobId(), Tasks.leaf("Test"), concurrency);
171176
testProgressLogger.reset(1337);
172177
testProgressLogger.logFinishPercentage();
173178
assertThat(log.getMessages(TestLog.INFO))
@@ -179,7 +184,7 @@ void log100Percent() {
179184
void shouldLog100OnlyOnce() {
180185
var log = new GdsTestLog();
181186
var concurrency = new Concurrency(1);
182-
var testProgressLogger = new BatchingProgressLogger(log, Tasks.leaf("Test"), concurrency);
187+
var testProgressLogger = new BatchingProgressLogger(log, new JobId(), Tasks.leaf("Test"), concurrency);
183188
testProgressLogger.reset(1);
184189
testProgressLogger.logProgress(1);
185190
testProgressLogger.logFinishPercentage();
@@ -192,7 +197,7 @@ void shouldLog100OnlyOnce() {
192197
void shouldNotExceed100Percent() {
193198
var log = new GdsTestLog();
194199
var concurrency = new Concurrency(1);
195-
var testProgressLogger = new BatchingProgressLogger(log, Tasks.leaf("Test"), concurrency);
200+
var testProgressLogger = new BatchingProgressLogger(log, new JobId(), Tasks.leaf("Test"), concurrency);
196201
testProgressLogger.reset(1);
197202
testProgressLogger.logProgress(1); // reaches 100 %
198203
testProgressLogger.logProgress(1); // exceeds 100 %
@@ -205,6 +210,7 @@ void shouldNotExceed100Percent() {
205210
void closesThreadLocal() {
206211
var logger = new BatchingProgressLogger(
207212
Log.noOpLog(),
213+
new JobId(),
208214
Tasks.leaf("foo", 42),
209215
new Concurrency(1)
210216
);
@@ -226,7 +232,7 @@ void closesThreadLocal() {
226232

227233
private static List<Integer> performLogging(long taskVolume, Concurrency concurrency) {
228234
var log = new GdsTestLog();
229-
var logger = new BatchingProgressLogger(log, Tasks.leaf("Test", taskVolume), concurrency);
235+
var logger = new BatchingProgressLogger(log, new JobId("the_job_id"), Tasks.leaf("Test", taskVolume), concurrency);
230236
logger.reset(taskVolume);
231237

232238
var batchSize = (int) BitUtil.ceilDiv(taskVolume, concurrency.value());
@@ -248,9 +254,69 @@ private static List<Integer> performLogging(long taskVolume, Concurrency concurr
248254
return log
249255
.getMessages(TestLog.INFO)
250256
.stream()
251-
.map(progress -> progress.split(" ")[2].replace("%", ""))
257+
.map(progress -> progress.split(" ")[3].replace("%", ""))
252258
.map(Integer::parseInt)
253259
.collect(Collectors.toList());
254260
}
255261

262+
@Test
263+
void shouldPrependCorrelationIdToInfoLogMessages() {
264+
var log = mock(Log.class);
265+
var batchingProgressLogger = new BatchingProgressLogger(
266+
log,
267+
JobId.parse("my job id"),
268+
new LeafTask("Monsieur Alfonse", 42),
269+
new Concurrency(87)
270+
);
271+
272+
batchingProgressLogger.logMessage("Swiftly, and with style");
273+
274+
verify(log).info("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
275+
}
276+
277+
@Test
278+
void shouldPrependCorrelationIdToDebugLogMessages() {
279+
var log = mock(Log.class);
280+
var batchingProgressLogger = new BatchingProgressLogger(
281+
log,
282+
JobId.parse("my job id"),
283+
new LeafTask("Monsieur Alfonse", 42),
284+
new Concurrency(87)
285+
);
286+
287+
when(log.isDebugEnabled()).thenReturn(true);
288+
batchingProgressLogger.logDebug("Swiftly, and with style");
289+
290+
verify(log).debug("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
291+
}
292+
293+
@Test
294+
void shouldPrependCorrelationIdToWarningLogMessages() {
295+
var log = mock(Log.class);
296+
var batchingProgressLogger = new BatchingProgressLogger(
297+
log,
298+
JobId.parse("my job id"),
299+
new LeafTask("Monsieur Alfonse", 42),
300+
new Concurrency(87)
301+
);
302+
303+
batchingProgressLogger.logWarning("Swiftly, and with style");
304+
305+
verify(log).warn("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
306+
}
307+
308+
@Test
309+
void shouldPrependCorrelationIdToErrorLogMessages() {
310+
var log = mock(Log.class);
311+
var batchingProgressLogger = new BatchingProgressLogger(
312+
log,
313+
JobId.parse("my job id"),
314+
new LeafTask("Monsieur Alfonse", 42),
315+
new Concurrency(87)
316+
);
317+
318+
batchingProgressLogger.logError("Swiftly, and with style");
319+
320+
verify(log).error("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
321+
}
256322
}

progress-tracking/src/test/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressLoggerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.junit.jupiter.api.Test;
2323
import org.neo4j.gds.core.concurrency.Concurrency;
24+
import org.neo4j.gds.core.utils.progress.JobId;
2425
import org.neo4j.gds.logging.Log;
2526

2627
import java.util.List;
@@ -36,7 +37,7 @@ void shouldNotEliminateParentTaskIfCommonPreffix(){
3637
var taskAB = Tasks.task("A B", List.of(taskA));
3738
var task = Tasks.task("T", List.of(taskA));
3839

39-
var logger =new TaskProgressLogger(Log.noOpLog(),task,new Concurrency(1));
40+
var logger =new TaskProgressLogger(Log.noOpLog(), new JobId(), task,new Concurrency(1));
4041

4142
assertThatNoException().isThrownBy(
4243
()-> {

0 commit comments

Comments
 (0)