Skip to content

Commit c7eef8f

Browse files
mp911dechristophstrobl
authored andcommitted
Introduce JedisInvoker.
We now use JedisInvoker to call Jedis and Pipeline methods for synchronous, pipelining, and transactional execution models. JedisInvoker captures the method invocation as functional utility and allows conversion of results: Long result = invoker.just(BinaryJedis::geoadd, MultiKeyPipelineBase:geoadd, key, point.getX(), point.getY(), member); Closes #1951 Original Pull Request: #1960
1 parent f723bd3 commit c7eef8f

16 files changed

+1494
-2626
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java

Lines changed: 136 additions & 117 deletions
Large diffs are not rendered by default.

src/main/java/org/springframework/data/redis/connection/jedis/JedisGeoCommands.java

Lines changed: 28 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package org.springframework.data.redis.connection.jedis;
1717

18+
import redis.clients.jedis.BinaryJedis;
1819
import redis.clients.jedis.GeoCoordinate;
1920
import redis.clients.jedis.GeoUnit;
21+
import redis.clients.jedis.MultiKeyPipelineBase;
2022

2123
import java.util.HashMap;
2224
import java.util.List;
@@ -56,22 +58,8 @@ public Long geoAdd(byte[] key, Point point, byte[] member) {
5658
Assert.notNull(point, "Point must not be null!");
5759
Assert.notNull(member, "Member must not be null!");
5860

59-
try {
60-
if (isPipelined()) {
61-
pipeline(connection
62-
.newJedisResult(connection.getRequiredPipeline().geoadd(key, point.getX(), point.getY(), member)));
63-
return null;
64-
}
65-
if (isQueueing()) {
66-
transaction(connection
67-
.newJedisResult(connection.getRequiredTransaction().geoadd(key, point.getX(), point.getY(), member)));
68-
return null;
69-
}
70-
71-
return connection.getJedis().geoadd(key, point.getX(), point.getY(), member);
72-
} catch (Exception ex) {
73-
throw convertJedisAccessException(ex);
74-
}
61+
return connection.invoke().just(BinaryJedis::geoadd, MultiKeyPipelineBase::geoadd, key, point.getX(), point.getY(),
62+
member);
7563
}
7664

7765
/*
@@ -90,20 +78,7 @@ public Long geoAdd(byte[] key, Map<byte[], Point> memberCoordinateMap) {
9078
redisGeoCoordinateMap.put(mapKey, JedisConverters.toGeoCoordinate(memberCoordinateMap.get(mapKey)));
9179
}
9280

93-
try {
94-
if (isPipelined()) {
95-
pipeline(connection.newJedisResult(connection.getRequiredPipeline().geoadd(key, redisGeoCoordinateMap)));
96-
return null;
97-
}
98-
if (isQueueing()) {
99-
transaction(connection.newJedisResult(connection.getRequiredTransaction().geoadd(key, redisGeoCoordinateMap)));
100-
return null;
101-
}
102-
103-
return connection.getJedis().geoadd(key, redisGeoCoordinateMap);
104-
} catch (Exception ex) {
105-
throw convertJedisAccessException(ex);
106-
}
81+
return connection.invoke().just(BinaryJedis::geoadd, MultiKeyPipelineBase::geoadd, key, redisGeoCoordinateMap);
10782
}
10883

10984
/*
@@ -122,20 +97,7 @@ public Long geoAdd(byte[] key, Iterable<GeoLocation<byte[]>> locations) {
12297
redisGeoCoordinateMap.put(location.getName(), JedisConverters.toGeoCoordinate(location.getPoint()));
12398
}
12499

125-
try {
126-
if (isPipelined()) {
127-
pipeline(connection.newJedisResult(connection.getRequiredPipeline().geoadd(key, redisGeoCoordinateMap)));
128-
return null;
129-
}
130-
if (isQueueing()) {
131-
transaction(connection.newJedisResult(connection.getRequiredTransaction().geoadd(key, redisGeoCoordinateMap)));
132-
return null;
133-
}
134-
135-
return connection.getJedis().geoadd(key, redisGeoCoordinateMap);
136-
} catch (Exception ex) {
137-
throw convertJedisAccessException(ex);
138-
}
100+
return connection.invoke().just(BinaryJedis::geoadd, MultiKeyPipelineBase::geoadd, key, redisGeoCoordinateMap);
139101
}
140102

141103
/*
@@ -151,23 +113,8 @@ public Distance geoDist(byte[] key, byte[] member1, byte[] member2) {
151113

152114
Converter<Double, Distance> distanceConverter = JedisConverters.distanceConverterForMetric(DistanceUnit.METERS);
153115

154-
try {
155-
if (isPipelined()) {
156-
pipeline(connection.newJedisResult(connection.getRequiredPipeline().geodist(key, member1, member2),
157-
distanceConverter));
158-
return null;
159-
}
160-
if (isQueueing()) {
161-
transaction(connection.newJedisResult(connection.getRequiredTransaction().geodist(key, member1, member2),
162-
distanceConverter));
163-
return null;
164-
}
165-
166-
Double distance = connection.getJedis().geodist(key, member1, member2);
167-
return distance != null ? distanceConverter.convert(distance) : null;
168-
} catch (Exception ex) {
169-
throw convertJedisAccessException(ex);
170-
}
116+
return connection.invoke().from(BinaryJedis::geodist, MultiKeyPipelineBase::geodist, key, member1, member2)
117+
.get(distanceConverter);
171118
}
172119

173120
/*
@@ -185,23 +132,8 @@ public Distance geoDist(byte[] key, byte[] member1, byte[] member2, Metric metri
185132
GeoUnit geoUnit = JedisConverters.toGeoUnit(metric);
186133
Converter<Double, Distance> distanceConverter = JedisConverters.distanceConverterForMetric(metric);
187134

188-
try {
189-
if (isPipelined()) {
190-
pipeline(connection.newJedisResult(connection.getRequiredPipeline().geodist(key, member1, member2, geoUnit),
191-
distanceConverter));
192-
return null;
193-
}
194-
if (isQueueing()) {
195-
transaction(connection.newJedisResult(
196-
connection.getRequiredTransaction().geodist(key, member1, member2, geoUnit), distanceConverter));
197-
return null;
198-
}
199-
200-
Double distance = connection.getJedis().geodist(key, member1, member2, geoUnit);
201-
return distance != null ? distanceConverter.convert(distance) : null;
202-
} catch (Exception ex) {
203-
throw convertJedisAccessException(ex);
204-
}
135+
return connection.invoke().from(BinaryJedis::geodist, MultiKeyPipelineBase::geodist, key, member1, member2, geoUnit)
136+
.get(distanceConverter);
205137
}
206138

207139
/*
@@ -215,22 +147,8 @@ public List<String> geoHash(byte[] key, byte[]... members) {
215147
Assert.notNull(members, "Members must not be null!");
216148
Assert.noNullElements(members, "Members must not contain null!");
217149

218-
try {
219-
if (isPipelined()) {
220-
pipeline(connection.newJedisResult(connection.getRequiredPipeline().geohash(key, members),
221-
JedisConverters.bytesListToStringListConverter()));
222-
return null;
223-
}
224-
if (isQueueing()) {
225-
transaction(connection.newJedisResult(connection.getRequiredTransaction().geohash(key, members),
226-
JedisConverters.bytesListToStringListConverter()));
227-
return null;
228-
}
229-
230-
return JedisConverters.bytesListToStringListConverter().convert(connection.getJedis().geohash(key, members));
231-
} catch (Exception ex) {
232-
throw convertJedisAccessException(ex);
233-
}
150+
return connection.invoke().from(BinaryJedis::geohash, MultiKeyPipelineBase::geohash, key, members)
151+
.get(JedisConverters.bytesListToStringListConverter());
234152
}
235153

236154
/*
@@ -245,19 +163,8 @@ public List<Point> geoPos(byte[] key, byte[]... members) {
245163
Assert.noNullElements(members, "Members must not contain null!");
246164

247165
ListConverter<GeoCoordinate, Point> converter = JedisConverters.geoCoordinateToPointConverter();
248-
try {
249-
if (isPipelined()) {
250-
pipeline(connection.newJedisResult(connection.getRequiredPipeline().geopos(key, members), converter));
251-
return null;
252-
}
253-
if (isQueueing()) {
254-
transaction(connection.newJedisResult(connection.getRequiredTransaction().geopos(key, members), converter));
255-
return null;
256-
}
257-
return converter.convert(connection.getJedis().geopos(key, members));
258-
} catch (Exception ex) {
259-
throw convertJedisAccessException(ex);
260-
}
166+
167+
return connection.invoke().from(BinaryJedis::geopos, MultiKeyPipelineBase::geopos, key, members).get(converter);
261168
}
262169

263170
/*
@@ -272,30 +179,12 @@ public GeoResults<GeoLocation<byte[]>> geoRadius(byte[] key, Circle within) {
272179

273180
Converter<List<redis.clients.jedis.GeoRadiusResponse>, GeoResults<GeoLocation<byte[]>>> converter = JedisConverters
274181
.geoRadiusResponseToGeoResultsConverter(within.getRadius().getMetric());
275-
try {
276-
if (isPipelined()) {
277-
pipeline(
278-
connection.newJedisResult(
279-
connection.getRequiredPipeline().georadius(key, within.getCenter().getX(), within.getCenter().getY(),
280-
within.getRadius().getValue(), JedisConverters.toGeoUnit(within.getRadius().getMetric())),
281-
converter));
282-
return null;
283-
}
284-
if (isQueueing()) {
285-
transaction(
286-
connection.newJedisResult(
287-
connection.getRequiredTransaction().georadius(key, within.getCenter().getX(), within.getCenter().getY(),
288-
within.getRadius().getValue(), JedisConverters.toGeoUnit(within.getRadius().getMetric())),
289-
converter));
290-
return null;
291-
}
292-
293-
return converter
294-
.convert(connection.getJedis().georadius(key, within.getCenter().getX(), within.getCenter().getY(),
295-
within.getRadius().getValue(), JedisConverters.toGeoUnit(within.getRadius().getMetric())));
296-
} catch (Exception ex) {
297-
throw convertJedisAccessException(ex);
298-
}
182+
183+
return connection.invoke()
184+
.from(BinaryJedis::georadius, MultiKeyPipelineBase::georadius, key, within.getCenter().getX(),
185+
within.getCenter().getY(), within.getRadius().getValue(),
186+
JedisConverters.toGeoUnit(within.getRadius().getMetric()))
187+
.get(converter);
299188
}
300189

301190
/*
@@ -313,26 +202,11 @@ public GeoResults<GeoLocation<byte[]>> geoRadius(byte[] key, Circle within, GeoR
313202
Converter<List<redis.clients.jedis.GeoRadiusResponse>, GeoResults<GeoLocation<byte[]>>> converter = JedisConverters
314203
.geoRadiusResponseToGeoResultsConverter(within.getRadius().getMetric());
315204

316-
try {
317-
if (isPipelined()) {
318-
pipeline(connection.newJedisResult(connection.getRequiredPipeline().georadius(key, within.getCenter().getX(),
205+
return connection.invoke().from(BinaryJedis::georadius, MultiKeyPipelineBase::georadius, key,
206+
within.getCenter().getX(),
319207
within.getCenter().getY(), within.getRadius().getValue(),
320-
JedisConverters.toGeoUnit(within.getRadius().getMetric()), geoRadiusParam), converter));
321-
return null;
322-
}
323-
if (isQueueing()) {
324-
transaction(connection.newJedisResult(connection.getRequiredTransaction().georadius(key,
325-
within.getCenter().getX(), within.getCenter().getY(), within.getRadius().getValue(),
326-
JedisConverters.toGeoUnit(within.getRadius().getMetric()), geoRadiusParam), converter));
327-
return null;
328-
}
329-
330-
return converter.convert(connection.getJedis().georadius(key, within.getCenter().getX(),
331-
within.getCenter().getY(), within.getRadius().getValue(),
332-
JedisConverters.toGeoUnit(within.getRadius().getMetric()), geoRadiusParam));
333-
} catch (Exception ex) {
334-
throw convertJedisAccessException(ex);
335-
}
208+
JedisConverters.toGeoUnit(within.getRadius().getMetric()), geoRadiusParam)
209+
.get(converter);
336210
}
337211

338212
/*
@@ -350,22 +224,8 @@ public GeoResults<GeoLocation<byte[]>> geoRadiusByMember(byte[] key, byte[] memb
350224
Converter<List<redis.clients.jedis.GeoRadiusResponse>, GeoResults<GeoLocation<byte[]>>> converter = JedisConverters
351225
.geoRadiusResponseToGeoResultsConverter(radius.getMetric());
352226

353-
try {
354-
if (isPipelined()) {
355-
pipeline(connection.newJedisResult(
356-
connection.getRequiredPipeline().georadiusByMember(key, member, radius.getValue(), geoUnit), converter));
357-
return null;
358-
}
359-
if (isQueueing()) {
360-
transaction(connection.newJedisResult(
361-
connection.getRequiredTransaction().georadiusByMember(key, member, radius.getValue(), geoUnit), converter));
362-
return null;
363-
}
364-
365-
return converter.convert(connection.getJedis().georadiusByMember(key, member, radius.getValue(), geoUnit));
366-
} catch (Exception ex) {
367-
throw convertJedisAccessException(ex);
368-
}
227+
return connection.invoke().from(BinaryJedis::georadiusByMember, MultiKeyPipelineBase::georadiusByMember, key,
228+
member, radius.getValue(), geoUnit).get(converter);
369229
}
370230

371231
/*
@@ -386,23 +246,8 @@ public GeoResults<GeoLocation<byte[]>> geoRadiusByMember(byte[] key, byte[] memb
386246
.geoRadiusResponseToGeoResultsConverter(radius.getMetric());
387247
redis.clients.jedis.params.GeoRadiusParam geoRadiusParam = JedisConverters.toGeoRadiusParam(args);
388248

389-
try {
390-
if (isPipelined()) {
391-
pipeline(connection.newJedisResult(
392-
connection.getRequiredPipeline().georadiusByMember(key, member, radius.getValue(), geoUnit, geoRadiusParam),
393-
converter));
394-
return null;
395-
}
396-
if (isQueueing()) {
397-
transaction(connection.newJedisResult(connection.getRequiredTransaction().georadiusByMember(key, member,
398-
radius.getValue(), geoUnit, geoRadiusParam), converter));
399-
return null;
400-
}
401-
return converter
402-
.convert(connection.getJedis().georadiusByMember(key, member, radius.getValue(), geoUnit, geoRadiusParam));
403-
} catch (Exception ex) {
404-
throw convertJedisAccessException(ex);
405-
}
249+
return connection.invoke().from(BinaryJedis::georadiusByMember, MultiKeyPipelineBase::georadiusByMember, key,
250+
member, radius.getValue(), geoUnit, geoRadiusParam).get(converter);
406251
}
407252

408253
/*
@@ -413,24 +258,4 @@ public GeoResults<GeoLocation<byte[]>> geoRadiusByMember(byte[] key, byte[] memb
413258
public Long geoRemove(byte[] key, byte[]... members) {
414259
return connection.zSetCommands().zRem(key, members);
415260
}
416-
417-
private boolean isPipelined() {
418-
return connection.isPipelined();
419-
}
420-
421-
private void pipeline(JedisResult result) {
422-
connection.pipeline(result);
423-
}
424-
425-
private boolean isQueueing() {
426-
return connection.isQueueing();
427-
}
428-
429-
private void transaction(JedisResult result) {
430-
connection.transaction(result);
431-
}
432-
433-
private RuntimeException convertJedisAccessException(Exception ex) {
434-
return connection.convertJedisAccessException(ex);
435-
}
436261
}

0 commit comments

Comments
 (0)