Skip to content

Commit 65f580a

Browse files
committed
[update]add dpub support and add more easy way to initiate consumer and producer
1 parent 999b537 commit 65f580a

14 files changed

+227
-67
lines changed

.DS_Store

0 Bytes
Binary file not shown.

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ __pycache__/
44

55
# C extensions
66
*.so
7-
7+
*_bak.py
88
# Distribution / packaging
99
.Python
1010
env/

.travis.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
language: python
22
python:
3-
- 3.2
4-
- 3.4
3+
- 3.5
4+
- 3.6
55

66
env:
77
- NSQ_DOWNLOAD=nsq-0.2.30.linux-amd64.go1.2.1

README.md

+42-8
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,54 @@
11
asyncnsq (work in progress,仍然开发中)
22
=========================
3+
asyncnsq with python3.6 await/async supported
34

4-
implement python3.5+ async/await syntax support
5-
6-
提供python3.5+ async/await 语法支持。 原项目,支持yield from 生成器语法。
7-
8-
this project is forked from jettify/aionsq, he is the original author
5+
Latest Updates
6+
--------------
97

10-
本项目fork了 jettify/aionsq, 他是原作者,但是很久没有更新了。
8+
support dpub
119

1210
Usage examples
1311
--------------
14-
you can refer from examples.
1512

16-
你可以查看examples文件夹中的例子。
13+
All you need is a loop, then enjoy
14+
15+
Consumer:
16+
17+
loop = asyncio.get_event_loop()
18+
19+
async def go():
20+
nsq_consumer = await create_nsq_consumer(host='tcp://127.0.0.1:4150',
21+
max_in_flight=200)
22+
await nsq_consumer.subscribe('test_async_nsq', 'nsq')
23+
for waiter in nsq_consumer.wait_messages():
24+
message = await waiter
25+
print(message.body)
26+
await message.fin()
27+
28+
loop.run_until_complete(go())
29+
30+
Producer:
31+
32+
loop = asyncio.get_event_loop()
33+
34+
async def go():
35+
nsq_producer = await create_nsq_producer(host='127.0.0.1', port=4150,
36+
heartbeat_interval=30000,
37+
feature_negotiation=True,
38+
tls_v1=True,
39+
snappy=False,
40+
deflate=False,
41+
deflate_level=0,
42+
loop=loop)
43+
for i in range(10):
44+
await nsq_producer.pub('test_async_nsq', 'test_async_nsq:{i}'.format(i=i))
45+
await nsq_producer.dpub('test_async_nsq', i * 1000,
46+
'test_delay_async_nsq:{i}'.format(i=i))
47+
loop.run_until_complete(go())
48+
49+
you can use host like 'tcp://127.0.0.1:4150' or '127.0.0.1' with port=4150
1750

51+
As for now, only single nsqd addr supported.
1852

1953
Requirements
2054
------------

asyncnsq/__init__.py

+55-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,55 @@
1-
__version__ = '0.2.1'
1+
__version__ = '0.3.0'
2+
3+
import asyncio
4+
from .utils import get_host_and_port
5+
from .nsq import Nsq
6+
from .consumer import NsqConsumer
7+
8+
9+
async def create_nsq_producer(host='127.0.0.1', port=4150, loop=None, queue=None,
10+
heartbeat_interval=30000, feature_negotiation=True,
11+
tls_v1=False, snappy=False, deflate=False, deflate_level=6,
12+
consumer=False, sample_rate=0):
13+
""""
14+
initial function to get producer
15+
param: host: host addr with no protocol. 127.0.0.1
16+
param: port: host port
17+
param: queue: queue where all the msg been put from the nsq
18+
param: heartbeat_interval: heartbeat interval with nsq, set -1 to disable nsq heartbeat check
19+
params: snappy: snappy compress
20+
params: deflate: deflate compress can't set True both with snappy
21+
"""
22+
# TODO: add parameters type and value validation
23+
host, tmp_port = get_host_and_port(host)
24+
if not port:
25+
port = tmp_port
26+
loop = loop or asyncio.get_event_loop()
27+
queue = queue or asyncio.Queue(loop=loop)
28+
conn = Nsq(host=host, port=port, queue=queue,
29+
heartbeat_interval=heartbeat_interval,
30+
feature_negotiation=feature_negotiation,
31+
tls_v1=tls_v1, snappy=snappy, deflate=deflate,
32+
deflate_level=deflate_level,
33+
sample_rate=sample_rate, consumer=consumer, loop=loop)
34+
await conn.connect()
35+
return conn
36+
37+
38+
async def create_nsq_consumer(host='127.0.0.1', port=None, loop=None, max_in_flight=42, lookupd_http_addresses=None):
39+
""""
40+
initial function to get consumer
41+
param: host: host addr with no protocol. 127.0.0.1
42+
param: port: host port
43+
param: max_in_flight: number of messages get but not finish or req
44+
param: lookupd_http_addresses: heartbeat interval with nsq, set -1 to disable nsq heartbeat check
45+
"""
46+
# TODO: add parameters type and value validation
47+
host, tmp_port = get_host_and_port(host)
48+
if not port:
49+
port = tmp_port or 4150
50+
loop = loop or asyncio.get_event_loop()
51+
conn = NsqConsumer(nsqd_tcp_addresses=[(host, port)],
52+
lookupd_http_addresses=lookupd_http_addresses,
53+
max_in_flight=max_in_flight, loop=loop)
54+
await conn.connect()
55+
return conn

asyncnsq/connection.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ async def _read_data(self):
179179
is_canceled = False
180180
while not self._reader.at_eof():
181181
try:
182-
data = await self._reader.read(consts.MAX_CHUNK_SIZE)
182+
data = await self._reader.read(1024)
183+
# print('\n\n', 'socket response data', data)
183184
except asyncio.CancelledError:
184185
is_canceled = True
185186
logger.error('Task is canceled')

asyncnsq/consts.py

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
AUTH = b'AUTH'
3030
SUB = b'SUB'
3131
PUB = b'PUB'
32+
DPUB = b'DPUB'
3233

3334
# connection status
3435
CLOSED = 0

asyncnsq/containers.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async def fin(self):
3030
"""
3131
if self._is_processed:
3232
raise RuntimeWarning("Message has already been processed")
33-
resp = (await self.conn.execute(FIN, self.message_id))
33+
resp = await self.conn.execute(FIN, self.message_id)
3434
self._is_processed = True
3535
return resp
3636

@@ -53,4 +53,4 @@ async def touch(self):
5353
"""
5454
if self._is_processed:
5555
raise RuntimeWarning("Message has already been processed")
56-
return (await self.conn.execute(TOUCH, self.message_id))
56+
return await self.conn.execute(TOUCH, self.message_id)

asyncnsq/nsq.py

+46-14
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,32 @@
22
from . import consts
33
import time
44
from .log import logger
5-
from .utils import retry_iterator
5+
from .utils import retry_iterator, RdyControl
66
from .connection import create_connection
7-
from .consts import TOUCH, REQ, FIN, RDY, CLS, MPUB, PUB, SUB, AUTH
7+
from .consts import TOUCH, REQ, FIN, RDY, CLS, MPUB, PUB, SUB, AUTH, DPUB
88

99

1010
async def create_nsq(host='127.0.0.1', port=4150, loop=None, queue=None,
1111
heartbeat_interval=30000, feature_negotiation=True,
1212
tls_v1=False, snappy=False, deflate=False, deflate_level=6,
13-
sample_rate=0):
13+
consumer=False, sample_rate=0):
14+
""""
15+
param: host: host addr with no protocol. 127.0.0.1
16+
param: port: host port
17+
param: queue: queue where all the msg been put from the nsq
18+
param: heartbeat_interval: heartbeat interval with nsq, set -1 to disable nsq heartbeat check
19+
params: snappy: snappy compress
20+
params: deflate: deflate compress can't set True both with snappy
21+
"""
1422
# TODO: add parameters type and value validation
23+
loop = loop or asyncio.get_event_loop()
1524
queue = queue or asyncio.Queue(loop=loop)
1625
conn = Nsq(host=host, port=port, queue=queue,
1726
heartbeat_interval=heartbeat_interval,
1827
feature_negotiation=feature_negotiation,
1928
tls_v1=tls_v1, snappy=snappy, deflate=deflate,
2029
deflate_level=deflate_level,
21-
sample_rate=sample_rate, loop=loop)
30+
sample_rate=sample_rate, consumer=consumer, loop=loop)
2231
await conn.connect()
2332
return conn
2433

@@ -28,7 +37,7 @@ class Nsq:
2837
def __init__(self, host='127.0.0.1', port=4150, loop=None, queue=None,
2938
heartbeat_interval=30000, feature_negotiation=True,
3039
tls_v1=False, snappy=False, deflate=False, deflate_level=6,
31-
sample_rate=0):
40+
sample_rate=0, consumer=False, max_in_flight=42):
3241
# TODO: add parameters type and value validation
3342
self._config = {
3443
"deflate": deflate,
@@ -53,6 +62,13 @@ def __init__(self, host='127.0.0.1', port=4150, loop=None, queue=None,
5362

5463
self._on_rdy_changed_cb = None
5564
self._last_rdy = 0
65+
self.consumer = consumer
66+
if self.consumer:
67+
self._idle_timeout = 10
68+
self._max_in_flight = max_in_flight
69+
self._rdy_control = RdyControl(idle_timeout=self._idle_timeout,
70+
max_in_flight=self._max_in_flight,
71+
loop=self._loop)
5672

5773
async def connect(self):
5874
self._conn = await create_connection(self._host, self._port,
@@ -61,6 +77,8 @@ async def connect(self):
6177
self._conn._on_message = self._on_message
6278
await self._conn.identify(**self._config)
6379
self._status = consts.CONNECTED
80+
if self.consumer:
81+
self._rdy_control.add_connection(self._conn)
6482

6583
def _on_message(self, msg):
6684
# should not be coroutine
@@ -102,7 +120,7 @@ async def reconnect(self):
102120
await asyncio.sleep(t, loop=self._loop)
103121

104122
async def execute(self, command, *args, data=None):
105-
if self._state <= consts.CONNECTED and self._reconnect:
123+
if self._status <= consts.CONNECTED and self._reconnect:
106124
await self.reconnect()
107125

108126
response = self._conn.execute(command, *args, data=data)
@@ -113,8 +131,10 @@ def id(self):
113131
return self._conn.endpoint
114132

115133
def wait_messages(self):
134+
# print('wait_messages')
116135
while True:
117136
future = asyncio.ensure_future(self._queue.get(), loop=self._loop)
137+
# print(future)
118138
yield future
119139

120140
async def auth(self, secret):
@@ -123,7 +143,7 @@ async def auth(self, secret):
123143
:param secret:
124144
:return:
125145
"""
126-
return (await self._conn.execute(AUTH, data=secret))
146+
return await self._conn.execute(AUTH, data=secret)
127147

128148
async def sub(self, topic, channel):
129149
"""
@@ -132,7 +152,7 @@ async def sub(self, topic, channel):
132152
:param channel:
133153
:return:
134154
"""
135-
return (await self._conn.execute(SUB, topic, channel))
155+
return await self._conn.execute(SUB, topic, channel)
136156

137157
async def pub(self, topic, message):
138158
"""
@@ -141,7 +161,19 @@ async def pub(self, topic, message):
141161
:param message:
142162
:return:
143163
"""
144-
return (await self._conn.execute(PUB, topic, data=message))
164+
return await self._conn.execute(PUB, topic, data=message)
165+
166+
async def dpub(self, topic, delay_time, message):
167+
"""
168+
169+
:param topic:
170+
:param message:
171+
:param delay_time: delayed time in millisecond
172+
:return:
173+
"""
174+
if not delay_time or delay_time is None:
175+
delay_time = 0
176+
return await self._conn.execute(DPUB, topic, delay_time, data=message)
145177

146178
async def mpub(self, topic, message, *messages):
147179
"""
@@ -152,7 +184,7 @@ async def mpub(self, topic, message, *messages):
152184
:return:
153185
"""
154186
msgs = [message] + list(messages)
155-
return (await self._conn.execute(MPUB, topic, data=msgs))
187+
return await self._conn.execute(MPUB, topic, data=msgs)
156188

157189
async def rdy(self, count):
158190
"""
@@ -165,15 +197,15 @@ async def rdy(self, count):
165197

166198
self._last_rdy = count
167199
self.rdy_state = count
168-
return (await self._conn.execute(RDY, count))
200+
return await self._conn.execute(RDY, count)
169201

170202
async def fin(self, message_id):
171203
"""
172204
173205
:param message_id:
174206
:return:
175207
"""
176-
return (await self._conn.execute(FIN, message_id))
208+
return await self._conn.execute(FIN, message_id)
177209

178210
async def req(self, message_id, timeout):
179211
"""
@@ -182,15 +214,15 @@ async def req(self, message_id, timeout):
182214
:param timeout:
183215
:return:
184216
"""
185-
return (await self._conn.execute(REQ, message_id, timeout))
217+
return await self._conn.execute(REQ, message_id, timeout)
186218

187219
async def touch(self, message_id):
188220
"""
189221
190222
:param message_id:
191223
:return:
192224
"""
193-
return (await self._conn.execute(TOUCH, message_id))
225+
return await self._conn.execute(TOUCH, message_id)
194226

195227
async def cls(self):
196228
"""

asyncnsq/protocol.py

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def gets(self):
6868

6969
def encode_command(self, cmd, *args, data=None):
7070
cmd = self._parser.encode_command(cmd, *args, data=data)
71+
# print(cmd)
7172
return self.compress(cmd)
7273

7374

asyncnsq/utils.py

+20
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,28 @@
22
import random
33
import re
44
import time
5+
from urllib.parse import urlparse
6+
57

68
TOPIC_NAME_RE = re.compile(r'^[\.a-zA-Z0-9_-]+$')
79
CHANNEL_NAME_RE = re.compile(r'^[\.a-zA-Z0-9_-]+(#ephemeral)?$')
810

911

12+
def get_host_and_port(host):
13+
host_parsed = urlparse(host)
14+
if host_parsed.scheme == 'tcp':
15+
result = host_parsed.netloc
16+
elif host_parsed.scheme == '':
17+
result = host_parsed.path
18+
else:
19+
result = host
20+
result = result.split(':')
21+
if len(result) == 2:
22+
return result[0], result[-1]
23+
else:
24+
return result[0], None
25+
26+
1027
def valid_topic_name(topic):
1128
if not 0 < len(topic) < 33:
1229
return False
@@ -113,6 +130,9 @@ def add_connections(self, connections):
113130
for conn in self._connections.values():
114131
conn._on_rdy_changed_cb = self.rdy_changed
115132

133+
def add_connection(self, connection):
134+
connection._on_rdy_changed_cb = self.rdy_changed
135+
116136
def rdy_changed(self, conn_id):
117137
self._cmd_queue.put_nowait((CHANGE_CONN_RDY, (conn_id,)))
118138

0 commit comments

Comments
 (0)