diff options
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 85 |
1 files changed, 57 insertions, 28 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1948d592aa5..8d1653caffd 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -174,6 +174,7 @@ static struct lock_class_key socket_class; #define SKIP_BUF_SIZE 1024 static void queue_con(struct ceph_connection *con); +static void cancel_con(struct ceph_connection *con); static void con_work(struct work_struct *); static void con_fault(struct ceph_connection *con); @@ -291,7 +292,11 @@ int ceph_msgr_init(void) if (ceph_msgr_slab_init()) return -ENOMEM; - ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0); + /* + * The number of active work items is limited by the number of + * connections, so leave @max_active at default. + */ + ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0); if (ceph_msgr_wq) return 0; @@ -479,7 +484,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) IPPROTO_TCP, &sock); if (ret) return ret; - sock->sk->sk_allocation = GFP_NOFS; + sock->sk->sk_allocation = GFP_NOFS | __GFP_MEMALLOC; #ifdef CONFIG_LOCKDEP lockdep_set_class(&sock->sk->sk_lock, &socket_class); @@ -504,6 +509,9 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } + + sk_set_memalloc(sock->sk); + con->sock = sock; return 0; } @@ -680,7 +688,7 @@ void ceph_con_close(struct ceph_connection *con) reset_connection(con); con->peer_global_seq = 0; - cancel_delayed_work(&con->work); + cancel_con(con); con_close_socket(con); mutex_unlock(&con->mutex); } @@ -900,7 +908,7 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor, BUG_ON(page_count > (int)USHRT_MAX); cursor->page_count = (unsigned short)page_count; BUG_ON(length > SIZE_MAX - cursor->page_offset); - cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE; + cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE; } static struct page * @@ -1936,11 +1944,11 @@ static int process_banner(struct ceph_connection *con) sizeof(con->peer_addr)) != 0 && !(addr_is_blank(&con->actual_peer_addr.in_addr) && con->actual_peer_addr.nonce == con->peer_addr.nonce)) { - pr_warning("wrong peer, want %s/%d, got %s/%d\n", - ceph_pr_addr(&con->peer_addr.in_addr), - (int)le32_to_cpu(con->peer_addr.nonce), - ceph_pr_addr(&con->actual_peer_addr.in_addr), - (int)le32_to_cpu(con->actual_peer_addr.nonce)); + pr_warn("wrong peer, want %s/%d, got %s/%d\n", + ceph_pr_addr(&con->peer_addr.in_addr), + (int)le32_to_cpu(con->peer_addr.nonce), + ceph_pr_addr(&con->actual_peer_addr.in_addr), + (int)le32_to_cpu(con->actual_peer_addr.nonce)); con->error_msg = "wrong peer at address"; return -1; } @@ -2301,7 +2309,7 @@ static int read_partial_message(struct ceph_connection *con) BUG_ON(!con->in_msg ^ skip); if (con->in_msg && data_len > con->in_msg->data_length) { - pr_warning("%s skipping long message (%u > %zd)\n", + pr_warn("%s skipping long message (%u > %zd)\n", __func__, data_len, con->in_msg->data_length); ceph_msg_put(con->in_msg); con->in_msg = NULL; @@ -2667,19 +2675,16 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay) { if (!con->ops->get(con)) { dout("%s %p ref count 0\n", __func__, con); - return -ENOENT; } if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { dout("%s %p - already queued\n", __func__, con); con->ops->put(con); - return -EBUSY; } dout("%s %p %lu\n", __func__, con, delay); - return 0; } @@ -2688,6 +2693,14 @@ static void queue_con(struct ceph_connection *con) (void) queue_con_delay(con, 0); } +static void cancel_con(struct ceph_connection *con) +{ + if (cancel_delayed_work(&con->work)) { + dout("%s %p\n", __func__, con); + con->ops->put(con); + } +} + static bool con_sock_closed(struct ceph_connection *con) { if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) @@ -2706,7 +2719,7 @@ static bool con_sock_closed(struct ceph_connection *con) CASE(OPEN); CASE(STANDBY); default: - pr_warning("%s con %p unrecognized state %lu\n", + pr_warn("%s con %p unrecognized state %lu\n", __func__, con, con->state); con->error_msg = "unrecognized con state"; BUG(); @@ -2759,8 +2772,11 @@ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); + unsigned long pflags = current->flags; bool fault; + current->flags |= PF_MEMALLOC; + mutex_lock(&con->mutex); while (true) { int ret; @@ -2814,6 +2830,8 @@ static void con_work(struct work_struct *work) con_fault_finish(con); con->ops->put(con); + + tsk_restore_flags(current, pflags, PF_MEMALLOC); } /* @@ -2822,8 +2840,8 @@ static void con_work(struct work_struct *work) */ static void con_fault(struct ceph_connection *con) { - pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); + pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), + ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); @@ -3065,10 +3083,8 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data) return; WARN_ON(!list_empty(&data->links)); - if (data->type == CEPH_MSG_DATA_PAGELIST) { + if (data->type == CEPH_MSG_DATA_PAGELIST) ceph_pagelist_release(data->pagelist); - kfree(data->pagelist); - } kmem_cache_free(ceph_msg_data_cache, data); } @@ -3269,24 +3285,21 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) /* * Free a generically kmalloc'd message. */ -void ceph_msg_kfree(struct ceph_msg *m) +static void ceph_msg_free(struct ceph_msg *m) { - dout("msg_kfree %p\n", m); + dout("%s %p\n", __func__, m); ceph_kvfree(m->front.iov_base); kmem_cache_free(ceph_msg_cache, m); } -/* - * Drop a msg ref. Destroy as needed. - */ -void ceph_msg_last_put(struct kref *kref) +static void ceph_msg_release(struct kref *kref) { struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); LIST_HEAD(data); struct list_head *links; struct list_head *next; - dout("ceph_msg_put last one on %p\n", m); + dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); /* drop middle, data, if any */ @@ -3308,9 +3321,25 @@ void ceph_msg_last_put(struct kref *kref) if (m->pool) ceph_msgpool_put(m->pool, m); else - ceph_msg_kfree(m); + ceph_msg_free(m); +} + +struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) +{ + dout("%s %p (was %d)\n", __func__, msg, + atomic_read(&msg->kref.refcount)); + kref_get(&msg->kref); + return msg; +} +EXPORT_SYMBOL(ceph_msg_get); + +void ceph_msg_put(struct ceph_msg *msg) +{ + dout("%s %p (was %d)\n", __func__, msg, + atomic_read(&msg->kref.refcount)); + kref_put(&msg->kref, ceph_msg_release); } -EXPORT_SYMBOL(ceph_msg_last_put); +EXPORT_SYMBOL(ceph_msg_put); void ceph_msg_dump(struct ceph_msg *msg) { |