diff options
-rw-r--r-- | net/rds/ib.h | 2 | ||||
-rw-r--r-- | net/rds/ib_send.c | 62 | ||||
-rw-r--r-- | net/rds/message.c | 2 | ||||
-rw-r--r-- | net/rds/rds.h | 3 | ||||
-rw-r--r-- | net/rds/send.c | 11 |
5 files changed, 45 insertions, 35 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h index 148818174a0..96769b86a53 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -336,7 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits); void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted); int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted, u32 *adv_credits, int need_posted, int max_posted); -int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op); +int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm); /* ib_stats.c */ DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index d839b403d46..e6745d827c3 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -225,15 +225,12 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) /* In the error case, wc.opcode sometimes contains garbage */ switch (send->s_wr.opcode) { case IB_WR_SEND: - if (send->s_rm) - rds_ib_send_unmap_rm(ic, send, wc.status); - break; case IB_WR_RDMA_WRITE: case IB_WR_RDMA_READ: case IB_WR_ATOMIC_FETCH_AND_ADD: case IB_WR_ATOMIC_CMP_AND_SWP: - /* Nothing to be done - the SG list will be unmapped - * when the SEND completes. */ + if (send->s_rm) + rds_ib_send_unmap_rm(ic, send, wc.status); break; default: if (printk_ratelimit()) @@ -425,6 +422,21 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted) set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); } +static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, + struct rds_ib_send_work *send, + bool notify) +{ + /* + * We want to delay signaling completions just enough to get + * the batching benefits but not so much that we create dead time + * on the wire. + */ + if (ic->i_unsignaled_wrs-- == 0 || notify) { + ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; + send->s_wr.send_flags |= IB_SEND_SIGNALED; + } +} + /* * This can be called multiple times for a given message. The first time * we see a message we map its scatterlist into the IB device so that @@ -517,7 +529,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, rm->data.m_count = 0; } - ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; rds_message_addref(rm); ic->i_rm = rm; @@ -608,15 +619,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, } } - /* - * We want to delay signaling completions just enough to get - * the batching benefits but not so much that we create dead time - * on the wire. - */ - if (ic->i_unsignaled_wrs-- == 0) { - ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; - send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; - } + rds_ib_set_wr_signal_state(ic, send, 0); /* * Always signal the last one if we're stopping due to flow control. @@ -656,7 +659,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, /* if we finished the message then send completion owns it */ if (scat == &rm->data.m_sg[rm->data.m_count]) { prev->s_rm = ic->i_rm; - prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; + prev->s_wr.send_flags |= IB_SEND_SOLICITED; ic->i_rm = NULL; } @@ -698,9 +701,10 @@ out: * A simplified version of the rdma case, we always map 1 SG, and * only 8 bytes, for the return value from the atomic operation. */ -int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) +int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm) { struct rds_ib_connection *ic = conn->c_transport_data; + struct rm_atomic_op *op = &rm->atomic; struct rds_ib_send_work *send = NULL; struct ib_send_wr *failed_wr; struct rds_ib_device *rds_ibdev; @@ -731,12 +735,20 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) send->s_wr.wr.atomic.compare_add = op->op_swap_add; send->s_wr.wr.atomic.swap = 0; } - send->s_wr.send_flags = IB_SEND_SIGNALED; + rds_ib_set_wr_signal_state(ic, send, op->op_notify); send->s_wr.num_sge = 1; send->s_wr.next = NULL; send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; send->s_wr.wr.atomic.rkey = op->op_rkey; + /* + * If there is no data or rdma ops in the message, then + * we must fill in s_rm ourselves, so we properly clean up + * on completion. + */ + if (!rm->rdma.m_rdma_op.r_active && !rm->data.op_active) + send->s_rm = rm; + /* map 8 byte retval buffer to the device */ ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); @@ -836,14 +848,8 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op) for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) { send->s_wr.send_flags = 0; send->s_queued = jiffies; - /* - * We want to delay signaling completions just enough to get - * the batching benefits but not so much that we create dead time on the wire. - */ - if (ic->i_unsignaled_wrs-- == 0) { - ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; - send->s_wr.send_flags = IB_SEND_SIGNALED; - } + + rds_ib_set_wr_signal_state(ic, send, op->r_notify); send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; send->s_wr.wr.rdma.remote_addr = remote_addr; @@ -884,10 +890,6 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op) send = ic->i_sends; } - /* if we finished the message then send completion owns it */ - if (scat == &op->r_sg[op->r_count]) - prev->s_wr.send_flags = IB_SEND_SIGNALED; - if (i < work_alloc) { rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i); work_alloc = i; diff --git a/net/rds/message.c b/net/rds/message.c index 3ea05c864cd..a27e493a63a 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -325,6 +325,8 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov, sg++; } + rm->data.op_active = 1; + out: return ret; } diff --git a/net/rds/rds.h b/net/rds/rds.h index 0c610a102c2..bf2349da4db 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -341,6 +341,7 @@ struct rds_message { struct rds_mr *m_rdma_mr; } rdma; struct rm_data_op { + unsigned int op_active:1; unsigned int m_nents; unsigned int m_count; struct scatterlist *m_sg; @@ -418,7 +419,7 @@ struct rds_transport { int (*xmit_cong_map)(struct rds_connection *conn, struct rds_cong_map *map, unsigned long offset); int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op); - int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op); + int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm); int (*recv)(struct rds_connection *conn); int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov, size_t size); diff --git a/net/rds/send.c b/net/rds/send.c index 5bc35d2f40e..42fb934293b 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -266,7 +266,7 @@ int rds_send_xmit(struct rds_connection *conn) if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { - ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); + ret = conn->c_trans->xmit_atomic(conn, rm); if (ret) break; conn->c_xmit_atomic_sent = 1; @@ -285,13 +285,18 @@ int rds_send_xmit(struct rds_connection *conn) if (ret) break; conn->c_xmit_rdma_sent = 1; + + /* rdmas need data sent, even if just the header */ + rm->data.op_active = 1; + /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ set_bit(RDS_MSG_MAPPED, &rm->m_flags); } - if (conn->c_xmit_hdr_off < sizeof(struct rds_header) || - conn->c_xmit_sg < rm->data.m_nents) { + if (rm->data.op_active + && (conn->c_xmit_hdr_off < sizeof(struct rds_header) || + conn->c_xmit_sg < rm->data.m_nents)) { ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, conn->c_xmit_sg, |