summaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma/svc_rdma_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_transport.c')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c290
1 files changed, 174 insertions, 116 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index af408fc1263..e132509d1db 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -103,8 +103,8 @@ static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
spin_lock_bh(&xprt->sc_ctxt_lock);
if (ctxt) {
at_least_one = 1;
- ctxt->next = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt;
+ INIT_LIST_HEAD(&ctxt->free_list);
+ list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
} else {
/* kmalloc failed...give up for now */
xprt->sc_ctxt_cnt--;
@@ -123,7 +123,7 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
while (1) {
spin_lock_bh(&xprt->sc_ctxt_lock);
- if (unlikely(xprt->sc_ctxt_head == NULL)) {
+ if (unlikely(list_empty(&xprt->sc_ctxt_free))) {
/* Try to bump my cache. */
spin_unlock_bh(&xprt->sc_ctxt_lock);
@@ -136,12 +136,15 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
schedule_timeout_uninterruptible(msecs_to_jiffies(500));
continue;
}
- ctxt = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt->next;
+ ctxt = list_entry(xprt->sc_ctxt_free.next,
+ struct svc_rdma_op_ctxt,
+ free_list);
+ list_del_init(&ctxt->free_list);
spin_unlock_bh(&xprt->sc_ctxt_lock);
ctxt->xprt = xprt;
INIT_LIST_HEAD(&ctxt->dto_q);
ctxt->count = 0;
+ atomic_inc(&xprt->sc_ctxt_used);
break;
}
return ctxt;
@@ -159,14 +162,15 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
put_page(ctxt->pages[i]);
for (i = 0; i < ctxt->count; i++)
- dma_unmap_single(xprt->sc_cm_id->device->dma_device,
- ctxt->sge[i].addr,
- ctxt->sge[i].length,
- ctxt->direction);
+ ib_dma_unmap_single(xprt->sc_cm_id->device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
+
spin_lock_bh(&xprt->sc_ctxt_lock);
- ctxt->next = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt;
+ list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
spin_unlock_bh(&xprt->sc_ctxt_lock);
+ atomic_dec(&xprt->sc_ctxt_used);
}
/* ib_cq event handler */
@@ -228,23 +232,8 @@ static void dto_tasklet_func(unsigned long data)
list_del_init(&xprt->sc_dto_q);
spin_unlock_irqrestore(&dto_lock, flags);
- if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
- ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
- rq_cq_reap(xprt);
- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- /*
- * If data arrived before established event,
- * don't enqueue. This defers RPC I/O until the
- * RDMA connection is complete.
- */
- if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
- svc_xprt_enqueue(&xprt->sc_xprt);
- }
-
- if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
- ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- sq_cq_reap(xprt);
- }
+ rq_cq_reap(xprt);
+ sq_cq_reap(xprt);
svc_xprt_put(&xprt->sc_xprt);
spin_lock_irqsave(&dto_lock, flags);
@@ -263,11 +252,15 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
struct svcxprt_rdma *xprt = cq_context;
unsigned long flags;
+ /* Guard against unconditional flush call for destroyed QP */
+ if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+ return;
+
/*
* Set the bit regardless of whether or not it's on the list
* because it may be on the list already due to an SQ
* completion.
- */
+ */
set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
/*
@@ -290,6 +283,8 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
*
* Take all completing WC off the CQE and enqueue the associated DTO
* context on the dto_q for the transport.
+ *
+ * Note that caller must hold a transport reference.
*/
static void rq_cq_reap(struct svcxprt_rdma *xprt)
{
@@ -297,29 +292,47 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
struct ib_wc wc;
struct svc_rdma_op_ctxt *ctxt = NULL;
+ if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+ return;
+
+ ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
atomic_inc(&rdma_stat_rq_poll);
- spin_lock_bh(&xprt->sc_rq_dto_lock);
while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
ctxt->wc_status = wc.status;
ctxt->byte_len = wc.byte_len;
if (wc.status != IB_WC_SUCCESS) {
/* Close the transport */
+ dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 1);
+ svc_xprt_put(&xprt->sc_xprt);
continue;
}
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+ svc_xprt_put(&xprt->sc_xprt);
}
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
if (ctxt)
atomic_inc(&rdma_stat_rq_prod);
+
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ /*
+ * If data arrived before established event,
+ * don't enqueue. This defers RPC I/O until the
+ * RDMA connection is complete.
+ */
+ if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+ svc_xprt_enqueue(&xprt->sc_xprt);
}
/*
* Send Queue Completion Handler - potentially called on interrupt context.
+ *
+ * Note that caller must hold a transport reference.
*/
static void sq_cq_reap(struct svcxprt_rdma *xprt)
{
@@ -328,6 +341,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
struct ib_cq *cq = xprt->sc_sq_cq;
int ret;
+
+ if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+ return;
+
+ ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
atomic_inc(&rdma_stat_sq_poll);
while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -349,14 +367,16 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
case IB_WR_RDMA_READ:
if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
+ BUG_ON(!read_hdr);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
spin_lock_bh(&xprt->sc_read_complete_lock);
- list_add_tail(&ctxt->dto_q,
+ list_add_tail(&read_hdr->dto_q,
&xprt->sc_read_complete_q);
spin_unlock_bh(&xprt->sc_read_complete_lock);
svc_xprt_enqueue(&xprt->sc_xprt);
}
+ svc_rdma_put_context(ctxt, 0);
break;
default:
@@ -365,6 +385,7 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
wc.opcode, wc.status);
break;
}
+ svc_xprt_put(&xprt->sc_xprt);
}
if (ctxt)
@@ -376,11 +397,15 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
struct svcxprt_rdma *xprt = cq_context;
unsigned long flags;
+ /* Guard against unconditional flush call for destroyed QP */
+ if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+ return;
+
/*
* Set the bit regardless of whether or not it's on the list
* because it may be on the list already due to an RQ
* completion.
- */
+ */
set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
/*
@@ -407,28 +432,29 @@ static void create_context_cache(struct svcxprt_rdma *xprt,
xprt->sc_ctxt_max = ctxt_max;
xprt->sc_ctxt_bump = ctxt_bump;
xprt->sc_ctxt_cnt = 0;
- xprt->sc_ctxt_head = NULL;
+ atomic_set(&xprt->sc_ctxt_used, 0);
+
+ INIT_LIST_HEAD(&xprt->sc_ctxt_free);
for (i = 0; i < ctxt_count; i++) {
ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
if (ctxt) {
- ctxt->next = xprt->sc_ctxt_head;
- xprt->sc_ctxt_head = ctxt;
+ INIT_LIST_HEAD(&ctxt->free_list);
+ list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
xprt->sc_ctxt_cnt++;
}
}
}
-static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+static void destroy_context_cache(struct svcxprt_rdma *xprt)
{
- struct svc_rdma_op_ctxt *next;
- if (!ctxt)
- return;
-
- do {
- next = ctxt->next;
+ while (!list_empty(&xprt->sc_ctxt_free)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(xprt->sc_ctxt_free.next,
+ struct svc_rdma_op_ctxt,
+ free_list);
+ list_del_init(&ctxt->free_list);
kfree(ctxt);
- ctxt = next;
- } while (next);
+ }
}
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -465,7 +491,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
reqs +
cma_xprt->sc_sq_depth +
RPCRDMA_MAX_THREADS + 1); /* max */
- if (!cma_xprt->sc_ctxt_head) {
+ if (list_empty(&cma_xprt->sc_ctxt_free)) {
kfree(cma_xprt);
return NULL;
}
@@ -520,7 +546,12 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
recv_wr.num_sge = ctxt->count;
recv_wr.wr_id = (u64)(unsigned long)ctxt;
+ svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+ if (ret) {
+ svc_xprt_put(&xprt->sc_xprt);
+ svc_rdma_put_context(ctxt, 1);
+ }
return ret;
}
@@ -539,6 +570,7 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id)
{
struct svcxprt_rdma *listen_xprt = new_cma_id->context;
struct svcxprt_rdma *newxprt;
+ struct sockaddr *sa;
/* Create a new transport */
newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
@@ -551,6 +583,12 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id)
dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
newxprt, newxprt->sc_cm_id, listen_xprt);
+ /* Set the local and remote addresses in the transport */
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+ svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+
/*
* Enqueue the new transport on the accept queue of the listening
* transport
@@ -627,6 +665,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
if (xprt) {
set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_xprt_enqueue(xprt);
+ svc_xprt_put(xprt);
}
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
@@ -661,31 +700,27 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
cma_xprt = rdma_create_xprt(serv, 1);
if (!cma_xprt)
- return ERR_PTR(ENOMEM);
+ return ERR_PTR(-ENOMEM);
xprt = &cma_xprt->sc_xprt;
listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
if (IS_ERR(listen_id)) {
- svc_xprt_put(&cma_xprt->sc_xprt);
- dprintk("svcrdma: rdma_create_id failed = %ld\n",
- PTR_ERR(listen_id));
- return (void *)listen_id;
+ ret = PTR_ERR(listen_id);
+ dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
+ goto err0;
}
+
ret = rdma_bind_addr(listen_id, sa);
if (ret) {
- rdma_destroy_id(listen_id);
- svc_xprt_put(&cma_xprt->sc_xprt);
dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
- return ERR_PTR(ret);
+ goto err1;
}
cma_xprt->sc_cm_id = listen_id;
ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
if (ret) {
- rdma_destroy_id(listen_id);
- svc_xprt_put(&cma_xprt->sc_xprt);
dprintk("svcrdma: rdma_listen failed = %d\n", ret);
- return ERR_PTR(ret);
+ goto err1;
}
/*
@@ -696,6 +731,12 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
return &cma_xprt->sc_xprt;
+
+ err1:
+ rdma_destroy_id(listen_id);
+ err0:
+ kfree(cma_xprt);
+ return ERR_PTR(ret);
}
/*
@@ -716,7 +757,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct rdma_conn_param conn_param;
struct ib_qp_init_attr qp_attr;
struct ib_device_attr devattr;
- struct sockaddr *sa;
int ret;
int i;
@@ -826,7 +866,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
}
- svc_xprt_get(&newxprt->sc_xprt);
newxprt->sc_qp = newxprt->sc_cm_id->qp;
/* Register all of physical memory */
@@ -850,6 +889,13 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
/* Swap out the handler */
newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+ /*
+ * Arm the CQs for the SQ and RQ before accepting so we can't
+ * miss the first message
+ */
+ ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+
/* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
memset(&conn_param, 0, sizeof conn_param);
@@ -886,58 +932,26 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
newxprt->sc_max_requests,
newxprt->sc_ord);
- /* Set the local and remote addresses in the transport */
- sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
- svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
- sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
- svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
-
- ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
return &newxprt->sc_xprt;
errout:
dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
/* Take a reference in case the DTO handler runs */
svc_xprt_get(&newxprt->sc_xprt);
- if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) {
+ if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
ib_destroy_qp(newxprt->sc_qp);
- svc_xprt_put(&newxprt->sc_xprt);
- }
rdma_destroy_id(newxprt->sc_cm_id);
/* This call to put will destroy the transport */
svc_xprt_put(&newxprt->sc_xprt);
return NULL;
}
-/*
- * Post an RQ WQE to the RQ when the rqst is being released. This
- * effectively returns an RQ credit to the client. The rq_xprt_ctxt
- * will be null if the request is deferred due to an RDMA_READ or the
- * transport had no data ready (EAGAIN). Note that an RPC deferred in
- * svc_process will still return the credit, this is because the data
- * is copied and no longer consume a WQE/WC.
- */
static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
- int err;
- struct svcxprt_rdma *rdma =
- container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
- if (rqstp->rq_xprt_ctxt) {
- BUG_ON(rqstp->rq_xprt_ctxt != rdma);
- err = svc_rdma_post_recv(rdma);
- if (err)
- dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
- err);
- }
- rqstp->rq_xprt_ctxt = NULL;
}
/*
- * When connected, an svc_xprt has at least three references:
- *
- * - A reference held by the QP. We still hold that here because this
- * code deletes the QP and puts the reference.
+ * When connected, an svc_xprt has at least two references:
*
* - A reference held by the cm_id between the ESTABLISHED and
* DISCONNECTED events. If the remote peer disconnected first, this
@@ -946,7 +960,7 @@ static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
* - A reference held by the svc_recv code that called this function
* as part of close processing.
*
- * At a minimum two references should still be held.
+ * At a minimum one references should still be held.
*/
static void svc_rdma_detach(struct svc_xprt *xprt)
{
@@ -956,23 +970,53 @@ static void svc_rdma_detach(struct svc_xprt *xprt)
/* Disconnect and flush posted WQE */
rdma_disconnect(rdma->sc_cm_id);
-
- /* Destroy the QP if present (not a listener) */
- if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
- ib_destroy_qp(rdma->sc_qp);
- svc_xprt_put(xprt);
- }
-
- /* Destroy the CM ID */
- rdma_destroy_id(rdma->sc_cm_id);
}
-static void svc_rdma_free(struct svc_xprt *xprt)
+static void __svc_rdma_free(struct work_struct *work)
{
- struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+ struct svcxprt_rdma *rdma =
+ container_of(work, struct svcxprt_rdma, sc_work);
dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+
/* We should only be called from kref_put */
- BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0);
+ BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
+
+ /*
+ * Destroy queued, but not processed read completions. Note
+ * that this cleanup has to be done before destroying the
+ * cm_id because the device ptr is needed to unmap the dma in
+ * svc_rdma_put_context.
+ */
+ spin_lock_bh(&rdma->sc_read_complete_lock);
+ while (!list_empty(&rdma->sc_read_complete_q)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(rdma->sc_read_complete_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ svc_rdma_put_context(ctxt, 1);
+ }
+ spin_unlock_bh(&rdma->sc_read_complete_lock);
+
+ /* Destroy queued, but not processed recv completions */
+ spin_lock_bh(&rdma->sc_rq_dto_lock);
+ while (!list_empty(&rdma->sc_rq_dto_q)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(rdma->sc_rq_dto_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ svc_rdma_put_context(ctxt, 1);
+ }
+ spin_unlock_bh(&rdma->sc_rq_dto_lock);
+
+ /* Warn if we leaked a resource or under-referenced */
+ WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
+
+ /* Destroy the QP if present (not a listener) */
+ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
+ ib_destroy_qp(rdma->sc_qp);
+
if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
ib_destroy_cq(rdma->sc_sq_cq);
@@ -985,10 +1029,21 @@ static void svc_rdma_free(struct svc_xprt *xprt)
if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
ib_dealloc_pd(rdma->sc_pd);
- destroy_context_cache(rdma->sc_ctxt_head);
+ /* Destroy the CM ID */
+ rdma_destroy_id(rdma->sc_cm_id);
+
+ destroy_context_cache(rdma);
kfree(rdma);
}
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ INIT_WORK(&rdma->sc_work, __svc_rdma_free);
+ schedule_work(&rdma->sc_work);
+}
+
static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
struct svcxprt_rdma *rdma =
@@ -1018,7 +1073,7 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
int ret;
if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
- return 0;
+ return -ENOTCONN;
BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
@@ -1029,7 +1084,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
spin_unlock_bh(&xprt->sc_lock);
atomic_inc(&rdma_stat_sq_starve);
- /* See if we can reap some SQ WR */
+
+ /* See if we can opportunistically reap SQ WR to make room */
sq_cq_reap(xprt);
/* Wait until SQ WR available if SQ still full */
@@ -1041,22 +1097,25 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
continue;
}
/* Bumped used SQ WR count and post */
+ svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
if (!ret)
atomic_inc(&xprt->sc_sq_count);
- else
+ else {
+ svc_xprt_put(&xprt->sc_xprt);
dprintk("svcrdma: failed to post SQ WR rc=%d, "
"sc_sq_count=%d, sc_sq_depth=%d\n",
ret, atomic_read(&xprt->sc_sq_count),
xprt->sc_sq_depth);
+ }
spin_unlock_bh(&xprt->sc_lock);
break;
}
return ret;
}
-int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
- enum rpcrdma_errcode err)
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err)
{
struct ib_send_wr err_wr;
struct ib_sge sge;
@@ -1094,9 +1153,8 @@ int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
/* Post It */
ret = svc_rdma_send(xprt, &err_wr);
if (ret) {
- dprintk("svcrdma: Error posting send = %d\n", ret);
+ dprintk("svcrdma: Error %d posting send for protocol error\n",
+ ret);
svc_rdma_put_context(ctxt, 1);
}
-
- return ret;
}