Commit 952310c

Ursula Braun authored and davem330 committed

smc: receive data from RMBE

move RMBE data into user space buffer and update managing cursors

Signed-off-by: Ursula Braun <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
1 parent e6727f3 commit 952310c

9 files changed: 304 additions & 3 deletions

net/smc/Makefile

Lines changed: 1 addition & 1 deletion

@@ -1,3 +1,3 @@
 obj-$(CONFIG_SMC)	+= smc.o
 smc-y := af_smc.o smc_pnet.o smc_ib.o smc_clc.o smc_core.o smc_wr.o smc_llc.o
-smc-y += smc_cdc.o smc_tx.o
+smc-y += smc_cdc.o smc_tx.o smc_rx.o

net/smc/af_smc.c

Lines changed: 6 additions & 1 deletion

@@ -38,6 +38,7 @@
 #include "smc_ib.h"
 #include "smc_pnet.h"
 #include "smc_tx.h"
+#include "smc_rx.h"
 
 static DEFINE_MUTEX(smc_create_lgr_pending);	/* serialize link group
 						 * creation
@@ -412,6 +413,7 @@ static int smc_connect_rdma(struct smc_sock *smc)
 
 	mutex_unlock(&smc_create_lgr_pending);
 	smc_tx_init(smc);
+	smc_rx_init(smc);
 
 out_connected:
 	smc_copy_sock_settings_to_clc(smc);
@@ -755,6 +757,7 @@ static void smc_listen_work(struct work_struct *work)
 	}
 
 	smc_tx_init(new_smc);
+	smc_rx_init(new_smc);
 
 out_connected:
 	sk_refcnt_debug_inc(newsmcsk);
@@ -950,7 +953,7 @@ static int smc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 	if (smc->use_fallback)
 		rc = smc->clcsock->ops->recvmsg(smc->clcsock, msg, len, flags);
 	else
-		rc = sock_no_recvmsg(sock, msg, len, flags);
+		rc = smc_rx_recvmsg(smc, msg, len, flags);
 out:
 	release_sock(sk);
 	return rc;
@@ -1016,6 +1019,8 @@ static unsigned int smc_poll(struct file *file, struct socket *sock,
 			sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
 			set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 		}
+		if (atomic_read(&smc->conn.bytes_to_rcv))
+			mask |= POLLIN | POLLRDNORM;
 		/* for now - to be enhanced in follow-on patch */
 	}
 

net/smc/smc.h

Lines changed: 4 additions & 0 deletions

@@ -115,6 +115,10 @@ struct smc_connection {
 	struct smc_buf_desc	*rmb_desc;	/* RMBE descriptor */
 	int			rmbe_size;	/* RMBE size <== sock rmem */
 	int			rmbe_size_short;/* compressed notation */
+	int			rmbe_update_limit;
+						/* lower limit for consumer
+						 * cursor update
+						 */
 
 	struct smc_host_cdc_msg	local_tx_ctrl;	/* host byte order staging
 						 * buffer for CDC msg send

net/smc/smc_cdc.c

Lines changed: 5 additions & 1 deletion

@@ -15,6 +15,7 @@
 #include "smc_wr.h"
 #include "smc_cdc.h"
 #include "smc_tx.h"
+#include "smc_rx.h"
 
 /********************************** send *************************************/
 
@@ -197,6 +198,7 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
 		atomic_add(diff_prod, &conn->bytes_to_rcv);
 		/* guarantee 0 <= bytes_to_rcv <= rmbe_size */
 		smp_mb__after_atomic();
+		smc->sk.sk_data_ready(&smc->sk);
 	}
 
 	if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
@@ -216,7 +218,9 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
 		return;
 
 	/* data available */
-	/* subsequent patch: send delayed ack, wake receivers */
+	if ((conn->local_rx_ctrl.prod_flags.write_blocked) ||
+	    (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req))
+		smc_tx_consumer_update(conn);
 }
 
 /* called under tasklet context */

net/smc/smc_core.c

Lines changed: 10 additions & 0 deletions

@@ -489,6 +489,15 @@ struct smc_buf_desc *smc_rmb_get_slot(struct smc_link_group *lgr,
 	return NULL;
 }
 
+/* one of the conditions for announcing a receiver's current window size is
+ * that it "results in a minimum increase in the window size of 10% of the
+ * receive buffer space" [RFC7609]
+ */
+static inline int smc_rmb_wnd_update_limit(int rmbe_size)
+{
+	return min_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2);
+}
+
 /* create the tx buffer for an SMC socket */
 int smc_sndbuf_create(struct smc_sock *smc)
 {
@@ -620,6 +629,7 @@ int smc_rmb_create(struct smc_sock *smc)
 		conn->rmbe_size_short = tmp_bufsize_short;
 		smc->sk.sk_rcvbuf = tmp_bufsize * 2;
 		atomic_set(&conn->bytes_to_rcv, 0);
+		conn->rmbe_update_limit = smc_rmb_wnd_update_limit(tmp_bufsize);
 		return 0;
 	} else {
 		return -ENOMEM;
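Note on the 10% rule quoted in the hunk above: the sketch below is a small standalone model (not part of the patch) of what smc_rmb_wnd_update_limit() computes. The kernel's SOCK_MIN_SNDBUF is a separate constant derived from skb overhead, so the 2048-byte cap used here is only an assumed placeholder that lets the example compile in user space.

#include <stdio.h>

#define SOCK_MIN_SNDBUF_HALF 2048  /* assumed placeholder, not the kernel value */

/* model of smc_rmb_wnd_update_limit(): 10% of the RMBE, capped */
static int rmb_wnd_update_limit(int rmbe_size)
{
	int tenth = rmbe_size / 10;

	return tenth < SOCK_MIN_SNDBUF_HALF ? tenth : SOCK_MIN_SNDBUF_HALF;
}

int main(void)
{
	/* a 64 KiB RMBE gives 6553 bytes from the 10% rule, above the assumed
	 * cap, so the cap wins; a 16 KiB RMBE stays below it at 1638 bytes
	 */
	printf("64 KiB RMBE -> limit %d\n", rmb_wnd_update_limit(65536));
	printf("16 KiB RMBE -> limit %d\n", rmb_wnd_update_limit(16384));
	return 0;
}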

net/smc/smc_rx.c

Lines changed: 217 additions & 0 deletions

@@ -0,0 +1,217 @@
+/*
+ * Shared Memory Communications over RDMA (SMC-R) and RoCE
+ *
+ * Manage RMBE
+ * copy new RMBE data into user space
+ *
+ * Copyright IBM Corp. 2016
+ *
+ * Author(s):  Ursula Braun <[email protected]>
+ */
+
+#include <linux/net.h>
+#include <linux/rcupdate.h>
+#include <net/sock.h>
+
+#include "smc.h"
+#include "smc_core.h"
+#include "smc_cdc.h"
+#include "smc_tx.h" /* smc_tx_consumer_update() */
+#include "smc_rx.h"
+
+/* callback implementation for sk.sk_data_ready()
+ * to wakeup rcvbuf consumers that blocked with smc_rx_wait_data().
+ * indirectly called by smc_cdc_msg_recv_action().
+ */
+static void smc_rx_data_ready(struct sock *sk)
+{
+	struct socket_wq *wq;
+
+	/* derived from sock_def_readable() */
+	/* called already in smc_listen_work() */
+	rcu_read_lock();
+	wq = rcu_dereference(sk->sk_wq);
+	if (skwq_has_sleeper(wq))
+		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
+						POLLRDNORM | POLLRDBAND);
+	if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
+	    (sk->sk_state == SMC_CLOSED))
+		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
+	else
+		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
+	rcu_read_unlock();
+}
+
+/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
+ * @smc    smc socket
+ * @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
+ * Returns:
+ * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
+ * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
+ */
+static int smc_rx_wait_data(struct smc_sock *smc, long *timeo)
+{
+	DEFINE_WAIT_FUNC(wait, woken_wake_function);
+	struct smc_connection *conn = &smc->conn;
+	struct sock *sk = &smc->sk;
+	int rc;
+
+	if (atomic_read(&conn->bytes_to_rcv))
+		return 1;
+	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+	add_wait_queue(sk_sleep(sk), &wait);
+	rc = sk_wait_event(sk, timeo,
+			   sk->sk_err ||
+			   sk->sk_shutdown & RCV_SHUTDOWN ||
+			   sock_flag(sk, SOCK_DONE) ||
+			   atomic_read(&conn->bytes_to_rcv) ||
+			   smc_cdc_rxed_any_close_or_senddone(conn),
+			   &wait);
+	remove_wait_queue(sk_sleep(sk), &wait);
+	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+	return rc;
+}
+
+/* rcvbuf consumer: main API called by socket layer.
+ * called under sk lock.
+ */
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
+		   int flags)
+{
+	size_t copylen, read_done = 0, read_remaining = len;
+	size_t chunk_len, chunk_off, chunk_len_sum;
+	struct smc_connection *conn = &smc->conn;
+	union smc_host_cursor cons;
+	int readable, chunk;
+	char *rcvbuf_base;
+	struct sock *sk;
+	long timeo;
+	int target;		/* Read at least these many bytes */
+	int rc;
+
+	if (unlikely(flags & MSG_ERRQUEUE))
+		return -EINVAL; /* future work for sk.sk_family == AF_SMC */
+	if (flags & MSG_OOB)
+		return -EINVAL; /* future work */
+
+	sk = &smc->sk;
+	if (sk->sk_state == SMC_LISTEN)
+		return -ENOTCONN;
+	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
+
+	msg->msg_namelen = 0;
+	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
+	rcvbuf_base = conn->rmb_desc->cpu_addr;
+
+	do { /* while (read_remaining) */
+		if (read_done >= target)
+			break;
+
+		if (atomic_read(&conn->bytes_to_rcv))
+			goto copy;
+
+		if (read_done) {
+			if (sk->sk_err ||
+			    sk->sk_state == SMC_CLOSED ||
+			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
+			    !timeo ||
+			    signal_pending(current) ||
+			    smc_cdc_rxed_any_close_or_senddone(conn) ||
+			    conn->local_tx_ctrl.conn_state_flags.
+			    peer_conn_abort)
+				break;
+		} else {
+			if (sock_flag(sk, SOCK_DONE))
+				break;
+			if (sk->sk_err) {
+				read_done = sock_error(sk);
+				break;
+			}
+			if (sk->sk_shutdown & RCV_SHUTDOWN ||
+			    smc_cdc_rxed_any_close_or_senddone(conn) ||
+			    conn->local_tx_ctrl.conn_state_flags.
+			    peer_conn_abort)
+				break;
+			if (sk->sk_state == SMC_CLOSED) {
+				if (!sock_flag(sk, SOCK_DONE)) {
+					/* This occurs when user tries to read
+					 * from never connected socket.
+					 */
+					read_done = -ENOTCONN;
+					break;
+				}
+				break;
+			}
+			if (signal_pending(current)) {
+				read_done = sock_intr_errno(timeo);
+				break;
+			}
+		}
+
+		if (!atomic_read(&conn->bytes_to_rcv)) {
+			smc_rx_wait_data(smc, &timeo);
+			continue;
+		}
+
+copy:
+		/* initialize variables for 1st iteration of subsequent loop */
+		/* could be just 1 byte, even after smc_rx_wait_data above */
+		readable = atomic_read(&conn->bytes_to_rcv);
+		/* not more than what user space asked for */
+		copylen = min_t(size_t, read_remaining, readable);
+		smc_curs_write(&cons,
+			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
+			       conn);
+		/* determine chunks where to read from rcvbuf */
+		/* either unwrapped case, or 1st chunk of wrapped case */
+		chunk_len = min_t(size_t,
+				  copylen, conn->rmbe_size - cons.count);
+		chunk_len_sum = chunk_len;
+		chunk_off = cons.count;
+		for (chunk = 0; chunk < 2; chunk++) {
+			if (!(flags & MSG_TRUNC)) {
+				rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off,
+						   chunk_len);
+				if (rc) {
+					if (!read_done)
+						read_done = -EFAULT;
+					goto out;
+				}
+			}
+			read_remaining -= chunk_len;
+			read_done += chunk_len;
+
+			if (chunk_len_sum == copylen)
+				break; /* either on 1st or 2nd iteration */
+			/* prepare next (== 2nd) iteration */
+			chunk_len = copylen - chunk_len; /* remainder */
+			chunk_len_sum += chunk_len;
+			chunk_off = 0; /* modulo offset in recv ring buffer */
+		}
+
+		/* update cursors */
+		if (!(flags & MSG_PEEK)) {
+			smc_curs_add(conn->rmbe_size, &cons, copylen);
+			/* increased in recv tasklet smc_cdc_msg_rcv() */
+			smp_mb__before_atomic();
+			atomic_sub(copylen, &conn->bytes_to_rcv);
+			/* guarantee 0 <= bytes_to_rcv <= rmbe_size */
+			smp_mb__after_atomic();
+			smc_curs_write(&conn->local_tx_ctrl.cons,
+				       smc_curs_read(&cons, conn),
+				       conn);
+			/* send consumer cursor update if required */
+			/* similar to advertising new TCP rcv_wnd if required */
+			smc_tx_consumer_update(conn);
+		}
+	} while (read_remaining);
+out:
+	return read_done;
+}
+
+/* Initialize receive properties on connection establishment. NB: not __init! */
+void smc_rx_init(struct smc_sock *smc)
+{
+	smc->sk.sk_data_ready = smc_rx_data_ready;
+}
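For readers new to the RMBE layout: smc_rx_recvmsg() above treats the RMBE as a ring buffer, so a single receive needs at most two copies when the readable region wraps past the end of the buffer. The following is a minimal user-space model of just that two-chunk copy and cursor advance; struct ring and ring_read() are illustrative names, not kernel APIs, and the locking, cursor wrap counting, and CDC signalling of the real code are left out.

#include <assert.h>
#include <stddef.h>
#include <string.h>

struct ring {
	char   *buf;   /* backing storage, size bytes (stands in for the RMBE) */
	size_t  size;  /* total ring size (conn->rmbe_size in the patch) */
	size_t  cons;  /* consumer cursor, always < size (cons.count) */
	size_t  avail; /* readable bytes (conn->bytes_to_rcv in the patch) */
};

/* copy up to want bytes out of the ring in at most two chunks and
 * advance the consumer cursor; returns the number of bytes copied
 */
static size_t ring_read(struct ring *r, char *dst, size_t want)
{
	size_t copylen = want < r->avail ? want : r->avail;
	size_t first = r->size - r->cons;	/* room before the wrap point */
	size_t chunk1 = copylen < first ? copylen : first;

	memcpy(dst, r->buf + r->cons, chunk1);	/* unwrapped part */
	if (copylen > chunk1)			/* wrapped remainder from offset 0 */
		memcpy(dst + chunk1, r->buf, copylen - chunk1);

	r->cons = (r->cons + copylen) % r->size;	/* advance consumer cursor */
	r->avail -= copylen;
	return copylen;
}

int main(void)
{
	char storage[8] = { '4', '5', '6', '7', '0', '1', '2', '3' };
	struct ring r = { .buf = storage, .size = 8, .cons = 4, .avail = 8 };
	char out[8];
	size_t n;

	/* a 6-byte read starting at offset 4 wraps: chunks of 4 and 2 bytes */
	n = ring_read(&r, out, 6);
	assert(n == 6 && memcmp(out, "012345", 6) == 0);
	assert(r.cons == 2 && r.avail == 2);
	return 0;
}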

net/smc/smc_rx.h

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+/*
+ * Shared Memory Communications over RDMA (SMC-R) and RoCE
+ *
+ * Manage RMBE
+ *
+ * Copyright IBM Corp. 2016
+ *
+ * Author(s):  Ursula Braun <[email protected]>
+ */
+
+#ifndef SMC_RX_H
+#define SMC_RX_H
+
+#include <linux/socket.h>
+#include <linux/types.h>
+
+#include "smc.h"
+
+void smc_rx_init(struct smc_sock *smc);
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
+		   int flags);
+
+#endif /* SMC_RX_H */

net/smc/smc_tx.c

Lines changed: 37 additions & 0 deletions

@@ -427,6 +427,43 @@ static void smc_tx_work(struct work_struct *work)
 	release_sock(&smc->sk);
 }
 
+void smc_tx_consumer_update(struct smc_connection *conn)
+{
+	union smc_host_cursor cfed, cons;
+	struct smc_cdc_tx_pend *pend;
+	struct smc_wr_buf *wr_buf;
+	int to_confirm, rc;
+
+	smc_curs_write(&cons,
+		       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
+		       conn);
+	smc_curs_write(&cfed,
+		       smc_curs_read(&conn->rx_curs_confirmed, conn),
+		       conn);
+	to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons);
+
+	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
+	    ((to_confirm > conn->rmbe_update_limit) &&
+	     ((to_confirm > (conn->rmbe_size / 2)) ||
+	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
+		rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
+					   &wr_buf, &pend);
+		if (!rc)
+			rc = smc_cdc_msg_send(conn, wr_buf, pend);
+		if (rc < 0) {
+			schedule_work(&conn->tx_work);
+			return;
+		}
+		smc_curs_write(&conn->rx_curs_confirmed,
+			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
+			       conn);
+		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
+	}
+	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
+	    !atomic_read(&conn->bytes_to_rcv))
+		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
+}
+
 /***************************** send initialize *******************************/
 
 /* Initialize send properties on connection establishment. NB: not __init! */
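The condition guarding the cursor update in smc_tx_consumer_update() above reads as: always answer an explicit request from the peer, otherwise only advertise once more than rmbe_update_limit bytes have been consumed and either more than half of the RMBE has been freed or the peer reported itself write-blocked. Below is a standalone model of just that predicate, using hypothetical names (struct wnd_state, should_send_consumer_update) that do not exist in the patch.

#include <stdbool.h>
#include <stdio.h>

struct wnd_state {
	int  rmbe_size;          /* receive buffer (RMBE) size */
	int  rmbe_update_limit;  /* result of smc_rmb_wnd_update_limit() */
	int  to_confirm;         /* bytes consumed since the last advertised cursor */
	bool cons_curs_upd_req;  /* peer explicitly asked for a cursor update */
	bool peer_write_blocked; /* peer reported it is blocked on window space */
};

/* mirror of the if () condition in smc_tx_consumer_update() above */
static bool should_send_consumer_update(const struct wnd_state *s)
{
	return s->cons_curs_upd_req ||
	       (s->to_confirm > s->rmbe_update_limit &&
		(s->to_confirm > s->rmbe_size / 2 || s->peer_write_blocked));
}

int main(void)
{
	/* 64 KiB RMBE with a 2 KiB update limit: 3 KiB consumed is above the
	 * limit but below half the buffer, so no update is sent unless the
	 * peer is blocked or explicitly requested one
	 */
	struct wnd_state s = { 65536, 2048, 3072, false, false };

	printf("quiet peer: %d\n", should_send_consumer_update(&s));   /* 0 */
	s.peer_write_blocked = true;
	printf("blocked peer: %d\n", should_send_consumer_update(&s)); /* 1 */
	return 0;
}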
