Skip to content
This repository was archived by the owner on Nov 4, 2024. It is now read-only.

Commit 838c778

Browse files
Fix PythonAnalyzerSession leak (microsoft#866)
1 parent ce8b87a commit 838c778

File tree

2 files changed

+97
-102
lines changed

2 files changed

+97
-102
lines changed

src/Analysis/Ast/Impl/Analyzer/PythonAnalyzer.cs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Collections.Generic;
1818
using System.Diagnostics;
19+
using System.Linq;
1920
using System.Threading;
2021
using System.Threading.Tasks;
2122
using Microsoft.Python.Analysis.Dependencies;
@@ -39,17 +40,20 @@ public sealed class PythonAnalyzer : IPythonAnalyzer, IDisposable {
3940
private readonly DisposeToken _disposeToken = DisposeToken.Create<PythonAnalyzer>();
4041
private readonly object _syncObj = new object();
4142
private readonly AsyncManualResetEvent _analysisCompleteEvent = new AsyncManualResetEvent();
43+
private readonly Action<Task> _startNextSession;
4244
private readonly ProgressReporter _progress;
4345
private readonly ILogger _log;
4446
private readonly int _maxTaskRunning = Environment.ProcessorCount;
4547
private int _version;
46-
private PythonAnalyzerSession _session;
48+
private PythonAnalyzerSession _currentSession;
49+
private PythonAnalyzerSession _nextSession;
4750

4851
public PythonAnalyzer(IServiceManager services) {
4952
_services = services;
5053
_log = services.GetService<ILogger>();
5154
_dependencyResolver = new DependencyResolver<AnalysisModuleKey, PythonAnalyzerEntry>();
5255
_analysisCompleteEvent.Set();
56+
_startNextSession = StartNextSession;
5357

5458
_progress = new ProgressReporter(services.GetService<IProgressService>());
5559
}
@@ -195,24 +199,56 @@ private void AnalyzeDocument(AnalysisModuleKey key, PythonAnalyzerEntry entry, I
195199
LoadMissingDocuments(entry.Module.Interpreter, walker.MissingKeys);
196200
}
197201

198-
PythonAnalyzerSession currentSession, nextSession;
202+
if (TryCreateSession(walker, entry, cancellationToken, out var session)) {
203+
session.Start(true);
204+
}
205+
}
206+
207+
private bool TryCreateSession(IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> walker, PythonAnalyzerEntry entry, CancellationToken cancellationToken, out PythonAnalyzerSession session) {
199208
lock (_syncObj) {
200-
currentSession = _session;
201-
if (currentSession != null && currentSession.Version > walker.Version) {
202-
return;
209+
if (_currentSession == null) {
210+
_currentSession = session = CreateSession(walker, null, cancellationToken);
211+
return true;
212+
}
213+
214+
if (_currentSession.Version > walker.Version || _nextSession != null && _nextSession.Version > walker.Version) {
215+
session = null;
216+
return false;
217+
}
218+
219+
if (_version > walker.Version && (!_currentSession.IsCompleted || walker.AffectedValues.GetCount(e => e.NotAnalyzed) < _maxTaskRunning)) {
220+
session = null;
221+
return false;
222+
}
223+
224+
if (_currentSession.IsCompleted) {
225+
_currentSession = session = CreateSession(walker, null, cancellationToken);
226+
return true;
203227
}
204228

205-
if (_version > walker.Version && currentSession != null && (!currentSession.IsCompleted || walker.AffectedValues.GetCount(e => e.NotAnalyzed) < _maxTaskRunning)) {
229+
_currentSession.Cancel();
230+
_nextSession = session = CreateSession(walker, entry.IsUserModule ? entry : null, cancellationToken);
231+
return entry.IsUserModule;
232+
}
233+
}
234+
235+
private void StartNextSession(Task task) {
236+
PythonAnalyzerSession session;
237+
lock (_syncObj) {
238+
if (_nextSession == null) {
206239
return;
207240
}
208241

209-
nextSession = new PythonAnalyzerSession(_services, _progress, _analysisCompleteEvent, _disposeToken.CancellationToken, cancellationToken, walker, _version);
210-
_session = nextSession;
242+
_currentSession = session = _nextSession;
243+
_nextSession = null;
211244
}
212245

213-
nextSession.Start(currentSession, entry);
246+
session.Start(false);
214247
}
215248

249+
private PythonAnalyzerSession CreateSession(IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> walker, PythonAnalyzerEntry entry, CancellationToken cancellationToken)
250+
=> new PythonAnalyzerSession(_services, _progress, _analysisCompleteEvent, _startNextSession, _disposeToken.CancellationToken, cancellationToken, walker, _version, entry);
251+
216252
private void LoadMissingDocuments(IPythonInterpreter interpreter, ImmutableArray<AnalysisModuleKey> missingKeys) {
217253
foreach (var (moduleName, _, isTypeshed) in missingKeys) {
218254
var moduleResolution = isTypeshed ? interpreter.TypeshedResolution : interpreter.ModuleResolution;

src/Analysis/Ast/Impl/Analyzer/PythonAnalyzerSession.cs

Lines changed: 52 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
// permissions and limitations under the License.
1515

1616
using System;
17-
using System.ComponentModel;
1817
using System.Diagnostics;
1918
using System.Linq;
2019
using System.Threading;
@@ -25,7 +24,6 @@
2524
using Microsoft.Python.Analysis.Modules;
2625
using Microsoft.Python.Analysis.Types;
2726
using Microsoft.Python.Core;
28-
using Microsoft.Python.Core.Diagnostics;
2927
using Microsoft.Python.Core.Logging;
3028
using Microsoft.Python.Core.Services;
3129
using Microsoft.Python.Parsing.Ast;
@@ -34,21 +32,21 @@ namespace Microsoft.Python.Analysis.Analyzer {
3432
internal sealed class PythonAnalyzerSession {
3533
private readonly int _maxTaskRunning = Environment.ProcessorCount;
3634
private readonly object _syncObj = new object();
37-
private readonly Action<Task> _startNextSession;
3835

36+
private readonly IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> _walker;
37+
private readonly PythonAnalyzerEntry _entry;
3938
private readonly CancellationTokenSource _cts;
39+
private readonly Action<Task> _startNextSession;
4040
private readonly IServiceManager _services;
4141
private readonly AsyncManualResetEvent _analysisCompleteEvent;
4242
private readonly IDiagnosticsService _diagnosticsService;
4343
private readonly IProgressReporter _progress;
4444
private readonly IPythonAnalyzer _analyzer;
4545
private readonly ILogger _log;
4646

47-
private State _state = State.NotStarted;
48-
private IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> _walker;
47+
private State _state;
48+
private bool _isCanceled;
4949
private int _runningTasks;
50-
private int _version;
51-
private PythonAnalyzerSession _nextSession;
5250

5351
public bool IsCompleted {
5452
get {
@@ -58,21 +56,25 @@ public bool IsCompleted {
5856
}
5957
}
6058

61-
public int Version => _version;
59+
public int Version { get; }
6260

6361
public PythonAnalyzerSession(IServiceManager services,
6462
IProgressReporter progress,
6563
AsyncManualResetEvent analysisCompleteEvent,
64+
Action<Task> startNextSession,
6665
CancellationToken analyzerToken,
6766
CancellationToken sessionToken,
6867
IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> walker,
69-
int version) {
68+
int version,
69+
PythonAnalyzerEntry entry) {
7070

7171
_services = services;
7272
_analysisCompleteEvent = analysisCompleteEvent;
73-
_startNextSession = StartNextSession;
74-
_version = version;
73+
_startNextSession = startNextSession;
74+
Version = version;
7575
_walker = walker;
76+
_entry = entry;
77+
_state = State.NotStarted;
7678
_cts = CancellationTokenSource.CreateLinkedTokenSource(analyzerToken, sessionToken);
7779

7880
_diagnosticsService = _services.GetService<IDiagnosticsService>();
@@ -81,119 +83,64 @@ public PythonAnalyzerSession(IServiceManager services,
8183
_log = _services.GetService<ILogger>();
8284
}
8385

84-
public void Start(PythonAnalyzerSession previousSession, PythonAnalyzerEntry entry) {
85-
IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> walker;
86-
State previousSessionState;
87-
86+
public void Start(bool analyzeEntry) {
8887
lock (_syncObj) {
89-
walker = _walker;
9088
if (_state != State.NotStarted) {
91-
_cts.Dispose();
89+
analyzeEntry = false;
90+
} else if (_state == State.Completed) {
9291
return;
93-
}
94-
95-
previousSessionState = previousSession?.CancelOrSchedule(this, _version) ?? State.Completed;
96-
if (previousSessionState == State.Completed) {
92+
} else {
9793
_state = State.Started;
98-
_walker = null;
9994
}
10095
}
10196

102-
switch (previousSessionState) {
103-
case State.Started when entry.IsUserModule:
104-
StartAnalysis(entry, walker.Version);
105-
break;
106-
case State.Completed:
107-
Start(walker);
108-
break;
97+
if (analyzeEntry && _entry != null) {
98+
Task.Run(() => Analyze(_entry, _walker.Version, _cts.Token), _cts.Token).DoNotWait();
99+
} else {
100+
StartAsync(_walker).ContinueWith(_startNextSession).DoNotWait();
109101
}
110102
}
111103

112-
private void StartNextSession(Task task) {
113-
PythonAnalyzerSession nextSession;
104+
public void Cancel() {
114105
lock (_syncObj) {
115-
_walker = null;
116-
nextSession = _nextSession;
117-
_nextSession = null;
106+
_isCanceled = true;
118107
}
119-
120-
nextSession?.TryStart();
121108
}
122109

123-
private State CancelOrSchedule(PythonAnalyzerSession nextSession, int version) {
124-
lock (_syncObj) {
125-
Check.InvalidOperation(_nextSession == null);
126-
Check.InvalidOperation(nextSession != null);
127-
switch (_state) {
128-
case State.NotStarted:
129-
_state = State.Completed;
130-
break;
131-
case State.Started:
132-
_nextSession = nextSession;
133-
Interlocked.Exchange(ref _version, version);
134-
break;
135-
case State.Completed:
136-
break;
137-
default:
138-
throw new ArgumentOutOfRangeException();
139-
}
140-
141-
return _state;
142-
}
143-
}
144-
145-
private void TryStart() {
146-
IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> walker = default;
147-
lock (_syncObj) {
148-
if (_state == State.NotStarted) {
149-
_state = State.Started;
150-
walker = _walker;
151-
}
152-
153-
_walker = null;
154-
}
155-
156-
if (walker != default) {
157-
Start(walker);
158-
}
159-
}
160-
161-
private void Start(IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> walker) => StartAsync(walker).ContinueWith(_startNextSession).DoNotWait();
162-
163110
private async Task StartAsync(IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEntry> walker) {
164-
int version;
165111
lock (_syncObj) {
166-
version = _version;
167112
var notAnalyzed = walker.AffectedValues.Count(e => e.NotAnalyzed);
168113

169-
if (version > walker.Version && notAnalyzed < _maxTaskRunning) {
114+
if (_isCanceled && notAnalyzed < _maxTaskRunning) {
170115
_state = State.Completed;
171116
_cts.Dispose();
172117
return;
173118
}
174119
}
175-
120+
121+
var cancellationToken = _cts.Token;
176122
var stopWatch = Stopwatch.StartNew();
177123
foreach (var affectedEntry in walker.AffectedValues) {
178-
affectedEntry.Invalidate(version);
124+
affectedEntry.Invalidate(Version);
179125
}
180126

181127
var originalRemaining = walker.Remaining;
182128
var remaining = originalRemaining;
183129
try {
184130
_log?.Log(TraceEventType.Verbose, $"Analysis version {walker.Version} of {originalRemaining} entries has started.");
185-
remaining = await AnalyzeAffectedEntriesAsync(walker, stopWatch, _cts.Token);
131+
remaining = await AnalyzeAffectedEntriesAsync(walker, stopWatch, cancellationToken);
186132
} finally {
187133
_cts.Dispose();
188134
stopWatch.Stop();
189135

136+
bool isCanceled;
190137
lock (_syncObj) {
191-
if (_version == walker.Version) {
192-
_progress.ReportRemaining(walker.Remaining);
193-
}
194-
138+
isCanceled = _isCanceled;
195139
_state = State.Completed;
196-
_cts.Dispose();
140+
}
141+
142+
if (!isCanceled) {
143+
_progress.ReportRemaining(walker.Remaining);
197144
}
198145

199146
if (_log != null) {
@@ -212,7 +159,12 @@ private async Task<int> AnalyzeAffectedEntriesAsync(IDependencyChainWalker<Analy
212159
IDependencyChainNode<PythonAnalyzerEntry> node;
213160
var remaining = 0;
214161
while ((node = await walker.GetNextAsync(cancellationToken)) != null) {
215-
if (_version > walker.Version && !node.Value.NotAnalyzed) {
162+
bool isCanceled;
163+
lock (_syncObj) {
164+
isCanceled = _isCanceled;
165+
}
166+
167+
if (isCanceled && !node.Value.NotAnalyzed) {
216168
remaining++;
217169
node.Skip();
218170
continue;
@@ -227,7 +179,12 @@ private async Task<int> AnalyzeAffectedEntriesAsync(IDependencyChainWalker<Analy
227179

228180
if (walker.MissingKeys.All(k => k.IsTypeshed)) {
229181
Interlocked.Exchange(ref _runningTasks, 0);
230-
if (_version == walker.Version) {
182+
bool isCanceled;
183+
lock (_syncObj) {
184+
isCanceled = _isCanceled;
185+
}
186+
187+
if (!isCanceled) {
231188
_analysisCompleteEvent.Set();
232189
}
233190
}
@@ -269,16 +226,18 @@ private void Analyze(IDependencyChainWalker<AnalysisModuleKey, PythonAnalyzerEnt
269226
node.Commit();
270227
_log?.Log(TraceEventType.Verbose, $"Analysis of {module.Name}({module.ModuleType}) failed.");
271228
} finally {
272-
if (_version == walker.Version) {
229+
bool isCanceled;
230+
lock (_syncObj) {
231+
isCanceled = _isCanceled;
232+
}
233+
234+
if (!isCanceled) {
273235
_progress.ReportRemaining(walker.Remaining);
274236
}
275237
Interlocked.Decrement(ref _runningTasks);
276238
}
277239
}
278240

279-
private void StartAnalysis(PythonAnalyzerEntry entry, int version)
280-
=> Task.Run(() => Analyze(entry, version, _cts.Token), _cts.Token).DoNotWait();
281-
282241
private void Analyze(PythonAnalyzerEntry entry, int version, CancellationToken cancellationToken) {
283242
var stopWatch = Stopwatch.StartNew();
284243
try {

0 commit comments

Comments
 (0)