summaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c85
1 files changed, 57 insertions, 28 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 1948d592aa5..8d1653caffd 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -174,6 +174,7 @@ static struct lock_class_key socket_class;
#define SKIP_BUF_SIZE 1024
static void queue_con(struct ceph_connection *con);
+static void cancel_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void con_fault(struct ceph_connection *con);
@@ -291,7 +292,11 @@ int ceph_msgr_init(void)
if (ceph_msgr_slab_init())
return -ENOMEM;
- ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0);
+ /*
+ * The number of active work items is limited by the number of
+ * connections, so leave @max_active at default.
+ */
+ ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
if (ceph_msgr_wq)
return 0;
@@ -479,7 +484,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
IPPROTO_TCP, &sock);
if (ret)
return ret;
- sock->sk->sk_allocation = GFP_NOFS;
+ sock->sk->sk_allocation = GFP_NOFS | __GFP_MEMALLOC;
#ifdef CONFIG_LOCKDEP
lockdep_set_class(&sock->sk->sk_lock, &socket_class);
@@ -504,6 +509,9 @@ static int ceph_tcp_connect(struct ceph_connection *con)
return ret;
}
+
+ sk_set_memalloc(sock->sk);
+
con->sock = sock;
return 0;
}
@@ -680,7 +688,7 @@ void ceph_con_close(struct ceph_connection *con)
reset_connection(con);
con->peer_global_seq = 0;
- cancel_delayed_work(&con->work);
+ cancel_con(con);
con_close_socket(con);
mutex_unlock(&con->mutex);
}
@@ -900,7 +908,7 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(page_count > (int)USHRT_MAX);
cursor->page_count = (unsigned short)page_count;
BUG_ON(length > SIZE_MAX - cursor->page_offset);
- cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
+ cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
}
static struct page *
@@ -1936,11 +1944,11 @@ static int process_banner(struct ceph_connection *con)
sizeof(con->peer_addr)) != 0 &&
!(addr_is_blank(&con->actual_peer_addr.in_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
- pr_warning("wrong peer, want %s/%d, got %s/%d\n",
- ceph_pr_addr(&con->peer_addr.in_addr),
- (int)le32_to_cpu(con->peer_addr.nonce),
- ceph_pr_addr(&con->actual_peer_addr.in_addr),
- (int)le32_to_cpu(con->actual_peer_addr.nonce));
+ pr_warn("wrong peer, want %s/%d, got %s/%d\n",
+ ceph_pr_addr(&con->peer_addr.in_addr),
+ (int)le32_to_cpu(con->peer_addr.nonce),
+ ceph_pr_addr(&con->actual_peer_addr.in_addr),
+ (int)le32_to_cpu(con->actual_peer_addr.nonce));
con->error_msg = "wrong peer at address";
return -1;
}
@@ -2301,7 +2309,7 @@ static int read_partial_message(struct ceph_connection *con)
BUG_ON(!con->in_msg ^ skip);
if (con->in_msg && data_len > con->in_msg->data_length) {
- pr_warning("%s skipping long message (%u > %zd)\n",
+ pr_warn("%s skipping long message (%u > %zd)\n",
__func__, data_len, con->in_msg->data_length);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
@@ -2667,19 +2675,16 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
if (!con->ops->get(con)) {
dout("%s %p ref count 0\n", __func__, con);
-
return -ENOENT;
}
if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
dout("%s %p - already queued\n", __func__, con);
con->ops->put(con);
-
return -EBUSY;
}
dout("%s %p %lu\n", __func__, con, delay);
-
return 0;
}
@@ -2688,6 +2693,14 @@ static void queue_con(struct ceph_connection *con)
(void) queue_con_delay(con, 0);
}
+static void cancel_con(struct ceph_connection *con)
+{
+ if (cancel_delayed_work(&con->work)) {
+ dout("%s %p\n", __func__, con);
+ con->ops->put(con);
+ }
+}
+
static bool con_sock_closed(struct ceph_connection *con)
{
if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
@@ -2706,7 +2719,7 @@ static bool con_sock_closed(struct ceph_connection *con)
CASE(OPEN);
CASE(STANDBY);
default:
- pr_warning("%s con %p unrecognized state %lu\n",
+ pr_warn("%s con %p unrecognized state %lu\n",
__func__, con, con->state);
con->error_msg = "unrecognized con state";
BUG();
@@ -2759,8 +2772,11 @@ static void con_work(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
+ unsigned long pflags = current->flags;
bool fault;
+ current->flags |= PF_MEMALLOC;
+
mutex_lock(&con->mutex);
while (true) {
int ret;
@@ -2814,6 +2830,8 @@ static void con_work(struct work_struct *work)
con_fault_finish(con);
con->ops->put(con);
+
+ tsk_restore_flags(current, pflags, PF_MEMALLOC);
}
/*
@@ -2822,8 +2840,8 @@ static void con_work(struct work_struct *work)
*/
static void con_fault(struct ceph_connection *con)
{
- pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
- ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
+ pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+ ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -3065,10 +3083,8 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
return;
WARN_ON(!list_empty(&data->links));
- if (data->type == CEPH_MSG_DATA_PAGELIST) {
+ if (data->type == CEPH_MSG_DATA_PAGELIST)
ceph_pagelist_release(data->pagelist);
- kfree(data->pagelist);
- }
kmem_cache_free(ceph_msg_data_cache, data);
}
@@ -3269,24 +3285,21 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
/*
* Free a generically kmalloc'd message.
*/
-void ceph_msg_kfree(struct ceph_msg *m)
+static void ceph_msg_free(struct ceph_msg *m)
{
- dout("msg_kfree %p\n", m);
+ dout("%s %p\n", __func__, m);
ceph_kvfree(m->front.iov_base);
kmem_cache_free(ceph_msg_cache, m);
}
-/*
- * Drop a msg ref. Destroy as needed.
- */
-void ceph_msg_last_put(struct kref *kref)
+static void ceph_msg_release(struct kref *kref)
{
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
LIST_HEAD(data);
struct list_head *links;
struct list_head *next;
- dout("ceph_msg_put last one on %p\n", m);
+ dout("%s %p\n", __func__, m);
WARN_ON(!list_empty(&m->list_head));
/* drop middle, data, if any */
@@ -3308,9 +3321,25 @@ void ceph_msg_last_put(struct kref *kref)
if (m->pool)
ceph_msgpool_put(m->pool, m);
else
- ceph_msg_kfree(m);
+ ceph_msg_free(m);
+}
+
+struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
+{
+ dout("%s %p (was %d)\n", __func__, msg,
+ atomic_read(&msg->kref.refcount));
+ kref_get(&msg->kref);
+ return msg;
+}
+EXPORT_SYMBOL(ceph_msg_get);
+
+void ceph_msg_put(struct ceph_msg *msg)
+{
+ dout("%s %p (was %d)\n", __func__, msg,
+ atomic_read(&msg->kref.refcount));
+ kref_put(&msg->kref, ceph_msg_release);
}
-EXPORT_SYMBOL(ceph_msg_last_put);
+EXPORT_SYMBOL(ceph_msg_put);
void ceph_msg_dump(struct ceph_msg *msg)
{