Skip to content

Commit 9022fce

Browse files
committed
New module implementing a multi-everything queue.
1 parent 9ee7e15 commit 9022fce

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed

Lib/Queue.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# A multi-producer, multi-consumer queue.
2+
3+
Empty = 'Queue.Empty' # Exception raised by get_nowait()
4+
5+
class Queue:
6+
7+
# Initialize a queue object with a given maximum size
8+
# (If maxsize is <= 0, the maximum size is infinite)
9+
def init(self, maxsize):
10+
import thread
11+
self._init(maxsize)
12+
self.mutex = thread.allocate_lock()
13+
self.esema = thread.allocate_lock()
14+
self.esema.acquire_lock()
15+
self.fsema = thread.allocate_lock()
16+
return self
17+
18+
# Get an approximation of the queue size (not reliable!)
19+
def qsize(self):
20+
self.mutex.acquire_lock()
21+
n = self._qsize()
22+
self.mutex.release_lock()
23+
return n
24+
25+
# Check if the queue is empty (not reliable!)
26+
def empty(self):
27+
self.mutex.acquire_lock()
28+
n = self._empty()
29+
self.mutex.release_lock()
30+
return n
31+
32+
# Check if the queue is full (not reliable!)
33+
def full(self):
34+
self.mutex.acquire_lock()
35+
n = self._full()
36+
self.mutex.release_lock()
37+
return n
38+
39+
# Put a new item into the queue
40+
def put(self, item):
41+
self.fsema.acquire_lock()
42+
self.mutex.acquire_lock()
43+
was_empty = self._empty()
44+
self._put(item)
45+
if was_empty:
46+
self.esema.release_lock()
47+
if not self._full():
48+
self.fsema.release_lock()
49+
self.mutex.release_lock()
50+
51+
# Get an item from the queue,
52+
# blocking if necessary until one is available
53+
def get(self):
54+
self.esema.acquire_lock()
55+
self.mutex.acquire_lock()
56+
was_full = self._full()
57+
item = self._get()
58+
if was_full:
59+
self.fsema.release_lock()
60+
if not self._empty():
61+
self.esema.release_lock()
62+
self.mutex.release_lock()
63+
return item
64+
65+
# Get an item from the queue if one is immediately available,
66+
# raise Empty if the queue is empty or temporarily unavailable
67+
def get_nowait(self):
68+
locked = self.esema.acquire_lock(0)
69+
self.mutex.acquire_lock()
70+
if self._empty():
71+
# The queue is empyt -- we can't have esema
72+
self.mutex.release_lock()
73+
raise Empty
74+
if not locked:
75+
locked = self.esema.acquire_lock(0)
76+
if not locked:
77+
# Somebody else has esema
78+
# but we have mutex --
79+
# go out of their way
80+
self.mutex.release_lock()
81+
raise Empty
82+
was_full = self._full()
83+
item = self._get()
84+
if was_full:
85+
self.fsema.release_lock()
86+
if not self._empty():
87+
self.esema.release_lock()
88+
self.mutex.release_lock()
89+
return item
90+
91+
# XXX Need to define put_nowait() as well.
92+
93+
94+
# Override these methods to implement other queue organizations
95+
# (e.g. stack or priority queue).
96+
# These will only be called with appropriate locks held
97+
98+
# Initialize the queue representation
99+
def _init(self, maxsize):
100+
self.maxsize = maxsize
101+
self.queue = []
102+
103+
def _qsize(self):
104+
return len(self.queue)
105+
106+
# Check wheter the queue is empty
107+
def _empty(self):
108+
return not self.queue
109+
110+
# Check whether the queue is full
111+
def _full(self):
112+
return self.maxsize > 0 and len(self.queue) == self.maxsize
113+
114+
# Put a new item in the queue
115+
def _put(self, item):
116+
self.queue.append(item)
117+
118+
# Get an item from the queue
119+
def _get(self):
120+
item = self.queue[0]
121+
del self.queue[0]
122+
return item

0 commit comments

Comments
 (0)