summaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-07 15:38:28 -0600
committerSage Weil <sage@inktank.com>2013-05-01 21:17:01 -0700
commite766d7b55e10f93c7bab298135a4e90dcc46620d (patch)
tree0b3bebb5ee06c1a63d2155c6ba3ce52f83d63fe7 /net/ceph/messenger.c
parent6aaa4511deb4b0fd776d1153dc63a89cdc024fb8 (diff)
libceph: implement pages array cursor
Implement and use cursor routines for page array message data items for outbound message data. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c93
1 files changed, 89 insertions, 4 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 209990a853e..d611156808b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -831,6 +831,79 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
#endif
/*
+ * For a page array, a piece comes from the first page in the array
+ * that has not already been fully consumed.
+ */
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ int page_count;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+ BUG_ON(!data->pages);
+ BUG_ON(!data->length);
+
+ page_count = calc_pages_for(data->alignment, (u64)data->length);
+ BUG_ON(page_count > (int) USHRT_MAX);
+ cursor->resid = data->length;
+ cursor->page_offset = data->alignment & ~PAGE_MASK;
+ cursor->page_index = 0;
+ cursor->page_count = (unsigned short) page_count;
+ cursor->last_piece = cursor->page_count == 1;
+}
+
+static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
+ size_t *page_offset,
+ size_t *length)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+ BUG_ON(cursor->page_index >= cursor->page_count);
+ BUG_ON(cursor->page_offset >= PAGE_SIZE);
+ BUG_ON(!cursor->resid);
+
+ *page_offset = cursor->page_offset;
+ if (cursor->last_piece) {
+ BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
+ *length = cursor->resid;
+ } else {
+ *length = PAGE_SIZE - *page_offset;
+ }
+
+ return data->pages[cursor->page_index];
+}
+
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
+ size_t bytes)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+ BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
+ BUG_ON(bytes > cursor->resid);
+
+ /* Advance the cursor page offset */
+
+ cursor->resid -= bytes;
+ cursor->page_offset += bytes;
+ if (!bytes || cursor->page_offset & ~PAGE_MASK)
+ return false; /* more bytes to process in the current page */
+
+ /* Move on to the next page */
+
+ BUG_ON(cursor->page_index >= cursor->page_count);
+ cursor->page_offset = 0;
+ cursor->page_index++;
+ cursor->last_piece = cursor->page_index == cursor->page_count - 1;
+
+ return true;
+}
+
+/*
* For a pagelist, a piece is whatever remains to be consumed in the
* first page in the list, or the front of the next page.
*/
@@ -932,13 +1005,15 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(data);
break;
+ case CEPH_MSG_DATA_PAGES:
+ ceph_msg_data_pages_cursor_init(data);
+ break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
ceph_msg_data_bio_cursor_init(data);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
- case CEPH_MSG_DATA_PAGES:
default:
/* BUG(); */
break;
@@ -961,13 +1036,15 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
case CEPH_MSG_DATA_PAGELIST:
page = ceph_msg_data_pagelist_next(data, page_offset, length);
break;
+ case CEPH_MSG_DATA_PAGES:
+ page = ceph_msg_data_pages_next(data, page_offset, length);
+ break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
page = ceph_msg_data_bio_next(data, page_offset, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
- case CEPH_MSG_DATA_PAGES:
default:
page = NULL;
break;
@@ -993,13 +1070,15 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
case CEPH_MSG_DATA_PAGELIST:
new_piece = ceph_msg_data_pagelist_advance(data, bytes);
break;
+ case CEPH_MSG_DATA_PAGES:
+ new_piece = ceph_msg_data_pages_advance(data, bytes);
+ break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
new_piece = ceph_msg_data_bio_advance(data, bytes);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
- case CEPH_MSG_DATA_PAGES:
default:
BUG();
break;
@@ -1032,6 +1111,8 @@ static void prepare_message_data(struct ceph_msg *msg,
if (ceph_msg_has_bio(msg))
ceph_msg_data_cursor_init(&msg->b);
#endif /* CONFIG_BLOCK */
+ if (ceph_msg_has_pages(msg))
+ ceph_msg_data_cursor_init(&msg->p);
if (ceph_msg_has_pagelist(msg))
ceph_msg_data_cursor_init(&msg->l);
if (ceph_msg_has_trail(msg))
@@ -1330,6 +1411,8 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->page_pos += sent;
if (in_trail)
need_crc = ceph_msg_data_advance(&msg->t, sent);
+ else if (ceph_msg_has_pages(msg))
+ need_crc = ceph_msg_data_advance(&msg->p, sent);
else if (ceph_msg_has_pagelist(msg))
need_crc = ceph_msg_data_advance(&msg->l, sent);
#ifdef CONFIG_BLOCK
@@ -1435,7 +1518,9 @@ static int write_partial_message_data(struct ceph_connection *con)
page = ceph_msg_data_next(&msg->t, &page_offset,
&length, &last_piece);
} else if (ceph_msg_has_pages(msg)) {
- page = msg->p.pages[msg_pos->page];
+ use_cursor = true;
+ page = ceph_msg_data_next(&msg->p, &page_offset,
+ &length, &last_piece);
} else if (ceph_msg_has_pagelist(msg)) {
use_cursor = true;
page = ceph_msg_data_next(&msg->l, &page_offset,