Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit 3a5e621

Browse files
lovasoasebito91
authored andcommitted
Add support for messagepack (#734)
* Add support for messagepack * Remove unnecessary blank line Fixes https://github.com/influxdata/influxdb-python/pull/734/files/57daf8ccd5027c796a2fd3934b8e88d3982d300e#r302769403 * Small code reorganization * Small code reorganization Fixes #734 (comment)
1 parent 3d61f1f commit 3a5e621

File tree

3 files changed

+64
-10
lines changed

3 files changed

+64
-10
lines changed

influxdb/client.py

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
from __future__ import print_function
77
from __future__ import unicode_literals
88

9-
import time
10-
import random
11-
9+
import datetime
1210
import json
11+
import random
1312
import socket
13+
import struct
14+
import time
15+
16+
import msgpack
1417
import requests
1518
import requests.exceptions
1619
from six.moves import xrange
@@ -144,7 +147,7 @@ def __init__(self,
144147

145148
self._headers = {
146149
'Content-Type': 'application/json',
147-
'Accept': 'text/plain'
150+
'Accept': 'application/x-msgpack'
148151
}
149152

150153
@property
@@ -293,13 +296,30 @@ def request(self, url, method='GET', params=None, data=None,
293296
time.sleep((2 ** _try) * random.random() / 100.0)
294297
if not retry:
295298
raise
299+
300+
type_header = response.headers and response.headers.get("Content-Type")
301+
if type_header == "application/x-msgpack" and response.content:
302+
response._msgpack = msgpack.unpackb(
303+
packed=response.content,
304+
ext_hook=_msgpack_parse_hook,
305+
raw=False)
306+
else:
307+
response._msgpack = None
308+
309+
def reformat_error(response):
310+
if response._msgpack:
311+
return json.dumps(response._msgpack, separators=(',', ':'))
312+
else:
313+
return response.content
314+
296315
# if there's not an error, there must have been a successful response
297316
if 500 <= response.status_code < 600:
298-
raise InfluxDBServerError(response.content)
317+
raise InfluxDBServerError(reformat_error(response))
299318
elif response.status_code == expected_response_code:
300319
return response
301320
else:
302-
raise InfluxDBClientError(response.content, response.status_code)
321+
err_msg = reformat_error(response)
322+
raise InfluxDBClientError(err_msg, response.status_code)
303323

304324
def write(self, data, params=None, expected_response_code=204,
305325
protocol='json'):
@@ -450,10 +470,11 @@ def query(self,
450470
expected_response_code=expected_response_code
451471
)
452472

453-
if chunked:
454-
return self._read_chunked_response(response)
455-
456-
data = response.json()
473+
data = response._msgpack
474+
if not data:
475+
if chunked:
476+
return self._read_chunked_response(response)
477+
data = response.json()
457478

458479
results = [
459480
ResultSet(result, raise_errors=raise_errors)
@@ -1119,3 +1140,12 @@ def _parse_netloc(netloc):
11191140
'password': info.password or None,
11201141
'host': info.hostname or 'localhost',
11211142
'port': info.port or 8086}
1143+
1144+
1145+
def _msgpack_parse_hook(code, data):
1146+
if code == 5:
1147+
(epoch_s, epoch_ns) = struct.unpack(">QI", data)
1148+
timestamp = datetime.datetime.utcfromtimestamp(epoch_s)
1149+
timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
1150+
return timestamp.isoformat() + 'Z'
1151+
return msgpack.ExtType(code, data)

influxdb/tests/client_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,29 @@ def test_query(self):
473473
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
474474
)
475475

476+
def test_query_msgpack(self):
477+
"""Test query method with a messagepack response."""
478+
example_response = bytes(bytearray.fromhex(
479+
"81a7726573756c74739182ac73746174656d656e745f696400a673657269"
480+
"65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
481+
"6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
482+
))
483+
484+
with requests_mock.Mocker() as m:
485+
m.register_uri(
486+
requests_mock.GET,
487+
"http://localhost:8086/query",
488+
request_headers={"Accept": "application/x-msgpack"},
489+
headers={"Content-Type": "application/x-msgpack"},
490+
content=example_response
491+
)
492+
rs = self.cli.query('select * from a')
493+
494+
self.assertListEqual(
495+
list(rs.get_points()),
496+
[{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
497+
)
498+
476499
def test_select_into_post(self):
477500
"""Test SELECT.*INTO is POSTed."""
478501
example_response = (

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ python-dateutil>=2.6.0
22
pytz
33
requests>=2.17.0
44
six>=1.10.0
5+
msgpack==0.6.1

0 commit comments

Comments
 (0)