summaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_recvfrom.c')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c255
1 files changed, 197 insertions, 58 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 06ab4841537..a4756576d68 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -112,16 +112,11 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.tail[0].iov_len = 0;
}
-struct chunk_sge {
- int start; /* sge no for this chunk */
- int count; /* sge count for this chunk */
-};
-
/* Encode a read-chunk-list as an array of IB SGE
*
* Assumptions:
* - chunk[0]->position points to pages[0] at an offset of 0
- * - pages[] is not physically or virtually contigous and consists of
+ * - pages[] is not physically or virtually contiguous and consists of
* PAGE_SIZE elements.
*
* Output:
@@ -130,12 +125,12 @@ struct chunk_sge {
* chunk in the read list
*
*/
-static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
+static int map_read_chunks(struct svcxprt_rdma *xprt,
struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *head,
struct rpcrdma_msg *rmsgp,
- struct ib_sge *sge,
- struct chunk_sge *ch_sge_ary,
+ struct svc_rdma_req_map *rpl_map,
+ struct svc_rdma_req_map *chl_map,
int ch_count,
int byte_count)
{
@@ -156,22 +151,18 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
head->arg.head[0] = rqstp->rq_arg.head[0];
head->arg.tail[0] = rqstp->rq_arg.tail[0];
head->arg.pages = &head->pages[head->count];
- head->sge[0].length = head->count; /* save count of hdr pages */
+ head->hdr_count = head->count; /* save count of hdr pages */
head->arg.page_base = 0;
head->arg.page_len = ch_bytes;
head->arg.len = rqstp->rq_arg.len + ch_bytes;
head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
head->count++;
- ch_sge_ary[0].start = 0;
+ chl_map->ch[0].start = 0;
while (byte_count) {
+ rpl_map->sge[sge_no].iov_base =
+ page_address(rqstp->rq_arg.pages[page_no]) + page_off;
sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
- sge[sge_no].addr =
- ib_dma_map_page(xprt->sc_cm_id->device,
- rqstp->rq_arg.pages[page_no],
- page_off, sge_bytes,
- DMA_FROM_DEVICE);
- sge[sge_no].length = sge_bytes;
- sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ rpl_map->sge[sge_no].iov_len = sge_bytes;
/*
* Don't bump head->count here because the same page
* may be used by multiple SGE.
@@ -187,11 +178,11 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
* SGE, move to the next SGE
*/
if (ch_bytes == 0) {
- ch_sge_ary[ch_no].count =
- sge_no - ch_sge_ary[ch_no].start;
+ chl_map->ch[ch_no].count =
+ sge_no - chl_map->ch[ch_no].start;
ch_no++;
ch++;
- ch_sge_ary[ch_no].start = sge_no;
+ chl_map->ch[ch_no].start = sge_no;
ch_bytes = ch->rc_target.rs_length;
/* If bytes remaining account for next chunk */
if (byte_count) {
@@ -220,19 +211,128 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
return sge_no;
}
-static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
- struct ib_sge *sge,
- u64 *sgl_offset,
- int count)
+/* Map a read-chunk-list to an XDR and fast register the page-list.
+ *
+ * Assumptions:
+ * - chunk[0] position points to pages[0] at an offset of 0
+ * - pages[] will be made physically contiguous by creating a one-off memory
+ * region using the fastreg verb.
+ * - byte_count is # of bytes in read-chunk-list
+ * - ch_count is # of chunks in read-chunk-list
+ *
+ * Output:
+ * - sge array pointing into pages[] array.
+ * - chunk_sge array specifying sge index and count for each
+ * chunk in the read list
+ */
+static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ struct rpcrdma_msg *rmsgp,
+ struct svc_rdma_req_map *rpl_map,
+ struct svc_rdma_req_map *chl_map,
+ int ch_count,
+ int byte_count)
+{
+ int page_no;
+ int ch_no;
+ u32 offset;
+ struct rpcrdma_read_chunk *ch;
+ struct svc_rdma_fastreg_mr *frmr;
+ int ret = 0;
+
+ frmr = svc_rdma_get_frmr(xprt);
+ if (IS_ERR(frmr))
+ return -ENOMEM;
+
+ head->frmr = frmr;
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->arg.pages = &head->pages[head->count];
+ head->hdr_count = head->count; /* save count of hdr pages */
+ head->arg.page_base = 0;
+ head->arg.page_len = byte_count;
+ head->arg.len = rqstp->rq_arg.len + byte_count;
+ head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
+
+ /* Fast register the page list */
+ frmr->kva = page_address(rqstp->rq_arg.pages[0]);
+ frmr->direction = DMA_FROM_DEVICE;
+ frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
+ frmr->map_len = byte_count;
+ frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
+ for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
+ frmr->page_list->page_list[page_no] =
+ ib_dma_map_single(xprt->sc_cm_id->device,
+ page_address(rqstp->rq_arg.pages[page_no]),
+ PAGE_SIZE, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+ frmr->page_list->page_list[page_no]))
+ goto fatal_err;
+ atomic_inc(&xprt->sc_dma_used);
+ head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
+ }
+ head->count += page_no;
+
+ /* rq_respages points one past arg pages */
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
+
+ /* Create the reply and chunk maps */
+ offset = 0;
+ ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ for (ch_no = 0; ch_no < ch_count; ch_no++) {
+ rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
+ rpl_map->sge[ch_no].iov_len = ch->rc_target.rs_length;
+ chl_map->ch[ch_no].count = 1;
+ chl_map->ch[ch_no].start = ch_no;
+ offset += ch->rc_target.rs_length;
+ ch++;
+ }
+
+ ret = svc_rdma_fastreg(xprt, frmr);
+ if (ret)
+ goto fatal_err;
+
+ return ch_no;
+
+ fatal_err:
+ printk("svcrdma: error fast registering xdr for xprt %p", xprt);
+ svc_rdma_put_frmr(xprt, frmr);
+ return -EIO;
+}
+
+static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
+ struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_fastreg_mr *frmr,
+ struct kvec *vec,
+ u64 *sgl_offset,
+ int count)
{
int i;
ctxt->count = count;
+ ctxt->direction = DMA_FROM_DEVICE;
for (i = 0; i < count; i++) {
- ctxt->sge[i].addr = sge[i].addr;
- ctxt->sge[i].length = sge[i].length;
- *sgl_offset = *sgl_offset + sge[i].length;
+ ctxt->sge[i].length = 0; /* in case map fails */
+ if (!frmr) {
+ ctxt->sge[i].addr =
+ ib_dma_map_single(xprt->sc_cm_id->device,
+ vec[i].iov_base,
+ vec[i].iov_len,
+ DMA_FROM_DEVICE);
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+ ctxt->sge[i].addr))
+ return -EINVAL;
+ ctxt->sge[i].lkey = xprt->sc_dma_lkey;
+ atomic_inc(&xprt->sc_dma_used);
+ } else {
+ ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
+ ctxt->sge[i].lkey = frmr->mr->lkey;
+ }
+ ctxt->sge[i].length = vec[i].iov_len;
+ *sgl_offset = *sgl_offset + vec[i].iov_len;
}
+ return 0;
}
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
@@ -280,37 +380,44 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
struct svc_rdma_op_ctxt *hdr_ctxt)
{
struct ib_send_wr read_wr;
+ struct ib_send_wr inv_wr;
int err = 0;
int ch_no;
- struct ib_sge *sge;
int ch_count;
int byte_count;
int sge_count;
u64 sgl_offset;
struct rpcrdma_read_chunk *ch;
struct svc_rdma_op_ctxt *ctxt = NULL;
- struct svc_rdma_op_ctxt *tmp_sge_ctxt;
- struct svc_rdma_op_ctxt *tmp_ch_ctxt;
- struct chunk_sge *ch_sge_ary;
+ struct svc_rdma_req_map *rpl_map;
+ struct svc_rdma_req_map *chl_map;
/* If no read list is present, return 0 */
ch = svc_rdma_get_read_chunk(rmsgp);
if (!ch)
return 0;
- /* Allocate temporary contexts to keep SGE */
- BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
- tmp_sge_ctxt = svc_rdma_get_context(xprt);
- sge = tmp_sge_ctxt->sge;
- tmp_ch_ctxt = svc_rdma_get_context(xprt);
- ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
+ /* Allocate temporary reply and chunk maps */
+ rpl_map = svc_rdma_get_req_map();
+ chl_map = svc_rdma_get_req_map();
svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
if (ch_count > RPCSVC_MAXPAGES)
return -EINVAL;
- sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
- sge, ch_sge_ary,
- ch_count, byte_count);
+
+ if (!xprt->sc_frmr_pg_list_len)
+ sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
+ rpl_map, chl_map, ch_count,
+ byte_count);
+ else
+ sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
+ rpl_map, chl_map, ch_count,
+ byte_count);
+ if (sge_count < 0) {
+ err = -EIO;
+ goto out;
+ }
+
sgl_offset = 0;
ch_no = 0;
@@ -319,32 +426,64 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
next_sge:
ctxt = svc_rdma_get_context(xprt);
ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->frmr = hdr_ctxt->frmr;
+ ctxt->read_hdr = NULL;
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
/* Prepare READ WR */
memset(&read_wr, 0, sizeof read_wr);
- ctxt->wr_op = IB_WR_RDMA_READ;
read_wr.wr_id = (unsigned long)ctxt;
read_wr.opcode = IB_WR_RDMA_READ;
+ ctxt->wr_op = read_wr.opcode;
read_wr.send_flags = IB_SEND_SIGNALED;
read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
read_wr.wr.rdma.remote_addr =
get_unaligned(&(ch->rc_target.rs_offset)) +
sgl_offset;
- read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
+ read_wr.sg_list = ctxt->sge;
read_wr.num_sge =
- rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
- rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
- &sgl_offset,
- read_wr.num_sge);
+ rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
+ err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
+ &rpl_map->sge[chl_map->ch[ch_no].start],
+ &sgl_offset,
+ read_wr.num_sge);
+ if (err) {
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+ goto out;
+ }
if (((ch+1)->rc_discrim == 0) &&
- (read_wr.num_sge == ch_sge_ary[ch_no].count)) {
+ (read_wr.num_sge == chl_map->ch[ch_no].count)) {
/*
* Mark the last RDMA_READ with a bit to
* indicate all RPC data has been fetched from
* the client and the RPC needs to be enqueued.
*/
set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ if (hdr_ctxt->frmr) {
+ set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
+ /*
+ * Invalidate the local MR used to map the data
+ * sink.
+ */
+ if (xprt->sc_dev_caps &
+ SVCRDMA_DEVCAP_READ_W_INV) {
+ read_wr.opcode =
+ IB_WR_RDMA_READ_WITH_INV;
+ ctxt->wr_op = read_wr.opcode;
+ read_wr.ex.invalidate_rkey =
+ ctxt->frmr->mr->lkey;
+ } else {
+ /* Prepare INVALIDATE WR */
+ memset(&inv_wr, 0, sizeof inv_wr);
+ inv_wr.opcode = IB_WR_LOCAL_INV;
+ inv_wr.send_flags = IB_SEND_SIGNALED;
+ inv_wr.ex.invalidate_rkey =
+ hdr_ctxt->frmr->mr->lkey;
+ read_wr.next = &inv_wr;
+ }
+ }
ctxt->read_hdr = hdr_ctxt;
}
/* Post the read */
@@ -358,9 +497,9 @@ next_sge:
}
atomic_inc(&rdma_stat_read);
- if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
- ch_sge_ary[ch_no].count -= read_wr.num_sge;
- ch_sge_ary[ch_no].start += read_wr.num_sge;
+ if (read_wr.num_sge < chl_map->ch[ch_no].count) {
+ chl_map->ch[ch_no].count -= read_wr.num_sge;
+ chl_map->ch[ch_no].start += read_wr.num_sge;
goto next_sge;
}
sgl_offset = 0;
@@ -368,8 +507,8 @@ next_sge:
}
out:
- svc_rdma_put_context(tmp_sge_ctxt, 0);
- svc_rdma_put_context(tmp_ch_ctxt, 0);
+ svc_rdma_put_req_map(rpl_map);
+ svc_rdma_put_req_map(chl_map);
/* Detach arg pages. svc_recv will replenish them */
for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
@@ -399,7 +538,7 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
rqstp->rq_pages[page_no] = head->pages[page_no];
}
/* Point rq_arg.pages past header */
- rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
+ rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
rqstp->rq_arg.page_len = head->arg.page_len;
rqstp->rq_arg.page_base = head->arg.page_base;
@@ -449,18 +588,18 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
dprintk("svcrdma: rqstp=%p\n", rqstp);
- spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
+ spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
struct svc_rdma_op_ctxt,
dto_q);
list_del_init(&ctxt->dto_q);
}
- spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
- if (ctxt)
+ if (ctxt) {
+ spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
return rdma_read_complete(rqstp, ctxt);
+ }
- spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
struct svc_rdma_op_ctxt,