Skip to content

Commit 8d7fe80

Browse files
committed
NPE on sendEvent
- Adding null check for current state - Remove synchronization from scheduleEventQueueProcessing method in executor. Looks like this sync is not really needed and indeed may cause jvm level deadlocks if threads are used for execution. - Change how some internals in AbstractStateMachine are synchronized to limit changes for deadlock. - Relates to #307 - Backport of #359 #360
1 parent b1d708e commit 8d7fe80

File tree

4 files changed

+274
-33
lines changed

4 files changed

+274
-33
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.statemachine.buildtests;
17+
18+
import org.junit.Test;
19+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
20+
import org.springframework.statemachine.StateMachine;
21+
import org.springframework.statemachine.config.StateMachineBuilder;
22+
import org.springframework.statemachine.test.StateMachineTestPlan;
23+
import org.springframework.statemachine.test.StateMachineTestPlanBuilder;
24+
25+
public class TimerSmokeTests {
26+
27+
private static ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
28+
{
29+
taskExecutor.initialize();
30+
}
31+
32+
private StateMachine<String, String> buildMachine() throws Exception {
33+
34+
StateMachineBuilder.Builder<String, String> builder = StateMachineBuilder.builder();
35+
36+
builder.configureConfiguration()
37+
.withConfiguration()
38+
.taskExecutor(taskExecutor);
39+
40+
builder.configureStates()
41+
.withStates()
42+
.initial("initial")
43+
.end("end");
44+
45+
builder.configureTransitions()
46+
.withExternal()
47+
.source("initial")
48+
.target("end")
49+
.timerOnce(30)
50+
.and()
51+
.withLocal()
52+
.source("initial")
53+
.event("repeate");
54+
55+
return builder.build();
56+
}
57+
58+
private StateMachine<String, String> buildMachine2() throws Exception {
59+
60+
StateMachineBuilder.Builder<String, String> builder = StateMachineBuilder.builder();
61+
62+
builder.configureConfiguration()
63+
.withConfiguration()
64+
.taskExecutor(taskExecutor);
65+
66+
builder.configureStates()
67+
.withStates()
68+
.initial("initial").end("end").and()
69+
.withStates().parent("initial").initial("inner");
70+
71+
builder.configureTransitions()
72+
.withExternal()
73+
.source("initial")
74+
.target("end")
75+
.timerOnce(30)
76+
.and()
77+
.withExternal()
78+
.source("inner")
79+
.target("end")
80+
.timerOnce(15)
81+
.and()
82+
.withLocal()
83+
.source("inner")
84+
.event("repeate");
85+
86+
return builder.build();
87+
}
88+
89+
@Test
90+
public void testNPE() throws Exception {
91+
StateMachine<String, String> stateMachine;
92+
for (int i = 0; i < 20; i++) {
93+
stateMachine = buildMachine();
94+
stateMachine.start();
95+
while (!stateMachine.isComplete()) {
96+
stateMachine.sendEvent("repeate");
97+
}
98+
}
99+
}
100+
101+
@Test
102+
public void testNPE2() throws Exception {
103+
104+
StateMachine<String, String> stateMachine;
105+
106+
for (int i = 0; i < 20; i++) {
107+
stateMachine = buildMachine2();
108+
stateMachine.start();
109+
while(!stateMachine.isComplete()) {
110+
stateMachine.sendEvent("repeate");
111+
}
112+
stateMachine.stop();
113+
}
114+
}
115+
116+
@Test
117+
public void testDeadlock() throws Exception {
118+
StateMachineTestPlan<String, String> plan;
119+
for (int i = 0; i < 20; i++) {
120+
plan = StateMachineTestPlanBuilder.<String, String> builder()
121+
.defaultAwaitTime(1)
122+
.stateMachine(buildMachine())
123+
.step()
124+
.expectStateMachineStarted(1)
125+
.expectStateEntered(1)
126+
.expectStateEntered("initial")
127+
.and()
128+
.step()
129+
.sendEvent("repeate")
130+
.expectStates("initial")
131+
.and()
132+
.step()
133+
.expectStateEntered(1)
134+
.expectStateEntered("end")
135+
.and()
136+
.step()
137+
.expectStateMachineStopped(1)
138+
.and()
139+
.build();
140+
plan.test();
141+
}
142+
}
143+
}

spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo
116116
private volatile Message<E> forwardedInitialEvent;
117117

118118
private final Object lock = new Object();
119+
private final Object lock2 = new Object();
119120

120121
private StateMachine<S, E> parentMachine;
121122

@@ -211,30 +212,9 @@ public PseudoState<S, E> getHistoryState() {
211212

212213
@Override
213214
public boolean sendEvent(Message<E> event) {
214-
if (hasStateMachineError()) {
215-
// TODO: should we throw exception?
216-
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
217-
return false;
218-
}
219-
220-
try {
221-
event = getStateMachineInterceptors().preEvent(event, this);
222-
} catch (Exception e) {
223-
log.info("Event " + event + " threw exception in interceptors, not accepting event");
224-
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
225-
return false;
215+
synchronized (lock2) {
216+
return sendEventInternal(event);
226217
}
227-
228-
if (isComplete() || !isRunning()) {
229-
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
230-
return false;
231-
}
232-
boolean accepted = acceptEvent(event);
233-
stateMachineExecutor.execute();
234-
if (!accepted) {
235-
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
236-
}
237-
return accepted;
238218
}
239219

240220
@Override
@@ -555,6 +535,33 @@ public void setForwardedInitialEvent(Message<E> message) {
555535
forwardedInitialEvent = message;
556536
}
557537

538+
private boolean sendEventInternal(Message<E> event) {
539+
if (hasStateMachineError()) {
540+
// TODO: should we throw exception?
541+
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
542+
return false;
543+
}
544+
545+
try {
546+
event = getStateMachineInterceptors().preEvent(event, this);
547+
} catch (Exception e) {
548+
log.info("Event " + event + " threw exception in interceptors, not accepting event");
549+
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
550+
return false;
551+
}
552+
553+
if (isComplete() || !isRunning()) {
554+
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
555+
return false;
556+
}
557+
boolean accepted = acceptEvent(event);
558+
stateMachineExecutor.execute();
559+
if (!accepted) {
560+
notifyEventNotAccepted(buildStateContext(Stage.EVENT_NOT_ACCEPTED, event, null, getRelayStateMachine(), getState(), null));
561+
}
562+
return accepted;
563+
}
564+
558565
private StateMachine<S, E> getRelayStateMachine() {
559566
return relay != null ? relay : this;
560567
}
@@ -758,7 +765,7 @@ protected synchronized boolean acceptEvent(Message<E> message) {
758765
State<S,E> source = transition.getSource();
759766
Trigger<S, E> trigger = transition.getTrigger();
760767

761-
if (StateMachineUtils.containsAtleastOne(source.getIds(), currentState.getIds())) {
768+
if (currentState != null && StateMachineUtils.containsAtleastOne(source.getIds(), currentState.getIds())) {
762769
if (trigger != null && trigger.evaluate(new DefaultTriggerContext<S, E>(message.getPayload()))) {
763770
stateMachineExecutor.queueEvent(message);
764771
return true;
@@ -924,11 +931,18 @@ private State<S, E> findDeepParent(State<S, E> state) {
924931
return null;
925932
}
926933

927-
synchronized void setCurrentState(State<S, E> state, Message<E> message, Transition<S, E> transition, boolean exit, StateMachine<S, E> stateMachine) {
934+
void setCurrentState(State<S, E> state, Message<E> message, Transition<S, E> transition, boolean exit, StateMachine<S, E> stateMachine) {
928935
setCurrentState(state, message, transition, exit, stateMachine, null, null);
929936
}
930937

931-
synchronized void setCurrentState(State<S, E> state, Message<E> message, Transition<S, E> transition, boolean exit,
938+
void setCurrentState(State<S, E> state, Message<E> message, Transition<S, E> transition, boolean exit,
939+
StateMachine<S, E> stateMachine, Collection<State<S, E>> sources, Collection<State<S, E>> targets) {
940+
synchronized (lock2) {
941+
setCurrentStateInternal(state, message, transition, exit, stateMachine, sources, targets);
942+
}
943+
}
944+
945+
private void setCurrentStateInternal(State<S, E> state, Message<E> message, Transition<S, E> transition, boolean exit,
932946
StateMachine<S, E> stateMachine, Collection<State<S, E>> sources, Collection<State<S, E>> targets) {
933947
State<S, E> findDeep = findDeepParent(state);
934948
boolean isTargetSubOf = false;

spring-statemachine-core/src/main/java/org/springframework/statemachine/support/DefaultStateMachineExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ private void handleInitialTrans(Transition<S, E> tran, Message<E> queuedMessage)
263263
stateMachineExecutorTransit.transit(tran, stateContext, queuedMessage);
264264
}
265265

266-
private synchronized void scheduleEventQueueProcessing() {
266+
private void scheduleEventQueueProcessing() {
267267
TaskExecutor executor = getTaskExecutor();
268268
if (executor == null) {
269269
return;

spring-statemachine-core/src/test/java/org/springframework/statemachine/action/ActionAndTimerTests.java

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,8 @@
2727
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
2828
import org.springframework.context.annotation.Bean;
2929
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.scheduling.TaskScheduler;
31+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
3032
import org.springframework.statemachine.AbstractStateMachineTests;
3133
import org.springframework.statemachine.StateContext;
3234
import org.springframework.statemachine.StateMachine;
@@ -53,15 +55,48 @@ public void testExitActionWithTimerOnce() throws Exception {
5355
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S1));
5456
machine.sendEvent(TestEvents.E1);
5557
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S2));
56-
// sleep so that action with timerOnce(1000) is fired before event is send
57-
// event sending is happening on a main thread, but actions are executed on a same
58-
// pool than the DefaultStateMachineExecutor is using. existing action execution is interrupted.
59-
Thread.sleep(2000);
58+
59+
assertThat(testTimerAction.latch.await(4, TimeUnit.SECONDS), is(true));
60+
assertThat(testTimerAction.e, nullValue());
61+
62+
// need to sleep for TimerTrigger not causing
63+
// next event to get handled with threads, thus
64+
// causing interrupt
65+
Thread.sleep(1000);
66+
6067
machine.sendEvent(TestEvents.E2);
6168
assertThat(testListener.s3EnteredLatch.await(2, TimeUnit.SECONDS), is(true));
6269
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S3));
63-
assertThat(testTimerAction.latch.await(2, TimeUnit.SECONDS), is(true));
70+
assertThat(testExitAction.latch.await(2, TimeUnit.SECONDS), is(true));
71+
assertThat(testExitAction.e, nullValue());
72+
}
73+
74+
@SuppressWarnings("unchecked")
75+
@Test
76+
public void testExitActionWithTimerOnceThreadPoolTaskScheduler() throws Exception {
77+
context.register(Config2.class);
78+
context.refresh();
79+
StateMachine<TestStates, TestEvents> machine = context.getBean(StateMachine.class);
80+
TestTimerAction testTimerAction = context.getBean(TestTimerAction.class);
81+
TestExitAction testExitAction = context.getBean(TestExitAction.class);
82+
TestListener testListener = new TestListener();
83+
machine.addStateListener(testListener);
84+
machine.start();
85+
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S1));
86+
machine.sendEvent(TestEvents.E1);
87+
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S2));
88+
89+
assertThat(testTimerAction.latch.await(4, TimeUnit.SECONDS), is(true));
6490
assertThat(testTimerAction.e, nullValue());
91+
92+
// need to sleep for TimerTrigger not causing
93+
// next event to get handled with threads, thus
94+
// causing interrupt
95+
Thread.sleep(1000);
96+
97+
machine.sendEvent(TestEvents.E2);
98+
assertThat(testListener.s3EnteredLatch.await(2, TimeUnit.SECONDS), is(true));
99+
assertThat(machine.getState().getIds(), containsInAnyOrder(TestStates.S3));
65100
assertThat(testExitAction.latch.await(2, TimeUnit.SECONDS), is(true));
66101
assertThat(testExitAction.e, nullValue());
67102
}
@@ -109,6 +144,55 @@ public TestTimerAction testTimerAction() {
109144
}
110145
}
111146

147+
@Configuration
148+
@EnableStateMachine
149+
static class Config2 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
150+
151+
@Override
152+
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
153+
states
154+
.withStates()
155+
.initial(TestStates.S1)
156+
.state(TestStates.S2, null, testExitAction())
157+
.state(TestStates.S3);
158+
}
159+
160+
@Override
161+
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
162+
transitions
163+
.withExternal()
164+
.source(TestStates.S1)
165+
.target(TestStates.S2)
166+
.event(TestEvents.E1)
167+
.and()
168+
.withExternal()
169+
.source(TestStates.S2)
170+
.target(TestStates.S3)
171+
.event(TestEvents.E2)
172+
.and()
173+
.withInternal()
174+
.source(TestStates.S2)
175+
.action(testTimerAction())
176+
.timerOnce(1000);
177+
}
178+
179+
@Bean
180+
public TaskScheduler taskScheduler() {
181+
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
182+
return taskScheduler;
183+
}
184+
185+
@Bean
186+
public TestExitAction testExitAction() {
187+
return new TestExitAction();
188+
}
189+
190+
@Bean
191+
public TestTimerAction testTimerAction() {
192+
return new TestTimerAction();
193+
}
194+
}
195+
112196
private static class TestListener extends StateMachineListenerAdapter<TestStates, TestEvents> {
113197

114198
volatile CountDownLatch s3EnteredLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)