diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index c1570fada3d8ec..a1e2813731d14f 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -82,10 +82,6 @@ static int ceph_set_page_dirty(struct page *page) struct inode *inode; struct ceph_inode_info *ci; struct ceph_snap_context *snapc; - int ret; - - if (unlikely(!mapping)) - return !TestSetPageDirty(page); if (PageDirty(page)) { dout("%p set_page_dirty %p idx %lu -- already dirty\n", @@ -130,11 +126,7 @@ static int ceph_set_page_dirty(struct page *page) BUG_ON(PagePrivate(page)); attach_page_private(page, snapc); - ret = __set_page_dirty_nobuffers(page); - WARN_ON(!PageLocked(page)); - WARN_ON(!page->mapping); - - return ret; + return __set_page_dirty_nobuffers(page); } /* @@ -226,7 +218,7 @@ static void finish_netfs_read(struct ceph_osd_request *req) int err = req->r_result; ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency, - req->r_end_latency, err); + req->r_end_latency, osd_data->length, err); dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result, subreq->len, i_size_read(req->r_inode)); @@ -313,7 +305,7 @@ static void ceph_readahead_cleanup(struct address_space *mapping, void *priv) ceph_put_cap_refs(ci, got); } -const struct netfs_read_request_ops ceph_netfs_read_ops = { +static const struct netfs_read_request_ops ceph_netfs_read_ops = { .init_rreq = ceph_init_rreq, .is_cache_enabled = ceph_is_cache_enabled, .begin_cache_operation = ceph_begin_cache_operation, @@ -560,7 +552,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) err = ceph_osdc_wait_request(osdc, req); ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, - req->r_end_latency, err); + req->r_end_latency, len, err); ceph_osdc_put_request(req); if (err == 0) @@ -635,6 +627,7 @@ static void writepages_finish(struct ceph_osd_request *req) struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + unsigned int len = 0; bool remove_page; dout("writepages_finish %p rc %d\n", inode, rc); @@ -647,9 +640,6 @@ static void writepages_finish(struct ceph_osd_request *req) ceph_clear_error_write(ci); } - ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, - req->r_end_latency, rc); - /* * We lost the cache cap, need to truncate the page before * it is unlocked, otherwise we'd truncate it later in the @@ -666,6 +656,7 @@ static void writepages_finish(struct ceph_osd_request *req) osd_data = osd_req_op_extent_osd_data(req, i); BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); + len += osd_data->length; num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); total_pages += num_pages; @@ -696,6 +687,9 @@ static void writepages_finish(struct ceph_osd_request *req) release_pages(osd_data->pages, num_pages); } + ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, + req->r_end_latency, len, rc); + ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc); osd_data = osd_req_op_extent_osd_data(req, 0); @@ -1711,7 +1705,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) err = ceph_osdc_wait_request(&fsc->client->osdc, req); ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, - req->r_end_latency, err); + req->r_end_latency, len, err); out_put: ceph_osdc_put_request(req); diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index a5e93b18551586..7bdefd0c789a66 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -645,9 +645,7 @@ void ceph_add_cap(struct inode *inode, dout("add_cap %p mds%d cap %llx %s seq %d\n", inode, session->s_mds, cap_id, ceph_cap_string(issued), seq); - spin_lock(&session->s_gen_ttl_lock); - gen = session->s_cap_gen; - spin_unlock(&session->s_gen_ttl_lock); + gen = atomic_read(&session->s_cap_gen); cap = __get_cap_for_mds(ci, mds); if (!cap) { @@ -785,10 +783,8 @@ static int __cap_is_valid(struct ceph_cap *cap) unsigned long ttl; u32 gen; - spin_lock(&cap->session->s_gen_ttl_lock); - gen = cap->session->s_cap_gen; + gen = atomic_read(&cap->session->s_cap_gen); ttl = cap->session->s_cap_ttl; - spin_unlock(&cap->session->s_gen_ttl_lock); if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) { dout("__cap_is_valid %p cap %p issued %s " @@ -1182,7 +1178,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) * s_cap_gen while session is in the reconnect state. */ if (queue_release && - (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { + (!session->s_cap_reconnect || + cap->cap_gen == atomic_read(&session->s_cap_gen))) { cap->queue_release = 1; if (removed) { __ceph_queue_cap_release(session, cap); @@ -1534,7 +1531,7 @@ static inline int __send_flush_snap(struct inode *inode, * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * - * Called under i_ceph_lock. Takes s_mutex as needed. + * Called under i_ceph_lock. */ static void __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session *session) @@ -1656,7 +1653,6 @@ void ceph_flush_snaps(struct ceph_inode_info *ci, mds = ci->i_auth_cap->session->s_mds; if (session && session->s_mds != mds) { dout(" oops, wrong session %p mutex\n", session); - mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); session = NULL; } @@ -1665,10 +1661,6 @@ void ceph_flush_snaps(struct ceph_inode_info *ci, mutex_lock(&mdsc->mutex); session = __ceph_lookup_mds_session(mdsc, mds); mutex_unlock(&mdsc->mutex); - if (session) { - dout(" inverting session/ino locks on %p\n", session); - mutex_lock(&session->s_mutex); - } goto retry; } @@ -1680,12 +1672,10 @@ void ceph_flush_snaps(struct ceph_inode_info *ci, out: spin_unlock(&ci->i_ceph_lock); - if (psession) { + if (psession) *psession = session; - } else if (session) { - mutex_unlock(&session->s_mutex); + else ceph_put_mds_session(session); - } /* we flushed them all; remove this inode from the queue */ spin_lock(&mdsc->snap_flush_lock); list_del_init(&ci->i_snap_flush_item); @@ -1915,7 +1905,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_cap *cap; u64 flush_tid, oldest_flush_tid; int file_wanted, used, cap_used; - int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; int mds = -1; /* keep track of how far we've gone through i_caps list to avoid an infinite loop on retry */ @@ -1923,14 +1912,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, bool queue_invalidate = false; bool tried_invalidate = false; + if (session) + ceph_get_mds_session(session); + spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_FLUSH) flags |= CHECK_CAPS_FLUSH; - - goto retry_locked; retry: - spin_lock(&ci->i_ceph_lock); -retry_locked: /* Caps wanted by virtue of active open files. */ file_wanted = __ceph_caps_file_wanted(ci); @@ -2010,7 +1998,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ci->i_rdcache_revoking = ci->i_rdcache_gen; } tried_invalidate = true; - goto retry_locked; + goto retry; } for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { @@ -2024,8 +2012,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap)) continue; - /* NOTE: no side-effects allowed, until we take s_mutex */ - /* * If we have an auth cap, we don't need to consider any * overlapping caps as used. @@ -2088,37 +2074,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, continue; /* nope, all good */ ack: - if (session && session != cap->session) { - dout("oops, wrong session %p mutex\n", session); - mutex_unlock(&session->s_mutex); - session = NULL; - } - if (!session) { - session = cap->session; - if (mutex_trylock(&session->s_mutex) == 0) { - dout("inverting session/ino locks on %p\n", - session); - session = ceph_get_mds_session(session); - spin_unlock(&ci->i_ceph_lock); - if (took_snap_rwsem) { - up_read(&mdsc->snap_rwsem); - took_snap_rwsem = 0; - } - if (session) { - mutex_lock(&session->s_mutex); - ceph_put_mds_session(session); - } else { - /* - * Because we take the reference while - * holding the i_ceph_lock, it should - * never be NULL. Throw a warning if it - * ever is. - */ - WARN_ON_ONCE(true); - } - goto retry; - } - } + ceph_put_mds_session(session); + session = ceph_get_mds_session(cap->session); /* kick flushing and flush snaps before sending normal * cap message */ @@ -2130,20 +2087,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) __ceph_flush_snaps(ci, session); - goto retry_locked; - } - - /* take snap_rwsem after session mutex */ - if (!took_snap_rwsem) { - if (down_read_trylock(&mdsc->snap_rwsem) == 0) { - dout("inverting snap/in locks on %p\n", - inode); - spin_unlock(&ci->i_ceph_lock); - down_read(&mdsc->snap_rwsem); - took_snap_rwsem = 1; - goto retry; - } - took_snap_rwsem = 1; + goto retry; } if (cap == ci->i_auth_cap && ci->i_dirty_caps) { @@ -2165,9 +2109,10 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used, want, retain, flushing, flush_tid, oldest_flush_tid); - spin_unlock(&ci->i_ceph_lock); + spin_unlock(&ci->i_ceph_lock); __send_cap(&arg, ci); + spin_lock(&ci->i_ceph_lock); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -2182,13 +2127,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, spin_unlock(&ci->i_ceph_lock); + ceph_put_mds_session(session); if (queue_invalidate) ceph_queue_invalidate(inode); - - if (session) - mutex_unlock(&session->s_mutex); - if (took_snap_rwsem) - up_read(&mdsc->snap_rwsem); } /* @@ -2198,26 +2139,17 @@ static int try_flush_caps(struct inode *inode, u64 *ptid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_mds_session *session = NULL; int flushing = 0; u64 flush_tid = 0, oldest_flush_tid = 0; -retry: spin_lock(&ci->i_ceph_lock); retry_locked: if (ci->i_dirty_caps && ci->i_auth_cap) { struct ceph_cap *cap = ci->i_auth_cap; struct cap_msg_args arg; + struct ceph_mds_session *session = cap->session; - if (session != cap->session) { - spin_unlock(&ci->i_ceph_lock); - if (session) - mutex_unlock(&session->s_mutex); - session = cap->session; - mutex_lock(&session->s_mutex); - goto retry; - } - if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) { + if (session->s_state < CEPH_MDS_SESSION_OPEN) { spin_unlock(&ci->i_ceph_lock); goto out; } @@ -2254,9 +2186,6 @@ static int try_flush_caps(struct inode *inode, u64 *ptid) spin_unlock(&ci->i_ceph_lock); } out: - if (session) - mutex_unlock(&session->s_mutex); - *ptid = flush_tid; return flushing; } @@ -3213,8 +3142,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (complete_capsnap) wake_up_all(&ci->i_cap_wq); while (put-- > 0) { - /* avoid calling iput_final() in osd dispatch threads */ - ceph_async_iput(inode); + iput(inode); } } @@ -3288,7 +3216,7 @@ static void handle_cap_grant(struct inode *inode, u64 size = le64_to_cpu(grant->size); u64 max_size = le64_to_cpu(grant->max_size); unsigned char check_caps = 0; - bool was_stale = cap->cap_gen < session->s_cap_gen; + bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen); bool wake = false; bool writeback = false; bool queue_trunc = false; @@ -3340,7 +3268,7 @@ static void handle_cap_grant(struct inode *inode, } /* side effects now are allowed */ - cap->cap_gen = session->s_cap_gen; + cap->cap_gen = atomic_read(&session->s_cap_gen); cap->seq = seq; __check_cap_issue(ci, cap, newcaps); @@ -3553,13 +3481,12 @@ static void handle_cap_grant(struct inode *inode, if (wake) wake_up_all(&ci->i_cap_wq); + mutex_unlock(&session->s_mutex); if (check_caps == 1) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL, session); else if (check_caps == 2) ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session); - else - mutex_unlock(&session->s_mutex); } /* @@ -4203,8 +4130,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, mutex_unlock(&session->s_mutex); done_unlocked: ceph_put_string(extra_info.pool_ns); - /* avoid calling iput_final() in mds dispatch threads */ - ceph_async_iput(inode); + iput(inode); return; flush_cap_releases: @@ -4246,8 +4172,7 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) spin_unlock(&mdsc->cap_delay_lock); dout("check_delayed_caps on %p\n", inode); ceph_check_caps(ci, 0, NULL); - /* avoid calling iput_final() in tick thread */ - ceph_async_iput(inode); + iput(inode); spin_lock(&mdsc->cap_delay_lock); } } diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 425f3356332a1e..38b78b45811f91 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -127,7 +127,7 @@ static int mdsc_show(struct seq_file *s, void *p) return 0; } -#define CEPH_METRIC_SHOW(name, total, avg, min, max, sq) { \ +#define CEPH_LAT_METRIC_SHOW(name, total, avg, min, max, sq) { \ s64 _total, _avg, _min, _max, _sq, _st; \ _avg = ktime_to_us(avg); \ _min = ktime_to_us(min == KTIME_MAX ? 0 : min); \ @@ -140,6 +140,12 @@ static int mdsc_show(struct seq_file *s, void *p) name, total, _avg, _min, _max, _st); \ } +#define CEPH_SZ_METRIC_SHOW(name, total, avg, min, max, sum) { \ + u64 _min = min == U64_MAX ? 0 : min; \ + seq_printf(s, "%-14s%-12lld%-16llu%-16llu%-16llu%llu\n", \ + name, total, avg, _min, max, sum); \ +} + static int metric_show(struct seq_file *s, void *p) { struct ceph_fs_client *fsc = s->private; @@ -147,6 +153,7 @@ static int metric_show(struct seq_file *s, void *p) struct ceph_client_metric *m = &mdsc->metric; int nr_caps = 0; s64 total, sum, avg, min, max, sq; + u64 sum_sz, avg_sz, min_sz, max_sz; sum = percpu_counter_sum(&m->total_inodes); seq_printf(s, "item total\n"); @@ -170,7 +177,7 @@ static int metric_show(struct seq_file *s, void *p) max = m->read_latency_max; sq = m->read_latency_sq_sum; spin_unlock(&m->read_metric_lock); - CEPH_METRIC_SHOW("read", total, avg, min, max, sq); + CEPH_LAT_METRIC_SHOW("read", total, avg, min, max, sq); spin_lock(&m->write_metric_lock); total = m->total_writes; @@ -180,7 +187,7 @@ static int metric_show(struct seq_file *s, void *p) max = m->write_latency_max; sq = m->write_latency_sq_sum; spin_unlock(&m->write_metric_lock); - CEPH_METRIC_SHOW("write", total, avg, min, max, sq); + CEPH_LAT_METRIC_SHOW("write", total, avg, min, max, sq); spin_lock(&m->metadata_metric_lock); total = m->total_metadatas; @@ -190,7 +197,29 @@ static int metric_show(struct seq_file *s, void *p) max = m->metadata_latency_max; sq = m->metadata_latency_sq_sum; spin_unlock(&m->metadata_metric_lock); - CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq); + CEPH_LAT_METRIC_SHOW("metadata", total, avg, min, max, sq); + + seq_printf(s, "\n"); + seq_printf(s, "item total avg_sz(bytes) min_sz(bytes) max_sz(bytes) total_sz(bytes)\n"); + seq_printf(s, "----------------------------------------------------------------------------------------\n"); + + spin_lock(&m->read_metric_lock); + total = m->total_reads; + sum_sz = m->read_size_sum; + avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0; + min_sz = m->read_size_min; + max_sz = m->read_size_max; + spin_unlock(&m->read_metric_lock); + CEPH_SZ_METRIC_SHOW("read", total, avg_sz, min_sz, max_sz, sum_sz); + + spin_lock(&m->write_metric_lock); + total = m->total_writes; + sum_sz = m->write_size_sum; + avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0; + min_sz = m->write_size_min; + max_sz = m->write_size_max; + spin_unlock(&m->write_metric_lock); + CEPH_SZ_METRIC_SHOW("write", total, avg_sz, min_sz, max_sz, sum_sz); seq_printf(s, "\n"); seq_printf(s, "item total miss hit\n"); diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 9ba79b6531fba5..133dbd9338e730 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -788,6 +788,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, mask |= CEPH_CAP_XATTR_SHARED; req->r_args.getattr.mask = cpu_to_le32(mask); + ihold(dir); req->r_parent = dir; set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); err = ceph_mdsc_do_request(mdsc, NULL, req); @@ -868,6 +869,7 @@ static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir, req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; + ihold(dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mknod.mode = cpu_to_le32(mode); req->r_args.mknod.rdev = cpu_to_le32(rdev); @@ -929,6 +931,8 @@ static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir, goto out; } req->r_parent = dir; + ihold(dir); + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry = dget(dentry); req->r_num_caps = 2; @@ -993,6 +997,7 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir, req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; + ihold(dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mkdir.mode = cpu_to_le32(mode); req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; @@ -1037,6 +1042,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); req->r_parent = dir; + ihold(dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -1158,6 +1164,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; + ihold(dir); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_inode_drop = ceph_drop_caps_for_unlink(inode); @@ -1232,6 +1239,7 @@ static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir, req->r_old_dentry = dget(old_dentry); req->r_old_dentry_dir = old_dir; req->r_parent = new_dir; + ihold(new_dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -1548,10 +1556,8 @@ static bool __dentry_lease_is_valid(struct ceph_dentry_info *di) u32 gen; unsigned long ttl; - spin_lock(&session->s_gen_ttl_lock); - gen = session->s_cap_gen; + gen = atomic_read(&session->s_cap_gen); ttl = session->s_cap_ttl; - spin_unlock(&session->s_gen_ttl_lock); if (di->lease_gen == gen && time_before(jiffies, ttl) && @@ -1730,6 +1736,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; + ihold(dir); mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; if (ceph_security_xattr_wanted(dir)) @@ -1809,8 +1816,7 @@ static void ceph_d_release(struct dentry *dentry) dentry->d_fsdata = NULL; spin_unlock(&dentry->d_lock); - if (di->lease_session) - ceph_put_mds_session(di->lease_session); + ceph_put_mds_session(di->lease_session); kmem_cache_free(ceph_dentry_cachep, di); } diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 65540a4429b26a..1d65934c12620e 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -542,6 +542,7 @@ static int ceph_get_name(struct dentry *parent, char *name, ihold(inode); req->r_ino2 = ceph_vino(d_inode(parent)); req->r_parent = d_inode(parent); + ihold(req->r_parent); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_num_caps = 2; err = ceph_mdsc_do_request(mdsc, NULL, req); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index d51af36980324e..d1755ac1d964aa 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -706,6 +706,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, mask |= CEPH_CAP_XATTR_SHARED; req->r_args.open.mask = cpu_to_le32(mask); req->r_parent = dir; + ihold(dir); if (flags & O_CREAT) { struct ceph_file_layout lo; @@ -903,7 +904,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency, req->r_end_latency, - ret); + len, ret); ceph_osdc_put_request(req); @@ -1035,12 +1036,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) struct ceph_aio_request *aio_req = req->r_priv; struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric; + unsigned int len = osd_data->bvec_pos.iter.bi_size; BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); BUG_ON(!osd_data->num_bvecs); - dout("ceph_aio_complete_req %p rc %d bytes %u\n", - inode, rc, osd_data->bvec_pos.iter.bi_size); + dout("ceph_aio_complete_req %p rc %d bytes %u\n", inode, rc, len); if (rc == -EOLDSNAPC) { struct ceph_aio_work *aio_work; @@ -1058,9 +1059,9 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) } else if (!aio_req->write) { if (rc == -ENOENT) rc = 0; - if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) { + if (rc >= 0 && len > rc) { struct iov_iter i; - int zlen = osd_data->bvec_pos.iter.bi_size - rc; + int zlen = len - rc; /* * If read is satisfied by single OSD request, @@ -1077,8 +1078,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) } iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs, - osd_data->num_bvecs, - osd_data->bvec_pos.iter.bi_size); + osd_data->num_bvecs, len); iov_iter_advance(&i, rc); iov_iter_zero(zlen, &i); } @@ -1088,10 +1088,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) if (req->r_start_latency) { if (aio_req->write) ceph_update_write_metrics(metric, req->r_start_latency, - req->r_end_latency, rc); + req->r_end_latency, len, rc); else ceph_update_read_metrics(metric, req->r_start_latency, - req->r_end_latency, rc); + req->r_end_latency, len, rc); } put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs, @@ -1299,10 +1299,10 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, if (write) ceph_update_write_metrics(metric, req->r_start_latency, - req->r_end_latency, ret); + req->r_end_latency, len, ret); else ceph_update_read_metrics(metric, req->r_start_latency, - req->r_end_latency, ret); + req->r_end_latency, len, ret); size = i_size_read(inode); if (!write) { @@ -1476,7 +1476,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ret = ceph_osdc_wait_request(&fsc->client->osdc, req); ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, - req->r_end_latency, ret); + req->r_end_latency, len, ret); out: ceph_osdc_put_request(req); if (ret != 0) { diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index df0c8a724609d3..1bd2cc015913f9 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1124,7 +1124,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry, return; } - if (di->lease_gen == session->s_cap_gen && + if (di->lease_gen == atomic_read(&session->s_cap_gen) && time_before(ttl, di->time)) return; /* we already have a newer lease. */ @@ -1135,7 +1135,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry, if (!di->lease_session) di->lease_session = ceph_get_mds_session(session); - di->lease_gen = session->s_cap_gen; + di->lease_gen = atomic_read(&session->s_cap_gen); di->lease_seq = le32_to_cpu(lease->seq); di->lease_renew_after = half_ttl; di->lease_renew_from = 0; @@ -1154,8 +1154,7 @@ static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry, __update_dentry_lease(dir, dentry, lease, session, from_time, &old_lease_session); spin_unlock(&dentry->d_lock); - if (old_lease_session) - ceph_put_mds_session(old_lease_session); + ceph_put_mds_session(old_lease_session); } /* @@ -1200,8 +1199,7 @@ static void update_dentry_lease_careful(struct dentry *dentry, from_time, &old_lease_session); out_unlock: spin_unlock(&dentry->d_lock); - if (old_lease_session) - ceph_put_mds_session(old_lease_session); + ceph_put_mds_session(old_lease_session); } /* @@ -1568,8 +1566,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, unlock_new_inode(in); } - /* avoid calling iput_final() in mds dispatch threads */ - ceph_async_iput(in); + iput(in); } return err; @@ -1766,13 +1763,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, if (ret < 0) { pr_err("ceph_fill_inode badness on %p\n", in); if (d_really_is_negative(dn)) { - /* avoid calling iput_final() in mds - * dispatch threads */ if (in->i_state & I_NEW) { ihold(in); discard_new_inode(in); } - ceph_async_iput(in); + iput(in); } d_drop(dn); err = ret; @@ -1785,7 +1780,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, if (ceph_security_xattr_deadlock(in)) { dout(" skip splicing dn %p to inode %p" " (security xattr deadlock)\n", dn, in); - ceph_async_iput(in); + iput(in); skipped++; goto next_item; } @@ -1836,25 +1831,6 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size) return ret; } -/* - * Put reference to inode, but avoid calling iput_final() in current thread. - * iput_final() may wait for reahahead pages. The wait can cause deadlock in - * some contexts. - */ -void ceph_async_iput(struct inode *inode) -{ - if (!inode) - return; - for (;;) { - if (atomic_add_unless(&inode->i_count, -1, 1)) - break; - if (queue_work(ceph_inode_to_client(inode)->inode_wq, - &ceph_inode(inode)->i_work)) - break; - /* queue work failed, i_count must be at least 2 */ - } -} - void ceph_queue_inode_work(struct inode *inode, int work_bit) { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index e5af591d3bd453..a818213c972fc0 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -664,6 +664,9 @@ struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) void ceph_put_mds_session(struct ceph_mds_session *s) { + if (IS_ERR_OR_NULL(s)) + return; + dout("mdsc put_session %p %d -> %d\n", s, refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); if (refcount_dec_and_test(&s->s_ref)) { @@ -746,8 +749,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); - spin_lock_init(&s->s_gen_ttl_lock); - s->s_cap_gen = 1; + atomic_set(&s->s_cap_gen, 1); s->s_cap_ttl = jiffies - 1; spin_lock_init(&s->s_cap_lock); @@ -822,14 +824,13 @@ void ceph_mdsc_release_request(struct kref *kref) ceph_msg_put(req->r_reply); if (req->r_inode) { ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); - /* avoid calling iput_final() in mds dispatch threads */ - ceph_async_iput(req->r_inode); + iput(req->r_inode); } if (req->r_parent) { ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); - ceph_async_iput(req->r_parent); + iput(req->r_parent); } - ceph_async_iput(req->r_target_inode); + iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) @@ -843,7 +844,7 @@ void ceph_mdsc_release_request(struct kref *kref) */ ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - ceph_async_iput(req->r_old_dentry_dir); + iput(req->r_old_dentry_dir); } kfree(req->r_path1); kfree(req->r_path2); @@ -958,8 +959,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc, } if (req->r_unsafe_dir) { - /* avoid calling iput_final() in mds dispatch threads */ - ceph_async_iput(req->r_unsafe_dir); + iput(req->r_unsafe_dir); req->r_unsafe_dir = NULL; } @@ -1130,7 +1130,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); if (!cap) { spin_unlock(&ci->i_ceph_lock); - ceph_async_iput(inode); + iput(inode); goto random; } mds = cap->session->s_mds; @@ -1139,9 +1139,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, cap == ci->i_auth_cap ? "auth " : "", cap); spin_unlock(&ci->i_ceph_lock); out: - /* avoid calling iput_final() while holding mdsc->mutex or - * in mds dispatch threads */ - ceph_async_iput(inode); + iput(inode); return mds; random: @@ -1438,8 +1436,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc, for (i = 0; i < mi->num_export_targets; i++) { ts = __open_export_target_session(mdsc, mi->export_targets[i]); - if (!IS_ERR(ts)) - ceph_put_mds_session(ts); + ceph_put_mds_session(ts); } } @@ -1545,9 +1542,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, spin_unlock(&session->s_cap_lock); if (last_inode) { - /* avoid calling iput_final() while holding - * s_mutex or in mds dispatch threads */ - ceph_async_iput(last_inode); + iput(last_inode); last_inode = NULL; } if (old_cap) { @@ -1581,7 +1576,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, session->s_cap_iterator = NULL; spin_unlock(&session->s_cap_lock); - ceph_async_iput(last_inode); + iput(last_inode); if (old_cap) ceph_put_cap(session->s_mdsc, old_cap); @@ -1721,8 +1716,7 @@ static void remove_session_caps(struct ceph_mds_session *session) spin_unlock(&session->s_cap_lock); inode = ceph_find_inode(sb, vino); - /* avoid calling iput_final() while holding s_mutex */ - ceph_async_iput(inode); + iput(inode); spin_lock(&session->s_cap_lock); } @@ -1761,7 +1755,7 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, ci->i_requested_max_size = 0; spin_unlock(&ci->i_ceph_lock); } else if (ev == RENEWCAPS) { - if (cap->cap_gen < cap->session->s_cap_gen) { + if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) { /* mds did not re-issue stale cap */ spin_lock(&ci->i_ceph_lock); cap->issued = cap->implemented = CEPH_CAP_PIN; @@ -2988,7 +2982,6 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); __ceph_touch_fmode(ci, mdsc, fmode); spin_unlock(&ci->i_ceph_lock); - ihold(req->r_parent); } if (req->r_old_dentry_dir) ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), @@ -3499,10 +3492,8 @@ static void handle_session(struct ceph_mds_session *session, case CEPH_SESSION_STALE: pr_info("mds%d caps went stale, renewing\n", session->s_mds); - spin_lock(&session->s_gen_ttl_lock); - session->s_cap_gen++; + atomic_inc(&session->s_cap_gen); session->s_cap_ttl = jiffies - 1; - spin_unlock(&session->s_gen_ttl_lock); send_renew_caps(mdsc, session); break; @@ -3771,7 +3762,7 @@ static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, cap->seq = 0; /* reset cap seq */ cap->issue_seq = 0; /* and issue_seq */ cap->mseq = 0; /* and migrate_seq */ - cap->cap_gen = cap->session->s_cap_gen; + cap->cap_gen = atomic_read(&cap->session->s_cap_gen); /* These are lost when the session goes away */ if (S_ISDIR(inode->i_mode)) { @@ -4011,9 +4002,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, dout("session %p state %s\n", session, ceph_session_state_name(session->s_state)); - spin_lock(&session->s_gen_ttl_lock); - session->s_cap_gen++; - spin_unlock(&session->s_gen_ttl_lock); + atomic_inc(&session->s_cap_gen); spin_lock(&session->s_cap_lock); /* don't know if session is readonly */ @@ -4344,7 +4333,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, case CEPH_MDS_LEASE_RENEW: if (di->lease_session == session && - di->lease_gen == session->s_cap_gen && + di->lease_gen == atomic_read(&session->s_cap_gen) && di->lease_renew_from && di->lease_renew_after == 0) { unsigned long duration = @@ -4372,8 +4361,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, out: mutex_unlock(&session->s_mutex); - /* avoid calling iput_final() in mds dispatch threads */ - ceph_async_iput(inode); + iput(inode); return; bad: diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 15c11a0f2caf8e..20e42d8b66c6cd 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -186,10 +186,8 @@ struct ceph_mds_session { struct ceph_auth_handshake s_auth; - /* protected by s_gen_ttl_lock */ - spinlock_t s_gen_ttl_lock; - u32 s_cap_gen; /* inc each time we get mds stale msg */ - unsigned long s_cap_ttl; /* when session caps expire */ + atomic_t s_cap_gen; /* inc each time we get mds stale msg */ + unsigned long s_cap_ttl; /* when session caps expire. protected by s_mutex */ /* protected by s_cap_lock */ spinlock_t s_cap_lock; diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c index 28b6b42ad6777d..5ac151eb0d498d 100644 --- a/fs/ceph/metric.c +++ b/fs/ceph/metric.c @@ -20,8 +20,11 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, struct ceph_opened_files *files; struct ceph_pinned_icaps *icaps; struct ceph_opened_inodes *inodes; + struct ceph_read_io_size *rsize; + struct ceph_write_io_size *wsize; struct ceph_client_metric *m = &mdsc->metric; u64 nr_caps = atomic64_read(&m->total_caps); + u32 header_len = sizeof(struct ceph_metric_header); struct ceph_msg *msg; struct timespec64 ts; s64 sum; @@ -30,7 +33,8 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write) + sizeof(*meta) + sizeof(*dlease) + sizeof(*files) - + sizeof(*icaps) + sizeof(*inodes); + + sizeof(*icaps) + sizeof(*inodes) + sizeof(*rsize) + + sizeof(*wsize); msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true); if (!msg) { @@ -43,10 +47,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, /* encode the cap metric */ cap = (struct ceph_metric_cap *)(head + 1); - cap->type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO); - cap->ver = 1; - cap->compat = 1; - cap->data_len = cpu_to_le32(sizeof(*cap) - 10); + cap->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO); + cap->header.ver = 1; + cap->header.compat = 1; + cap->header.data_len = cpu_to_le32(sizeof(*cap) - header_len); cap->hit = cpu_to_le64(percpu_counter_sum(&m->i_caps_hit)); cap->mis = cpu_to_le64(percpu_counter_sum(&m->i_caps_mis)); cap->total = cpu_to_le64(nr_caps); @@ -54,10 +58,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, /* encode the read latency metric */ read = (struct ceph_metric_read_latency *)(cap + 1); - read->type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY); - read->ver = 1; - read->compat = 1; - read->data_len = cpu_to_le32(sizeof(*read) - 10); + read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY); + read->header.ver = 1; + read->header.compat = 1; + read->header.data_len = cpu_to_le32(sizeof(*read) - header_len); sum = m->read_latency_sum; jiffies_to_timespec64(sum, &ts); read->sec = cpu_to_le32(ts.tv_sec); @@ -66,10 +70,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, /* encode the write latency metric */ write = (struct ceph_metric_write_latency *)(read + 1); - write->type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY); - write->ver = 1; - write->compat = 1; - write->data_len = cpu_to_le32(sizeof(*write) - 10); + write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY); + write->header.ver = 1; + write->header.compat = 1; + write->header.data_len = cpu_to_le32(sizeof(*write) - header_len); sum = m->write_latency_sum; jiffies_to_timespec64(sum, &ts); write->sec = cpu_to_le32(ts.tv_sec); @@ -78,10 +82,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, /* encode the metadata latency metric */ meta = (struct ceph_metric_metadata_latency *)(write + 1); - meta->type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY); - meta->ver = 1; - meta->compat = 1; - meta->data_len = cpu_to_le32(sizeof(*meta) - 10); + meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY); + meta->header.ver = 1; + meta->header.compat = 1; + meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len); sum = m->metadata_latency_sum; jiffies_to_timespec64(sum, &ts); meta->sec = cpu_to_le32(ts.tv_sec); @@ -90,10 +94,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, /* encode the dentry lease metric */ dlease = (struct ceph_metric_dlease *)(meta + 1); - dlease->type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE); - dlease->ver = 1; - dlease->compat = 1; - dlease->data_len = cpu_to_le32(sizeof(*dlease) - 10); + dlease->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE); + dlease->header.ver = 1; + dlease->header.compat = 1; + dlease->header.data_len = cpu_to_le32(sizeof(*dlease) - header_len); dlease->hit = cpu_to_le64(percpu_counter_sum(&m->d_lease_hit)); dlease->mis = cpu_to_le64(percpu_counter_sum(&m->d_lease_mis)); dlease->total = cpu_to_le64(atomic64_read(&m->total_dentries)); @@ -103,34 +107,54 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, /* encode the opened files metric */ files = (struct ceph_opened_files *)(dlease + 1); - files->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES); - files->ver = 1; - files->compat = 1; - files->data_len = cpu_to_le32(sizeof(*files) - 10); + files->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES); + files->header.ver = 1; + files->header.compat = 1; + files->header.data_len = cpu_to_le32(sizeof(*files) - header_len); files->opened_files = cpu_to_le64(atomic64_read(&m->opened_files)); files->total = cpu_to_le64(sum); items++; /* encode the pinned icaps metric */ icaps = (struct ceph_pinned_icaps *)(files + 1); - icaps->type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS); - icaps->ver = 1; - icaps->compat = 1; - icaps->data_len = cpu_to_le32(sizeof(*icaps) - 10); + icaps->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS); + icaps->header.ver = 1; + icaps->header.compat = 1; + icaps->header.data_len = cpu_to_le32(sizeof(*icaps) - header_len); icaps->pinned_icaps = cpu_to_le64(nr_caps); icaps->total = cpu_to_le64(sum); items++; /* encode the opened inodes metric */ inodes = (struct ceph_opened_inodes *)(icaps + 1); - inodes->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES); - inodes->ver = 1; - inodes->compat = 1; - inodes->data_len = cpu_to_le32(sizeof(*inodes) - 10); + inodes->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES); + inodes->header.ver = 1; + inodes->header.compat = 1; + inodes->header.data_len = cpu_to_le32(sizeof(*inodes) - header_len); inodes->opened_inodes = cpu_to_le64(percpu_counter_sum(&m->opened_inodes)); inodes->total = cpu_to_le64(sum); items++; + /* encode the read io size metric */ + rsize = (struct ceph_read_io_size *)(inodes + 1); + rsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_IO_SIZES); + rsize->header.ver = 1; + rsize->header.compat = 1; + rsize->header.data_len = cpu_to_le32(sizeof(*rsize) - header_len); + rsize->total_ops = cpu_to_le64(m->total_reads); + rsize->total_size = cpu_to_le64(m->read_size_sum); + items++; + + /* encode the write io size metric */ + wsize = (struct ceph_write_io_size *)(rsize + 1); + wsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_IO_SIZES); + wsize->header.ver = 1; + wsize->header.compat = 1; + wsize->header.data_len = cpu_to_le32(sizeof(*wsize) - header_len); + wsize->total_ops = cpu_to_le64(m->total_writes); + wsize->total_size = cpu_to_le64(m->write_size_sum); + items++; + put_unaligned_le32(items, &head->num); msg->front.iov_len = len; msg->hdr.version = cpu_to_le16(1); @@ -225,6 +249,9 @@ int ceph_metric_init(struct ceph_client_metric *m) m->read_latency_max = 0; m->total_reads = 0; m->read_latency_sum = 0; + m->read_size_min = U64_MAX; + m->read_size_max = 0; + m->read_size_sum = 0; spin_lock_init(&m->write_metric_lock); m->write_latency_sq_sum = 0; @@ -232,6 +259,9 @@ int ceph_metric_init(struct ceph_client_metric *m) m->write_latency_max = 0; m->total_writes = 0; m->write_latency_sum = 0; + m->write_size_min = U64_MAX; + m->write_size_max = 0; + m->write_size_sum = 0; spin_lock_init(&m->metadata_metric_lock); m->metadata_latency_sq_sum = 0; @@ -281,23 +311,21 @@ void ceph_metric_destroy(struct ceph_client_metric *m) cancel_delayed_work_sync(&m->delayed_work); - if (m->session) - ceph_put_mds_session(m->session); + ceph_put_mds_session(m->session); } -static inline void __update_latency(ktime_t *totalp, ktime_t *lsump, - ktime_t *min, ktime_t *max, - ktime_t *sq_sump, ktime_t lat) -{ - ktime_t total, avg, sq, lsum; - - total = ++(*totalp); - lsum = (*lsump += lat); +#define METRIC_UPDATE_MIN_MAX(min, max, new) \ +{ \ + if (unlikely(new < min)) \ + min = new; \ + if (unlikely(new > max)) \ + max = new; \ +} - if (unlikely(lat < *min)) - *min = lat; - if (unlikely(lat > *max)) - *max = lat; +static inline void __update_stdev(ktime_t total, ktime_t lsum, + ktime_t *sq_sump, ktime_t lat) +{ + ktime_t avg, sq; if (unlikely(total == 1)) return; @@ -312,33 +340,51 @@ static inline void __update_latency(ktime_t *totalp, ktime_t *lsump, void ceph_update_read_metrics(struct ceph_client_metric *m, ktime_t r_start, ktime_t r_end, - int rc) + unsigned int size, int rc) { ktime_t lat = ktime_sub(r_end, r_start); + ktime_t total; if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT)) return; spin_lock(&m->read_metric_lock); - __update_latency(&m->total_reads, &m->read_latency_sum, - &m->read_latency_min, &m->read_latency_max, - &m->read_latency_sq_sum, lat); + total = ++m->total_reads; + m->read_size_sum += size; + m->read_latency_sum += lat; + METRIC_UPDATE_MIN_MAX(m->read_size_min, + m->read_size_max, + size); + METRIC_UPDATE_MIN_MAX(m->read_latency_min, + m->read_latency_max, + lat); + __update_stdev(total, m->read_latency_sum, + &m->read_latency_sq_sum, lat); spin_unlock(&m->read_metric_lock); } void ceph_update_write_metrics(struct ceph_client_metric *m, ktime_t r_start, ktime_t r_end, - int rc) + unsigned int size, int rc) { ktime_t lat = ktime_sub(r_end, r_start); + ktime_t total; if (unlikely(rc && rc != -ETIMEDOUT)) return; spin_lock(&m->write_metric_lock); - __update_latency(&m->total_writes, &m->write_latency_sum, - &m->write_latency_min, &m->write_latency_max, - &m->write_latency_sq_sum, lat); + total = ++m->total_writes; + m->write_size_sum += size; + m->write_latency_sum += lat; + METRIC_UPDATE_MIN_MAX(m->write_size_min, + m->write_size_max, + size); + METRIC_UPDATE_MIN_MAX(m->write_latency_min, + m->write_latency_max, + lat); + __update_stdev(total, m->write_latency_sum, + &m->write_latency_sq_sum, lat); spin_unlock(&m->write_metric_lock); } @@ -347,13 +393,18 @@ void ceph_update_metadata_metrics(struct ceph_client_metric *m, int rc) { ktime_t lat = ktime_sub(r_end, r_start); + ktime_t total; if (unlikely(rc && rc != -ENOENT)) return; spin_lock(&m->metadata_metric_lock); - __update_latency(&m->total_metadatas, &m->metadata_latency_sum, - &m->metadata_latency_min, &m->metadata_latency_max, - &m->metadata_latency_sq_sum, lat); + total = ++m->total_metadatas; + m->metadata_latency_sum += lat; + METRIC_UPDATE_MIN_MAX(m->metadata_latency_min, + m->metadata_latency_max, + lat); + __update_stdev(total, m->metadata_latency_sum, + &m->metadata_latency_sq_sum, lat); spin_unlock(&m->metadata_metric_lock); } diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h index e984eb2bb14b4a..0133955a3c6aac 100644 --- a/fs/ceph/metric.h +++ b/fs/ceph/metric.h @@ -17,8 +17,10 @@ enum ceph_metric_type { CLIENT_METRIC_TYPE_OPENED_FILES, CLIENT_METRIC_TYPE_PINNED_ICAPS, CLIENT_METRIC_TYPE_OPENED_INODES, + CLIENT_METRIC_TYPE_READ_IO_SIZES, + CLIENT_METRIC_TYPE_WRITE_IO_SIZES, - CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_OPENED_INODES, + CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_WRITE_IO_SIZES, }; /* @@ -34,18 +36,22 @@ enum ceph_metric_type { CLIENT_METRIC_TYPE_OPENED_FILES, \ CLIENT_METRIC_TYPE_PINNED_ICAPS, \ CLIENT_METRIC_TYPE_OPENED_INODES, \ + CLIENT_METRIC_TYPE_READ_IO_SIZES, \ + CLIENT_METRIC_TYPE_WRITE_IO_SIZES, \ \ CLIENT_METRIC_TYPE_MAX, \ } -/* metric caps header */ -struct ceph_metric_cap { +struct ceph_metric_header { __le32 type; /* ceph metric type */ - __u8 ver; __u8 compat; - __le32 data_len; /* length of sizeof(hit + mis + total) */ +} __packed; + +/* metric caps header */ +struct ceph_metric_cap { + struct ceph_metric_header header; __le64 hit; __le64 mis; __le64 total; @@ -53,48 +59,28 @@ struct ceph_metric_cap { /* metric read latency header */ struct ceph_metric_read_latency { - __le32 type; /* ceph metric type */ - - __u8 ver; - __u8 compat; - - __le32 data_len; /* length of sizeof(sec + nsec) */ + struct ceph_metric_header header; __le32 sec; __le32 nsec; } __packed; /* metric write latency header */ struct ceph_metric_write_latency { - __le32 type; /* ceph metric type */ - - __u8 ver; - __u8 compat; - - __le32 data_len; /* length of sizeof(sec + nsec) */ + struct ceph_metric_header header; __le32 sec; __le32 nsec; } __packed; /* metric metadata latency header */ struct ceph_metric_metadata_latency { - __le32 type; /* ceph metric type */ - - __u8 ver; - __u8 compat; - - __le32 data_len; /* length of sizeof(sec + nsec) */ + struct ceph_metric_header header; __le32 sec; __le32 nsec; } __packed; /* metric dentry lease header */ struct ceph_metric_dlease { - __le32 type; /* ceph metric type */ - - __u8 ver; - __u8 compat; - - __le32 data_len; /* length of sizeof(hit + mis + total) */ + struct ceph_metric_header header; __le64 hit; __le64 mis; __le64 total; @@ -102,40 +88,39 @@ struct ceph_metric_dlease { /* metric opened files header */ struct ceph_opened_files { - __le32 type; /* ceph metric type */ - - __u8 ver; - __u8 compat; - - __le32 data_len; /* length of sizeof(opened_files + total) */ + struct ceph_metric_header header; __le64 opened_files; __le64 total; } __packed; /* metric pinned i_caps header */ struct ceph_pinned_icaps { - __le32 type; /* ceph metric type */ - - __u8 ver; - __u8 compat; - - __le32 data_len; /* length of sizeof(pinned_icaps + total) */ + struct ceph_metric_header header; __le64 pinned_icaps; __le64 total; } __packed; /* metric opened inodes header */ struct ceph_opened_inodes { - __le32 type; /* ceph metric type */ - - __u8 ver; - __u8 compat; - - __le32 data_len; /* length of sizeof(opened_inodes + total) */ + struct ceph_metric_header header; __le64 opened_inodes; __le64 total; } __packed; +/* metric read io size header */ +struct ceph_read_io_size { + struct ceph_metric_header header; + __le64 total_ops; + __le64 total_size; +} __packed; + +/* metric write io size header */ +struct ceph_write_io_size { + struct ceph_metric_header header; + __le64 total_ops; + __le64 total_size; +} __packed; + struct ceph_metric_head { __le32 num; /* the number of metrics that will be sent */ } __packed; @@ -152,6 +137,9 @@ struct ceph_client_metric { spinlock_t read_metric_lock; u64 total_reads; + u64 read_size_sum; + u64 read_size_min; + u64 read_size_max; ktime_t read_latency_sum; ktime_t read_latency_sq_sum; ktime_t read_latency_min; @@ -159,6 +147,9 @@ struct ceph_client_metric { spinlock_t write_metric_lock; u64 total_writes; + u64 write_size_sum; + u64 write_size_min; + u64 write_size_max; ktime_t write_latency_sum; ktime_t write_latency_sq_sum; ktime_t write_latency_min; @@ -206,10 +197,10 @@ static inline void ceph_update_cap_mis(struct ceph_client_metric *m) extern void ceph_update_read_metrics(struct ceph_client_metric *m, ktime_t r_start, ktime_t r_end, - int rc); + unsigned int size, int rc); extern void ceph_update_write_metrics(struct ceph_client_metric *m, ktime_t r_start, ktime_t r_end, - int rc); + unsigned int size, int rc); extern void ceph_update_metadata_metrics(struct ceph_client_metric *m, ktime_t r_start, ktime_t r_end, int rc); diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c index 4e32c9600ecc88..620c691af40e7e 100644 --- a/fs/ceph/quota.c +++ b/fs/ceph/quota.c @@ -74,8 +74,7 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc, le64_to_cpu(h->max_files)); spin_unlock(&ci->i_ceph_lock); - /* avoid calling iput_final() in dispatch thread */ - ceph_async_iput(inode); + iput(inode); } static struct ceph_quotarealm_inode * @@ -247,8 +246,7 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc, ci = ceph_inode(in); has_quota = __ceph_has_any_quota(ci); - /* avoid calling iput_final() while holding mdsc->snap_rwsem */ - ceph_async_iput(in); + iput(in); next = realm->parent; if (has_quota || !next) @@ -383,8 +381,7 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op, pr_warn("Invalid quota check op (%d)\n", op); exceeded = true; /* Just break the loop */ } - /* avoid calling iput_final() while holding mdsc->snap_rwsem */ - ceph_async_iput(in); + iput(in); next = realm->parent; if (exceeded || !next) diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 4ce18055d9316d..4ac0606dcbd41e 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -60,11 +60,13 @@ /* * increase ref count for the realm * - * caller must hold snap_rwsem for write. + * caller must hold snap_rwsem. */ void ceph_get_snap_realm(struct ceph_mds_client *mdsc, struct ceph_snap_realm *realm) { + lockdep_assert_held(&mdsc->snap_rwsem); + dout("get_realm %p %d -> %d\n", realm, atomic_read(&realm->nref), atomic_read(&realm->nref)+1); /* @@ -113,6 +115,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm( { struct ceph_snap_realm *realm; + lockdep_assert_held_write(&mdsc->snap_rwsem); + realm = kzalloc(sizeof(*realm), GFP_NOFS); if (!realm) return ERR_PTR(-ENOMEM); @@ -135,7 +139,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm( /* * lookup the realm rooted at @ino. * - * caller must hold snap_rwsem for write. + * caller must hold snap_rwsem. */ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc, u64 ino) @@ -143,6 +147,8 @@ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc, struct rb_node *n = mdsc->snap_realms.rb_node; struct ceph_snap_realm *r; + lockdep_assert_held(&mdsc->snap_rwsem); + while (n) { r = rb_entry(n, struct ceph_snap_realm, node); if (ino < r->ino) @@ -176,6 +182,8 @@ static void __put_snap_realm(struct ceph_mds_client *mdsc, static void __destroy_snap_realm(struct ceph_mds_client *mdsc, struct ceph_snap_realm *realm) { + lockdep_assert_held_write(&mdsc->snap_rwsem); + dout("__destroy_snap_realm %p %llx\n", realm, realm->ino); rb_erase(&realm->node, &mdsc->snap_realms); @@ -198,6 +206,8 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc, static void __put_snap_realm(struct ceph_mds_client *mdsc, struct ceph_snap_realm *realm) { + lockdep_assert_held_write(&mdsc->snap_rwsem); + dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm, atomic_read(&realm->nref), atomic_read(&realm->nref)-1); if (atomic_dec_and_test(&realm->nref)) @@ -236,6 +246,8 @@ static void __cleanup_empty_realms(struct ceph_mds_client *mdsc) { struct ceph_snap_realm *realm; + lockdep_assert_held_write(&mdsc->snap_rwsem); + spin_lock(&mdsc->snap_empty_lock); while (!list_empty(&mdsc->snap_empty)) { realm = list_first_entry(&mdsc->snap_empty, @@ -269,6 +281,8 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc, { struct ceph_snap_realm *parent; + lockdep_assert_held_write(&mdsc->snap_rwsem); + if (realm->parent_ino == parentino) return 0; @@ -460,7 +474,7 @@ static bool has_new_snaps(struct ceph_snap_context *o, * Caller must hold snap_rwsem for read (i.e., the realm topology won't * change). */ -void ceph_queue_cap_snap(struct ceph_inode_info *ci) +static void ceph_queue_cap_snap(struct ceph_inode_info *ci) { struct inode *inode = &ci->vfs_inode; struct ceph_cap_snap *capsnap; @@ -663,15 +677,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) if (!inode) continue; spin_unlock(&realm->inodes_with_caps_lock); - /* avoid calling iput_final() while holding - * mdsc->snap_rwsem or in mds dispatch threads */ - ceph_async_iput(lastinode); + iput(lastinode); lastinode = inode; ceph_queue_cap_snap(ci); spin_lock(&realm->inodes_with_caps_lock); } spin_unlock(&realm->inodes_with_caps_lock); - ceph_async_iput(lastinode); + iput(lastinode); dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino); } @@ -696,6 +708,8 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, int err = -ENOMEM; LIST_HEAD(dirty_realms); + lockdep_assert_held_write(&mdsc->snap_rwsem); + dout("update_snap_trace deletion=%d\n", deletion); more: ceph_decode_need(&p, e, sizeof(*ri), bad); @@ -791,7 +805,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, return 0; bad: - err = -EINVAL; + err = -EIO; fail: if (realm && !IS_ERR(realm)) ceph_put_snap_realm(mdsc, realm); @@ -823,17 +837,12 @@ static void flush_snaps(struct ceph_mds_client *mdsc) ihold(inode); spin_unlock(&mdsc->snap_flush_lock); ceph_flush_snaps(ci, &session); - /* avoid calling iput_final() while holding - * session->s_mutex or in mds dispatch threads */ - ceph_async_iput(inode); + iput(inode); spin_lock(&mdsc->snap_flush_lock); } spin_unlock(&mdsc->snap_flush_lock); - if (session) { - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - } + ceph_put_mds_session(session); dout("flush_snaps done\n"); } @@ -969,14 +978,12 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ceph_get_snap_realm(mdsc, realm); ceph_put_snap_realm(mdsc, oldrealm); - /* avoid calling iput_final() while holding - * mdsc->snap_rwsem or mds in dispatch threads */ - ceph_async_iput(inode); + iput(inode); continue; skip_inode: spin_unlock(&ci->i_ceph_lock); - ceph_async_iput(inode); + iput(inode); } /* we may have taken some of the old realm's children. */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 839e6b0239eeb7..6b6332a5c113cf 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -931,7 +931,6 @@ extern int ceph_update_snap_trace(struct ceph_mds_client *m, extern void ceph_handle_snap(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, struct ceph_msg *msg); -extern void ceph_queue_cap_snap(struct ceph_inode_info *ci); extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap); extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); @@ -989,8 +988,6 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask); extern bool ceph_inode_set_size(struct inode *inode, loff_t size); extern void __ceph_do_pending_vmtruncate(struct inode *inode); -extern void ceph_async_iput(struct inode *inode); - void ceph_queue_inode_work(struct inode *inode, int work_bit); static inline void ceph_queue_vmtruncate(struct inode *inode) diff --git a/net/ceph/auth.c b/net/ceph/auth.c index d2b268a1838e8e..d38c9eadbe2f18 100644 --- a/net/ceph/auth.c +++ b/net/ceph/auth.c @@ -58,12 +58,10 @@ struct ceph_auth_client *ceph_auth_init(const char *name, const int *con_modes) { struct ceph_auth_client *ac; - int ret; - ret = -ENOMEM; ac = kzalloc(sizeof(*ac), GFP_NOFS); if (!ac) - goto out; + return ERR_PTR(-ENOMEM); mutex_init(&ac->mutex); ac->negotiating = true; @@ -78,9 +76,6 @@ struct ceph_auth_client *ceph_auth_init(const char *name, dout("%s name '%s' preferred_mode %d fallback_mode %d\n", __func__, ac->name, ac->preferred_mode, ac->fallback_mode); return ac; - -out: - return ERR_PTR(ret); } void ceph_auth_destroy(struct ceph_auth_client *ac) diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c index 097e9f8d87a72f..77b5519bc45f44 100644 --- a/net/ceph/auth_none.c +++ b/net/ceph/auth_none.c @@ -112,8 +112,8 @@ static int ceph_auth_none_create_authorizer( auth->authorizer = (struct ceph_authorizer *) au; auth->authorizer_buf = au->buf; auth->authorizer_buf_len = au->buf_len; - auth->authorizer_reply_buf = au->reply_buf; - auth->authorizer_reply_buf_len = sizeof (au->reply_buf); + auth->authorizer_reply_buf = NULL; + auth->authorizer_reply_buf_len = 0; return 0; } diff --git a/net/ceph/auth_none.h b/net/ceph/auth_none.h index 4158f064302e9f..bb121539e796a3 100644 --- a/net/ceph/auth_none.h +++ b/net/ceph/auth_none.h @@ -16,7 +16,6 @@ struct ceph_none_authorizer { struct ceph_authorizer base; char buf[128]; int buf_len; - char reply_buf[0]; }; struct ceph_auth_none_info { diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index 17447c19d9376f..66136a4c1ce7fe 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -10,7 +10,9 @@ /** * ceph_cls_lock - grab rados lock for object - * @oid, @oloc: object to lock + * @osdc: OSD client instance + * @oid: object to lock + * @oloc: object to lock * @lock_name: the name of the lock * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED) * @cookie: user-defined identifier for this instance of the lock @@ -82,7 +84,9 @@ EXPORT_SYMBOL(ceph_cls_lock); /** * ceph_cls_unlock - release rados lock for object - * @oid, @oloc: object to lock + * @osdc: OSD client instance + * @oid: object to lock + * @oloc: object to lock * @lock_name: the name of the lock * @cookie: user-defined identifier for this instance of the lock */ @@ -130,7 +134,9 @@ EXPORT_SYMBOL(ceph_cls_unlock); /** * ceph_cls_break_lock - release rados lock for object for specified client - * @oid, @oloc: object to lock + * @osdc: OSD client instance + * @oid: object to lock + * @oloc: object to lock * @lock_name: the name of the lock * @cookie: user-defined identifier for this instance of the lock * @locker: current lock owner