Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.

Link VMSS nodes and tasks when setting up #43

Merged
merged 26 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e40ed93
Refactor state update events
ranweiler Sep 23, 2020
9a2a968
Update models, event handler
ranweiler Sep 23, 2020
134cd38
Fix worker event enum model
ranweiler Sep 23, 2020
8228137
Fix enum repr of worker event
ranweiler Sep 23, 2020
10df008
Fix up comment
ranweiler Sep 23, 2020
3c978f0
Merge remote-tracking branch 'microsoft/main' into event-types
ranweiler Sep 28, 2020
954705a
Add event shim to support hot upgrades
ranweiler Sep 28, 2020
05e8aa8
Fix imports
ranweiler Sep 28, 2020
f06aaf2
Support data for `setting_up` state update
ranweiler Sep 29, 2020
a874c23
Support agent-side `setting_up` event data
ranweiler Sep 29, 2020
2906198
Pass entire state update to handler
ranweiler Sep 29, 2020
b2c0ebb
Add `setting_up` state to `Task`, `NodeTasks`
ranweiler Sep 29, 2020
2984059
Set task, node tasks `setting_up` state
ranweiler Sep 29, 2020
f81c8ea
Add comments
ranweiler Sep 29, 2020
9bff106
Merge branch 'main' into setting-up
bmc-msft Sep 29, 2020
afa1e68
Merge remote-tracking branch 'microsoft/main' into setting-up
ranweiler Sep 29, 2020
b18ffad
Merge branch 'setting-up' of github.com:ranweiler/onefuzz into setting-up
ranweiler Sep 29, 2020
c8e83f4
Add missing return
ranweiler Sep 29, 2020
7a75dc7
Linting
ranweiler Sep 29, 2020
54c0373
Fix node state num
ranweiler Sep 29, 2020
7b6f00d
Emit new style of state update events
ranweiler Sep 29, 2020
7017d87
Format
ranweiler Sep 29, 2020
41f62eb
Run formatter
ranweiler Sep 29, 2020
6ddc054
Don't update task state if running
ranweiler Sep 29, 2020
63f4200
Merge remote-tracking branch 'microsoft/main' into setting-up
ranweiler Sep 29, 2020
17c7e20
Fix comment widths
ranweiler Sep 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions src/agent/onefuzz-supervisor/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl Agent {
// `Free`. If it has started up after a work set-requested reboot, the
// state will be `Ready`.
if let Some(Scheduler::Free(..)) = &self.scheduler {
self.coordinator.emit_event(NodeState::Init.into()).await?;
let event = StateUpdateEvent::Init.into();
self.coordinator.emit_event(event).await?;
}

loop {
Expand Down Expand Up @@ -94,7 +95,8 @@ impl Agent {
}

async fn free(&mut self, state: State<Free>) -> Result<Scheduler> {
self.coordinator.emit_event(NodeState::Free.into()).await?;
let event = StateUpdateEvent::Free.into();
self.coordinator.emit_event(event).await?;

let msg = self.work_queue.poll().await?;

Expand Down Expand Up @@ -165,9 +167,9 @@ impl Agent {
async fn setting_up(&mut self, state: State<SettingUp>) -> Result<Scheduler> {
verbose!("agent setting up");

self.coordinator
.emit_event(NodeState::SettingUp.into())
.await?;
let tasks = state.work_set().task_ids();
let event = StateUpdateEvent::SettingUp { tasks };
self.coordinator.emit_event(event.into()).await?;

let scheduler = match state.finish(self.setup_runner.as_mut()).await? {
SetupDone::Ready(s) => s.into(),
Expand All @@ -180,9 +182,8 @@ impl Agent {
async fn pending_reboot(&mut self, state: State<PendingReboot>) -> Result<Scheduler> {
verbose!("agent pending reboot");

self.coordinator
.emit_event(NodeState::Rebooting.into())
.await?;
let event = StateUpdateEvent::Rebooting.into();
self.coordinator.emit_event(event).await?;

let ctx = state.reboot_context();
self.reboot.save_context(ctx).await?;
Expand All @@ -194,13 +195,15 @@ impl Agent {
/// Handle the `Ready` scheduler state.
///
/// Reports the `ready` state update to the coordinator, then runs the ready
/// state and converts its outcome into the next `Scheduler`.
///
/// # Errors
///
/// Propagates any error returned while emitting the event or running the
/// state.
async fn ready(&mut self, state: State<Ready>) -> Result<Scheduler> {
    verbose!("agent ready");

    // Emit the state update exactly once per entry into this state.
    let event = StateUpdateEvent::Ready.into();
    self.coordinator.emit_event(event).await?;

    Ok(state.run().await?.into())
}

async fn busy(&mut self, state: State<Busy>) -> Result<Scheduler> {
self.coordinator.emit_event(NodeState::Busy.into()).await?;
let event = StateUpdateEvent::Busy.into();
self.coordinator.emit_event(event).await?;

let mut events = vec![];
let updated = state
Expand All @@ -217,7 +220,8 @@ impl Agent {
async fn done(&mut self, state: State<Done>) -> Result<Scheduler> {
verbose!("agent done");

self.coordinator.emit_event(NodeState::Done.into()).await?;
let event = StateUpdateEvent::Done.into();
self.coordinator.emit_event(event).await?;

// `Done` is a final state.
Ok(state.into())
Expand Down
21 changes: 3 additions & 18 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ impl From<WorkerEvent> for NodeEvent {
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", tag = "state")]
#[serde(rename_all = "snake_case", tag = "state", content = "data")]
pub enum StateUpdateEvent {
Init,
Free,
SettingUp,
SettingUp { tasks: Vec<TaskId> },
Rebooting,
Ready,
Busy,
Expand All @@ -95,28 +95,13 @@ impl From<StateUpdateEvent> for NodeEvent {
}
}

/// Converts a bare `NodeState` into a `NodeEvent` by way of the
/// `StateUpdateEvent` variant that carries the same name.
impl From<NodeState> for NodeEvent {
    fn from(state: NodeState) -> Self {
        // Each node state maps one-to-one onto a state-update variant.
        let update = match state {
            NodeState::Init => StateUpdateEvent::Init,
            NodeState::Free => StateUpdateEvent::Free,
            NodeState::SettingUp => StateUpdateEvent::SettingUp,
            NodeState::Rebooting => StateUpdateEvent::Rebooting,
            NodeState::Ready => StateUpdateEvent::Ready,
            NodeState::Busy => StateUpdateEvent::Busy,
            NodeState::Done => StateUpdateEvent::Done,
        };

        // Reuse the `StateUpdateEvent` -> `NodeEvent` conversion.
        Self::from(update)
    }
}

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskState {
Init,
Waiting,
Scheduled,
SettingUp,
Running,
Stopping,
Stopped,
Expand Down
14 changes: 13 additions & 1 deletion src/agent/onefuzz-supervisor/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,19 @@ fn debug_node_event(opt: NodeEventOpt) -> Result<()> {
}

/// Builds a sample state-update event for `state`, wraps it with
/// `into_envelope`, and hands the result to `print_json`.
///
/// The `SettingUp` update carries a list of task IDs, so three freshly
/// generated v4 UUIDs are used as placeholder data for that variant.
fn debug_node_event_state_update(state: NodeState) -> Result<()> {
    let update = match state {
        NodeState::Init => StateUpdateEvent::Init,
        NodeState::Free => StateUpdateEvent::Free,
        NodeState::SettingUp => {
            let tasks = vec![Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
            StateUpdateEvent::SettingUp { tasks }
        }
        NodeState::Rebooting => StateUpdateEvent::Rebooting,
        NodeState::Ready => StateUpdateEvent::Ready,
        NodeState::Busy => StateUpdateEvent::Busy,
        NodeState::Done => StateUpdateEvent::Done,
    };

    // The target type of `into()` is fixed by `into_envelope`'s parameter.
    let event = update.into();
    print_json(into_envelope(event))
}

Expand Down
4 changes: 4 additions & 0 deletions src/agent/onefuzz-supervisor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl State<SettingUp> {

Ok(done)
}

/// Borrows the work set stored in this state's context.
pub fn work_set(&self) -> &WorkSet {
    let ctx = &self.ctx;
    &ctx.work_set
}
}

impl State<PendingReboot> {
Expand Down
6 changes: 6 additions & 0 deletions src/agent/onefuzz-supervisor/src/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ pub struct WorkSet {
pub work_units: Vec<WorkUnit>,
}

impl WorkSet {
    /// Returns the task ID of every work unit in this set.
    ///
    /// The result has one entry per work unit, in the same order as
    /// `work_units`; repeated IDs are not removed.
    pub fn task_ids(&self) -> Vec<TaskId> {
        let mut ids = Vec::with_capacity(self.work_units.len());
        for unit in &self.work_units {
            ids.push(unit.task_id);
        }
        ids
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct WorkUnit {
/// Job that the work is part of.
Expand Down
38 changes: 36 additions & 2 deletions src/api-service/__app__/agent_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,47 @@ def get_node_checked(machine_id: UUID) -> Node:
return node


def on_state_update(machine_id: UUID, state: NodeState) -> func.HttpResponse:
def on_state_update(
machine_id: UUID,
state_update: NodeStateUpdate,
) -> func.HttpResponse:
state = state_update.state
node = get_node_checked(machine_id)

if state == NodeState.init or node.state not in NodeState.ready_for_reset():
if node.state != state:
node.state = state
node.save()

if state == NodeState.setting_up:
# This field will be required in the future.
# For now, it is optional for back compat.
if state_update.data:
for task_id in state_update.data.tasks:
task = get_task_checked(task_id)

# The task state may be `running` if it has `vm_count` > 1, and
# another node is concurrently executing the task. If so, leave
# the state as-is, to represent the max progress made.
#
# Other states we would want to preserve are excluded by the
# outermost conditional check.
if task.state != TaskState.running:
task.state = TaskState.setting_up

# We don't yet call `on_start()` for the task.
# This will happen once we see a worker event that
# reports it as `running`.
task.save()

# Note: we set the node task state to `setting_up`, even though
# the task itself may be `running`.
node_task = NodeTasks(
machine_id=machine_id,
task_id=task_id,
state=NodeTaskState.setting_up,
)
node_task.save()
else:
logging.info("ignoring state updates from the node: %s: %s", machine_id, state)

Expand Down Expand Up @@ -133,7 +167,7 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
return not_ok(err, context=ERROR_CONTEXT)

if event.state_update:
return on_state_update(envelope.machine_id, event.state_update.state)
return on_state_update(envelope.machine_id, event.state_update)
elif event.worker_event:
return on_worker_event(envelope.machine_id, event.worker_event)
else:
Expand Down
2 changes: 2 additions & 0 deletions src/pytypes/onefuzztypes/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class TaskState(Enum):
init = "init"
waiting = "waiting"
scheduled = "scheduled"
setting_up = "setting_up"
running = "running"
stopping = "stopping"
stopped = "stopped"
Expand Down Expand Up @@ -286,6 +287,7 @@ class Architecture(Enum):

class NodeTaskState(Enum):
    # State recorded for a (node, task) pairing; each value is the member's
    # own name as a string.
    init = "init"
    setting_up = "setting_up"
    running = "running"


Expand Down
22 changes: 21 additions & 1 deletion src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,33 @@ class WorkerEvent(EnumModel):
running: Optional[WorkerRunningEvent]


class SettingUpEventData(BaseModel):
    # Payload attached to a `setting_up` state update: a list of task UUIDs.
    tasks: List[UUID]


class NodeStateUpdate(BaseModel):
    # Node state being reported.
    state: NodeState
    # Optional extra payload; `check_data` only accepts it alongside the
    # `setting_up` state.
    data: Optional[SettingUpEventData]

    @validator("data")
    def check_data(
        cls,
        data: Optional[SettingUpEventData],
        values: Any,
    ) -> Optional[SettingUpEventData]:
        """Reject `data` when the reported state is present and not `setting_up`.

        Returns `data` unchanged when it is empty, when no `state` entry is
        available in `values`, or when the state is `setting_up`.

        Raises:
            ValueError: if `data` is set and `state` is any other node state.
        """
        if data:
            # `state` is read with `.get()`, so a missing entry skips the check.
            state = values.get("state")
            if state and state != NodeState.setting_up:
                raise ValueError(
                    "data for node state update event does not match state = %s" % state
                )

        return data


class NodeEvent(EnumModel):
    # Each event kind is declared exactly once; both are optional fields.
    state_update: Optional[NodeStateUpdate]
    worker_event: Optional[WorkerEvent]


# Temporary shim type to support hot upgrade of 1.0.0 nodes.
Expand Down