Skip to content

Commit be57ae0

Browse files
feat(wq): non interruptable and non blocking
Signed-off-by: Anhad Singh <[email protected]>
1 parent bf0f496 commit be57ae0

File tree

13 files changed

+186
-80
lines changed

13 files changed

+186
-80
lines changed

src/aero_kernel/src/drivers/tty/vtty.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::mem::paging::VirtAddr;
3232
use crate::userland::scheduler;
3333
use crate::userland::task::Task;
3434
use crate::userland::terminal::TerminalDevice;
35-
use crate::utils::sync::{Mutex, WaitQueue};
35+
use crate::utils::sync::{Mutex, WaitQueue, WaitQueueFlags};
3636

3737
#[cfg(target_arch = "x86_64")]
3838
use crate::drivers::keyboard::KeyCode;
@@ -247,11 +247,12 @@ impl INodeInterface for Tty {
247247
self.connected.fetch_sub(1, Ordering::SeqCst);
248248
}
249249

250-
fn read_at(&self, _flags: OpenFlags, _offset: usize, buffer: &mut [u8]) -> fs::Result<usize> {
251-
self.block_queue
252-
.block_on(&self.stdin, |future| future.is_complete())?;
253-
254-
let mut stdin = self.stdin.lock_irq();
250+
fn read_at(&self, flags: OpenFlags, _offset: usize, buffer: &mut [u8]) -> fs::Result<usize> {
251+
let mut stdin = self.block_queue.wait(
252+
WaitQueueFlags::from(flags) | WaitQueueFlags::DISABLE_IRQ,
253+
&self.stdin,
254+
|future| future.is_complete(),
255+
)?;
255256

256257
// record the back buffer size before swapping
257258
stdin.swap_buffer();

src/aero_kernel/src/fs/eventfd.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,14 @@ impl INodeInterface for EventFd {
5454
Ok(None)
5555
}
5656

57-
fn read_at(
58-
&self,
59-
_flags: OpenFlags,
60-
_offset: usize,
61-
buffer: &mut [u8],
62-
) -> super::Result<usize> {
57+
fn read_at(&self, flags: OpenFlags, _offset: usize, buffer: &mut [u8]) -> super::Result<usize> {
6358
let size = core::mem::size_of::<u64>();
6459
assert!(buffer.len() >= size);
6560

6661
// SAFETY: We have above verified that it is safe to dereference
6762
// the value.
6863
let value = unsafe { &mut *(buffer.as_mut_ptr().cast::<u64>()) };
69-
let mut count = self.wq.block_on(&self.count, |e| **e != 0)?;
64+
let mut count = self.wq.wait(flags.into(), &self.count, |e| **e != 0)?;
7065

7166
*value = *count;
7267
*count = 0; // reset the counter

src/aero_kernel/src/fs/pipe.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use alloc::sync::Arc;
2222
use spin::Once;
2323

2424
use crate::utils::buffer::Buffer;
25-
use crate::utils::sync::{Mutex, WaitQueue};
25+
use crate::utils::sync::{Mutex, WaitQueue, WaitQueueFlags};
2626

2727
use super::cache::DirCacheItem;
2828
use super::file_table::FileHandle;
@@ -84,7 +84,7 @@ impl INodeInterface for Pipe {
8484
return Err(FileSystemError::WouldBlock);
8585
}
8686

87-
let mut buffer = self.readers.block_on(&self.queue, |lock| {
87+
let mut buffer = self.readers.wait(flags.into(), &self.queue, |lock| {
8888
lock.has_data() || self.active_writers() == 0
8989
})?;
9090

src/aero_kernel/src/main.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
cfg_match, // https://github.com/rust-lang/rust/issues/115585
4646
associated_type_defaults,
4747
new_zeroed_alloc, // https://github.com/rust-lang/rust/issues/129396
48-
sync_unsafe_cell
48+
sync_unsafe_cell,
4949
)]
5050
// TODO(andypython): can we remove the dependency of "prelude_import" and "lang_items"?
5151
// `lang_items` => is currently used for the personality function (`rust_eh_personality`).
@@ -59,6 +59,7 @@
5959
#![reexport_test_harness_main = "test_main"]
6060
#![warn(clippy::needless_pass_by_value)]
6161
#![deny(clippy::ptr_as_ptr)]
62+
#![allow(binary_asm_labels)]
6263

6364
#[macro_use]
6465
extern crate aero_proc;
@@ -214,7 +215,7 @@ fn kernel_dbg_thread() {
214215

215216
use crate::drivers::uart::{self, LineStatus, COM_1};
216217
use crate::userland::task::TaskId;
217-
use crate::utils::sync::WaitQueue;
218+
use crate::utils::sync::{WaitQueue, WaitQueueFlags};
218219

219220
uart::setup_interrupts();
220221

@@ -229,7 +230,7 @@ fn kernel_dbg_thread() {
229230

230231
loop {
231232
let mut com_1 = input_wq
232-
.block_on(com_1, |com_1| {
233+
.wait(WaitQueueFlags::empty(), com_1, |com_1| {
233234
com_1.line_status().contains(LineStatus::INPUT_FULL)
234235
})
235236
.unwrap();

src/aero_kernel/src/socket/netlink.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ impl INodeInterface for NetLinkSocket {
221221

222222
fn recv(
223223
&self,
224-
_fd_flags: OpenFlags,
224+
fd_flags: OpenFlags,
225225
message_hdr: &mut MessageHeader,
226226
flags: socket::MessageFlags,
227227
) -> fs::Result<usize> {
@@ -239,7 +239,7 @@ impl INodeInterface for NetLinkSocket {
239239

240240
let mut queue = self
241241
.recv_wq
242-
.block_on(&self.recv_queue, |queue| !queue.is_empty())?;
242+
.wait(fd_flags.into(), &self.recv_queue, |queue| !queue.is_empty())?;
243243

244244
let mut bytes_copied = 0;
245245
dbg!(message_hdr.iovecs_mut());

src/aero_kernel/src/socket/tcp.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::fs::{self, FileSystemError};
3232
use crate::net;
3333
use crate::net::shim::PacketSend;
3434
use crate::net::{tcp, NetworkDevice};
35-
use crate::utils::sync::{Mutex, WaitQueue};
35+
use crate::utils::sync::{Mutex, WaitQueue, WaitQueueFlags};
3636

3737
// ./aero.py -- -netdev user,id=mynet0 -device e1000,netdev=mynet0,id=ck_nic0 -object
3838
// filter-dump,id=mynet0,netdev=mynet0,file=qemulog.log
@@ -97,7 +97,7 @@ impl TcpSocket {
9797
Err(TcpError::WouldBlock) => {
9898
drop(tcp);
9999

100-
let mut socket = self.wq.block_on(&self.tcp, |tcp| {
100+
let mut socket = self.wq.wait(flags.into(), &self.tcp, |tcp| {
101101
tcp.as_ref()
102102
.is_none_or(|socket| !socket.recv_queue.is_empty())
103103
})?;
@@ -145,7 +145,8 @@ impl INodeInterface for TcpSocket {
145145
*tcp = Some(socket);
146146
}
147147

148-
let _ = self.wq.block_on(&self.tcp, |x| {
148+
// FIXME: connect() should pass the fd.
149+
let _ = self.wq.wait(WaitQueueFlags::empty(), &self.tcp, |x| {
149150
x.as_ref().unwrap().state() == State::Established
150151
});
151152

src/aero_kernel/src/socket/udp.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -187,17 +187,16 @@ impl INodeInterface for UdpSocket {
187187

188188
fn recv(
189189
&self,
190-
_fd_flags: OpenFlags,
190+
fd_flags: OpenFlags,
191191
message_hdr: &mut MessageHeader,
192192
_flags: MessageFlags,
193193
) -> fs::Result<usize> {
194194
// assert!(flags.is_empty());
195195

196-
if self.inner.lock_irq().incoming.is_empty() && self.is_non_block() {
197-
return Err(FileSystemError::WouldBlock);
198-
}
196+
let mut this = self
197+
.wq
198+
.wait(fd_flags.into(), &self.inner, |e| !e.incoming.is_empty())?;
199199

200-
let mut this = self.wq.block_on(&self.inner, |e| !e.incoming.is_empty())?;
201200
let packet = this.incoming.pop().expect("recv: someone was greedy");
202201

203202
let mut data = packet.as_slice().to_vec();

src/aero_kernel/src/socket/unix.rs

+14-15
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::fs::inode::{DirEntry, FileType, INodeInterface, Metadata, PollFlags,
3232
use crate::fs::{FileSystemError, Path};
3333

3434
use crate::mem::paging::VirtAddr;
35-
use crate::utils::sync::{Mutex, WaitQueue};
35+
use crate::utils::sync::{Mutex, WaitQueue, WaitQueueFlags};
3636

3737
use super::SocketAddrRef;
3838

@@ -243,14 +243,11 @@ impl INodeInterface for UnixSocket {
243243
_offset: usize,
244244
user_buffer: &mut [u8],
245245
) -> fs::Result<usize> {
246-
if self.buffer.lock_irq().is_empty() && flags.is_nonblock() {
247-
return Err(FileSystemError::WouldBlock);
248-
}
249-
250-
let mut buffer = self.wq.block_on(&self.buffer, |e| !e.is_empty())?;
246+
let mut buf = self
247+
.wq
248+
.wait(flags.into(), &self.buffer, |e| !e.is_empty())?;
251249

252-
let read = buffer.read(user_buffer);
253-
Ok(read)
250+
Ok(buf.read(user_buffer))
254251
}
255252

256253
fn write_at(&self, _offset: usize, buffer: &[u8]) -> fs::Result<usize> {
@@ -325,12 +322,16 @@ impl INodeInterface for UnixSocket {
325322
target.wq.notify_all();
326323
core::mem::drop(itarget); // release the lock
327324

328-
let _ = self.wq.block_on(&self.inner, |e| e.state.is_connected())?;
325+
// FIXME: connect() should pass fd.
326+
let _ = self.wq.wait(WaitQueueFlags::empty(), &self.inner, |e| {
327+
e.state.is_connected()
328+
})?;
329329
Ok(())
330330
}
331331

332332
fn accept(&self, address: Option<(VirtAddr, &mut u32)>) -> fs::Result<Arc<UnixSocket>> {
333-
let mut inner = self.wq.block_on(&self.inner, |e| {
333+
// TODO: accept
334+
let mut inner = self.wq.wait(WaitQueueFlags::empty(), &self.inner, |e| {
334335
e.state.queue().is_some_and(|x| !x.is_empty())
335336
})?;
336337

@@ -387,11 +388,9 @@ impl INodeInterface for UnixSocket {
387388
_ => return Err(FileSystemError::NotConnected),
388389
};
389390

390-
if self.buffer.lock_irq().is_empty() && fd_flags.is_nonblock() {
391-
return Err(FileSystemError::WouldBlock);
392-
}
393-
394-
let mut buffer = self.wq.block_on(&self.buffer, |e| !e.is_empty())?;
391+
let mut buffer = self
392+
.wq
393+
.wait(fd_flags.into(), &self.buffer, |e| !e.is_empty())?;
395394

396395
if let Some(addr) = header.name_mut::<SocketAddrUnix>() {
397396
*addr = peer.inner.lock_irq().address.as_ref().cloned().unwrap();

src/aero_kernel/src/syscall/fs.rs

+3-8
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ impl fmt::Display for FileDescriptor {
5656
let path = file_handle.inode.absolute_path();
5757
write!(f, "{{ {} -> {} }}", self.0, path)
5858
} else {
59-
// invalid file descriptor
6059
write!(f, "{{ {} -> INVALID }}", self.0)
6160
}
6261
}
@@ -164,10 +163,7 @@ pub fn getdents(fd: FileDescriptor, buffer: &mut [u8]) -> Result<usize, SyscallE
164163

165164
#[syscall]
166165
pub fn close(fd: FileDescriptor) -> Result<usize, SyscallError> {
167-
let res = scheduler::get_scheduler()
168-
.current_task()
169-
.file_table
170-
.close_file(fd.into());
166+
let res = scheduler::current_thread().file_table.close_file(fd.into());
171167

172168
if res {
173169
Ok(0)
@@ -218,11 +214,10 @@ pub fn mkdirat(dfd: usize, path: &Path) -> Result<usize, SyscallError> {
218214
// pathname is interpreted relative to the current working directory of the
219215
// calling task.
220216
if dfd as isize == aero_syscall::AT_FDCWD {
221-
let cwd = scheduler::get_scheduler().current_task().cwd_dirent();
217+
let cwd = scheduler::current_thread().cwd_dirent();
222218
(cwd.inode(), path.as_str())
223219
} else {
224-
let handle = scheduler::get_scheduler()
225-
.current_task()
220+
let handle = scheduler::current_thread()
226221
.file_table
227222
.get_handle(dfd)
228223
.ok_or(SyscallError::EBADFD)?;

src/aero_kernel/src/syscall/ipc.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use crate::userland::scheduler::get_scheduler;
1919
use crate::userland::task::TaskId;
2020

21-
use crate::utils::sync::{Mutex, WaitQueue};
21+
use crate::utils::sync::{Mutex, WaitQueue, WaitQueueFlags};
2222

2323
use aero_syscall::SyscallError;
2424
use alloc::collections::VecDeque;
@@ -103,7 +103,9 @@ pub fn recv(pid_ptr: &mut usize, output: &mut [u8], block: usize) -> Result<usiz
103103
let mq = &current.message_queue;
104104
let mut our_queue = mq
105105
.blockqueue
106-
.block_on(&mq.queue, |msg| msg.front().is_some())
106+
.wait(WaitQueueFlags::empty(), &mq.queue, |msg| {
107+
msg.front().is_some()
108+
})
107109
.unwrap();
108110

109111
let msg = our_queue

src/aero_kernel/src/userland/task/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::arch::task::ArchTask;
3636
use crate::fs::file_table::FileTable;
3737
use crate::syscall::ipc::MessageQueue;
3838
use crate::syscall::ExecArgs;
39-
use crate::utils::sync::{Mutex, WaitQueue};
39+
use crate::utils::sync::{Mutex, WaitQueue, WaitQueueError, WaitQueueFlags};
4040

4141
use crate::userland::signals::Signals;
4242

@@ -142,10 +142,10 @@ impl Zombies {
142142
pids: &[usize],
143143
status: &mut u32,
144144
flags: WaitPidFlags,
145-
) -> SignalResult<usize> {
145+
) -> Result<usize, WaitQueueError> {
146146
let mut captured = None;
147147

148-
self.block.block_on(&self.list, |l| {
148+
self.block.wait(WaitQueueFlags::empty(), &self.list, |l| {
149149
let mut cursor = l.front_mut();
150150

151151
while let Some(t) = cursor.get() {
@@ -488,7 +488,7 @@ impl Task {
488488
pid: isize,
489489
status: &mut u32,
490490
flags: WaitPidFlags,
491-
) -> SignalResult<usize> {
491+
) -> Result<usize, WaitQueueError> {
492492
if pid == -1 {
493493
// wait for any child process if no specific process is requested.
494494
//

src/aero_kernel/src/userland/terminal.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ use alloc::vec::Vec;
2222

2323
use spin::RwLock;
2424

25+
use crate::fs;
2526
use crate::fs::inode::INodeInterface;
26-
use crate::utils::sync::{Mutex, WaitQueue};
27+
use crate::utils::sync::{Mutex, WaitQueue, WaitQueueFlags};
2728

2829
use super::signals::SignalError;
2930
use super::task::sessions::{Group, SESSIONS};
@@ -114,8 +115,10 @@ impl LineDiscipline {
114115
*self.termios.lock() = termios;
115116
}
116117

117-
pub fn read(&self, target: &mut [u8]) -> Result<usize, SignalError> {
118-
let mut buffer = self.wq.block_on(&self.buffer, |buf| !buf.is_empty())?;
118+
pub fn read(&self, target: &mut [u8]) -> fs::Result<usize> {
119+
let mut buffer = self
120+
.wq
121+
.wait(WaitQueueFlags::empty(), &self.buffer, |buf| !buf.is_empty())?;
119122

120123
let size = core::cmp::min(target.len(), buffer.len());
121124
target[..size].copy_from_slice(&buffer.drain(..size).collect::<Vec<_>>());

0 commit comments

Comments
 (0)