From: Jeff Moyer <jmoyer@redhat.com>
Subject: [rhel5 patch 0/5] dio: clean up completion phase of direct_io_worker()
Date: Fri, 01 Jun 2007 15:36:26 -0400
Bugzilla: 242116
Message-Id: <x49lkf331s5.fsf@segfault.boston.devel.redhat.com>
Changelog: [dio] clean up completion phase of direct_io_worker()

Hi,

This patch set is Zach Brown's cleanup of the direct I/O completion path.
I'll include his changelogs verbatim, and the patches apply verbatim as
well.  I have applied these patches to my rhel5 dio branch and have been
testing them and distributing them to customers and partners for testing
over the last few months.  My tests consist of directed tests (where
possible) and of generic testing using the aio-stress and fio utilities,
as well as the regression test suite that Zach and I have been working on.

There is 1 bugzilla which is fixed directly by this patch series:

  Bugzilla Bug 242116: race in aio io_submit write/read

The customer provided a reproducer for the short read problem, and I was
able to modify the code to trigger the problem within a very short amount
of time (within 3-30 seconds).  With this patch set in place, I have been
unable to trigger the problem.

Comments, as always, are encouraged.

Thanks!
Jeff

---

dio: clean up completion phase of direct_io_worker()

There have been a lot of bugs recently due to the way direct_io_worker()
tries to decide how to finish direct IO operations.  In the worst examples
it has failed to call aio_complete() at all (hang) or called it too many
times (oops).

This set of patches cleans up the completion phase with the goal of
removing the complexity that led to these bugs.  We end up with one path
that calculates the result of the operation after all of the bios have
completed.  We decide when to generate a result of the operation using
that path based on the final release of a refcount on the dio structure.

I tried to progress towards the final state in steps that were relatively
easy to understand.
Each step should compile, but I only tested the final result of having all
the patches applied.

The patches result in a slight net decrease in code and binary size:

 2.6.18-rc4-dio-cleanup/fs/direct-io.c |    8
 2.6.18-rc5-dio-cleanup/fs/direct-io.c |   94 +++++------
 2.6.18-rc5-dio-cleanup/mm/filemap.c   |    4
 fs/direct-io.c                        |  273 ++++++++++++++--------------------
 4 files changed, 159 insertions(+), 220 deletions(-)

   text    data     bss     dec     hex filename
2592385  450996  210296 3253677  31a5ad vmlinux.before
2592113  450980  210296 3253389  31a48d vmlinux.after

The patches pass light testing with aio-stress, the direct IO tests I
could manage to get running in LTP, and some home-brew functional tests.
It's still making its way through stress testing.  I don't think it should
be merged until we hear back from that more rigorous testing.  I hoped to
get some feedback (and maybe volunteers for testing!) by sending the
patches out before waiting for the stress tests.

commit 6d544bb4d9019c3a0d7ee4af1e4bbbd61a6e16dc
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: centralize completion in dio_complete()

There have been a lot of bugs recently due to the way direct_io_worker()
tries to decide how to finish direct IO operations.  In the worst examples
it has failed to call aio_complete() at all (hang) or called it too many
times (oops).

This set of patches cleans up the completion phase with the goal of
removing the complexity that led to these bugs.  We end up with one path
that calculates the result of the operation after all of the bios have
completed.  We decide when to generate a result of the operation using
that path based on the final release of a refcount on the dio structure.

I tried to progress towards the final state in steps that were relatively
easy to understand.  Each step should compile, but I only tested the final
result of having all the patches applied.
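As a reading aid, the result calculation that this first patch centralizes in dio_complete() can be sketched in plain userspace C. This is not kernel code: the struct below is a stripped-down stand-in for struct dio, and the names mirror the patch only for illustration.

```c
#include <stddef.h>

#define READ  0
#define WRITE 1

/* Stand-in for the handful of struct dio fields the calculation reads. */
struct dio_sketch {
	int rw;			/* READ or WRITE */
	long long i_size;	/* file size when the op started */
	long result;		/* bytes described by submitted bios */
	int page_errors;	/* errno from get_user_pages() */
	int io_error;		/* errno noted during bio completion */
};

/* Mirrors the new dio_complete(): clamp a read that crossed EOF, then
 * let an existing error win over page faults, over IO errors, over the
 * byte count.  Exactly one result comes out, for sync and async alike. */
long dio_complete_sketch(struct dio_sketch *dio, long long offset, long ret)
{
	long transferred = 0;

	if (dio->result) {
		transferred = dio->result;

		/* Check for short read case */
		if (dio->rw == READ && offset + transferred > dio->i_size)
			transferred = dio->i_size - offset;
	}

	if (ret == 0)
		ret = dio->page_errors;
	if (ret == 0)
		ret = dio->io_error;
	if (ret == 0)
		ret = transferred;
	return ret;
}
```

Because page_errors is consulted on every path, an EFAULT from get_user_pages() is now reported consistently instead of being silently dropped by the async path.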
I've tested these on low end PC drives with aio-stress, the direct IO
tests I could manage to get running in LTP, orasim, and some home-brew
functional tests.

In http://lkml.org/lkml/2006/9/21/103 IBM reports success with ext2 and
ext3 running DIO LTP tests.  They found that XFS bug which has since been
addressed in the patch series.

This patch:

The mechanics which decide the result of a direct IO operation were
duplicated in the sync and async paths.

The async path didn't check page_errors, which can manifest as silently
returning success when the final pointer in an operation faults and its
matching file region is filled with zeros.

The sync path and async path differed in whether they passed errors to the
caller's dio->end_io operation.  The async path was passing errors to it,
which trips an assertion in XFS, though it is apparently harmless.

This centralizes the completion phase of dio ops in one place.  AIO will
now return EFAULT consistently and all paths fall back to the previously
sync behaviour of passing the number of bytes 'transferred' to the
dio->end_io callback, regardless of errors.

dio_await_completion() doesn't have to propagate EIO from non-uptodate
bios now that it's being propagated through dio_complete() via
dio->io_error.  This lets it return void, which simplifies its sole
caller.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
Cc: Badari Pulavarty <pbadari@us.ibm.com>
Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 45d34d8..b57b671 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -210,19 +210,46 @@ static struct page *dio_get_page(struct
 	return dio->pages[dio->head++];
 }
 
-/*
- * Called when all DIO BIO I/O has been completed - let the filesystem
- * know, if it registered an interest earlier via get_block.  Pass the
- * private field of the map buffer_head so that filesystems can use it
- * to hold additional state between get_block calls and dio_complete.
+/**
+ * dio_complete() - called when all DIO BIO I/O has been completed
+ * @offset: the byte offset in the file of the completed operation
+ *
+ * This releases locks as dictated by the locking type, lets interested parties
+ * know that a DIO operation has completed, and calculates the resulting return
+ * code for the operation.
+ *
+ * It lets the filesystem know if it registered an interest earlier via
+ * get_block.  Pass the private field of the map buffer_head so that
+ * filesystems can use it to hold additional state between get_block calls and
+ * dio_complete.
  */
-static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
+static int dio_complete(struct dio *dio, loff_t offset, int ret)
 {
+	ssize_t transferred = 0;
+
+	if (dio->result) {
+		transferred = dio->result;
+
+		/* Check for short read case */
+		if ((dio->rw == READ) && ((offset + transferred) > dio->i_size))
+			transferred = dio->i_size - offset;
+	}
+
 	if (dio->end_io && dio->result)
-		dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private);
+		dio->end_io(dio->iocb, offset, transferred,
+			    dio->map_bh.b_private);
 	if (dio->lock_type == DIO_LOCKING)
 		/* lockdep: non-owner release */
 		up_read_non_owner(&dio->inode->i_alloc_sem);
+
+	if (ret == 0)
+		ret = dio->page_errors;
+	if (ret == 0)
+		ret = dio->io_error;
+	if (ret == 0)
+		ret = transferred;
+
+	return ret;
 }
 
 /*
@@ -236,8 +263,7 @@ static void finished_one_bio(struct dio
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	if (dio->bio_count == 1) {
 		if (dio->is_async) {
-			ssize_t transferred;
-			loff_t offset;
+			int ret;
 
 			/*
 			 * Last reference to the dio is going away.
@@ -245,24 +271,12 @@ static void finished_one_bio(struct dio
 			 */
 			spin_unlock_irqrestore(&dio->bio_lock, flags);
 
-			/* Check for short read case */
-			transferred = dio->result;
-			offset = dio->iocb->ki_pos;
-
-			if ((dio->rw == READ) &&
-			    ((offset + transferred) > dio->i_size))
-				transferred = dio->i_size - offset;
-
-			/* check for error in completion path */
-			if (dio->io_error)
-				transferred = dio->io_error;
-
-			dio_complete(dio, offset, transferred);
+			ret = dio_complete(dio, dio->iocb->ki_pos, 0);
 
 			/* Complete AIO later if falling back to buffered i/o */
 			if (dio->result == dio->size ||
 				((dio->rw == READ) && dio->result)) {
-				aio_complete(dio->iocb, transferred, 0);
+				aio_complete(dio->iocb, ret, 0);
 				kfree(dio);
 				return;
 			} else {
@@ -434,10 +448,8 @@ static int dio_bio_complete(struct dio *
 /*
  * Wait on and process all in-flight BIOs.
  */
-static int dio_await_completion(struct dio *dio)
+static void dio_await_completion(struct dio *dio)
 {
-	int ret = 0;
-
 	if (dio->bio)
 		dio_bio_submit(dio);
 
@@ -448,13 +460,9 @@ static void dio_await_completion(struct d
 	 */
 	while (dio->bio_count) {
 		struct bio *bio = dio_await_one(dio);
-		int ret2;
-
-		ret2 = dio_bio_complete(dio, bio);
-		if (ret == 0)
-			ret = ret2;
+		/* io errors are propogated through dio->io_error */
+		dio_bio_complete(dio, bio);
 	}
-	return ret;
 }
 
 /*
@@ -1127,28 +1135,10 @@ direct_io_worker(int rw, struct kiocb *i
 			kfree(dio);
 		}
 	} else {
-		ssize_t transferred = 0;
-
 		finished_one_bio(dio);
-		ret2 = dio_await_completion(dio);
-		if (ret == 0)
-			ret = ret2;
-		if (ret == 0)
-			ret = dio->page_errors;
-		if (dio->result) {
-			loff_t i_size = i_size_read(inode);
+		dio_await_completion(dio);
 
-			transferred = dio->result;
-			/*
-			 * Adjust the return value if the read crossed a
-			 * non-block-aligned EOF.
-			 */
-			if (rw == READ && (offset + transferred > i_size))
-				transferred = i_size - offset;
-		}
-		dio_complete(dio, offset, transferred);
-		if (ret == 0)
-			ret = transferred;
+		ret = dio_complete(dio, offset, ret);
 
 		/* We could have also come here on an AIO file extend */
 		if (!is_sync_kiocb(iocb) && (rw & WRITE) &&

commit 17a7b1d74b1207f8f1af40b5d184989076d08f8b
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: call blk_run_address_space() once per op

We only need to call blk_run_address_space() once after all the bios for
the direct IO op have been submitted.  This removes the chance of calling
blk_run_address_space() after spurious wake ups as the sync path waits for
bios to drain.  It's also one less difference between the sync and async
paths.

In the process we remove a redundant dio_bio_submit() that its caller had
already performed.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
Cc: Badari Pulavarty <pbadari@us.ibm.com>
Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>

diff --git a/fs/direct-io.c b/fs/direct-io.c
index b57b671..b296942 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -404,7 +404,6 @@ static struct bio *dio_await_one(struct
 		if (dio->bio_list == NULL) {
 			dio->waiter = current;
 			spin_unlock_irqrestore(&dio->bio_lock, flags);
-			blk_run_address_space(dio->inode->i_mapping);
 			io_schedule();
 			spin_lock_irqsave(&dio->bio_lock, flags);
 			dio->waiter = NULL;
@@ -450,9 +449,6 @@ static int dio_bio_complete(struct dio *
  */
 static void dio_await_completion(struct dio *dio)
 {
-	if (dio->bio)
-		dio_bio_submit(dio);
-
 	/*
 	 * The bio_lock is not held for the read of bio_count.
	 * This is ok since it is the dio_bio_complete() that changes
	 * bio_count.
@@ -1085,6 +1081,9 @@ direct_io_worker(int rw, struct kiocb *i
 	if (dio->bio)
 		dio_bio_submit(dio);
 
+	/* All IO is now issued, send it on its way */
+	blk_run_address_space(inode->i_mapping);
+
 	/*
 	 * It is possible that, we return short IO due to end of file.
 	 * In that case, we need to release all the pages we got hold on.
@@ -1113,7 +1112,6 @@ direct_io_worker(int rw, struct kiocb *i
 		if (ret == 0)
 			ret = dio->result;
 		finished_one_bio(dio);		/* This can free the dio */
-		blk_run_address_space(inode->i_mapping);
 		if (should_wait) {
 			unsigned long flags;
 			/*

commit 0273201e693fd62381f6b1e85b15ffc117d8a46e
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: formalize bio counters as a dio reference count

Previously we had two confusing counts of bio progress.  'bio_count' was
decremented as bios were processed and freed by the dio core.  It was used
to indicate final completion of the dio operation.  'bios_in_flight'
reflected how many bios were between submit_bio() and bio->end_io.  It was
used by the sync path to decide when to wake up and finish completing bios
and was ignored by the async path.

This patch collapses the two notions into one notion of a dio reference
count.  bios hold a dio reference when they're between submit_bio and
bio->end_io.

Since bios_in_flight was only used in the sync path it is now equivalent
to dio->refcount - 1 which accounts for direct_io_worker() holding a
reference for the duration of the operation.

dio_bio_complete() -> finished_one_bio() was called from the sync path
after finding bios on the list that the bio->end_io function had
deposited.  finished_one_bio() can not drop the dio reference on behalf of
these bios now because bio->end_io already has.  The is_async test in
finished_one_bio() meant that it never actually did anything other than
drop the bio_count for sync callers.
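The reference-counting model this patch introduces can be sketched in userspace with C11 atomics. This is an illustrative stand-in, not the kernel code: direct_io_worker() holds one reference for the whole operation, each bio holds one between submit_bio() and its ->end_io, and whoever drops the last reference completes the operation.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Minimal stand-in for the dio refcount state. */
struct dio_ref_sketch {
	atomic_int refcount;
	bool completed;		/* stands in for dio_complete_aio()/kfree() */
};

void dio_init(struct dio_ref_sketch *dio)
{
	/* the submitter's reference, taken before any bio is issued */
	atomic_init(&dio->refcount, 1);
	dio->completed = false;
}

void dio_bio_submit_sketch(struct dio_ref_sketch *dio)
{
	atomic_fetch_add(&dio->refcount, 1);	/* bio takes a reference */
}

void dio_drop_ref(struct dio_ref_sketch *dio)
{
	/* exactly one caller sees the count hit zero and completes */
	if (atomic_fetch_sub(&dio->refcount, 1) == 1)
		dio->completed = true;
}
```

The atomic decrement-and-test makes the completion decision race-free without holding bio_lock, which is exactly why the patch can retire the two hand-maintained counters.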
So we remove its refcount decrement, don't call it from dio_bio_complete(),
and hoist its call up into the async dio_bio_complete() caller after an
explicit refcount decrement.  It is renamed dio_complete_aio() to reflect
the remaining work it actually does.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
Cc: Badari Pulavarty <pbadari@us.ibm.com>
Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>

diff --git a/fs/direct-io.c b/fs/direct-io.c
index b296942..bc1cbf9 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -121,9 +121,8 @@ struct dio {
 	int page_errors;		/* errno from get_user_pages() */
 
 	/* BIO completion state */
+	atomic_t refcount;		/* direct_io_worker() and bios */
 	spinlock_t bio_lock;		/* protects BIO fields below */
-	int bio_count;			/* nr bios to be completed */
-	int bios_in_flight;		/* nr bios in flight */
 	struct bio *bio_list;		/* singly linked via bi_private */
 	struct task_struct *waiter;	/* waiting task (NULL if none) */
 
@@ -256,44 +255,27 @@ static int dio_complete(struct dio *dio,
  * Called when a BIO has been processed.  If the count goes to zero then IO is
  * complete and we can signal this to the AIO layer.
  */
-static void finished_one_bio(struct dio *dio)
+static void dio_complete_aio(struct dio *dio)
 {
 	unsigned long flags;
+	int ret;
 
-	spin_lock_irqsave(&dio->bio_lock, flags);
-	if (dio->bio_count == 1) {
-		if (dio->is_async) {
-			int ret;
-
-			/*
-			 * Last reference to the dio is going away.
-			 * Drop spinlock and complete the DIO.
-			 */
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
-
-			ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+	ret = dio_complete(dio, dio->iocb->ki_pos, 0);
 
-			/* Complete AIO later if falling back to buffered i/o */
-			if (dio->result == dio->size ||
-				((dio->rw == READ) && dio->result)) {
-				aio_complete(dio->iocb, ret, 0);
-				kfree(dio);
-				return;
-			} else {
-				/*
-				 * Falling back to buffered
-				 */
-				spin_lock_irqsave(&dio->bio_lock, flags);
-				dio->bio_count--;
-				if (dio->waiter)
-					wake_up_process(dio->waiter);
-				spin_unlock_irqrestore(&dio->bio_lock, flags);
-				return;
-			}
-		}
+	/* Complete AIO later if falling back to buffered i/o */
+	if (dio->result == dio->size ||
+		((dio->rw == READ) && dio->result)) {
+		aio_complete(dio->iocb, ret, 0);
+		kfree(dio);
+	} else {
+		/*
+		 * Falling back to buffered
+		 */
+		spin_lock_irqsave(&dio->bio_lock, flags);
+		if (dio->waiter)
+			wake_up_process(dio->waiter);
+		spin_unlock_irqrestore(&dio->bio_lock, flags);
 	}
-	dio->bio_count--;
-	spin_unlock_irqrestore(&dio->bio_lock, flags);
 }
 
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
@@ -309,6 +291,10 @@ static int dio_bio_end_aio(struct bio *b
 
 	/* cleanup the bio */
 	dio_bio_complete(dio, bio);
+
+	if (atomic_dec_and_test(&dio->refcount))
+		dio_complete_aio(dio);
+
 	return 0;
 }
 
@@ -330,8 +316,7 @@ static int dio_bio_end_io(struct bio *bi
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	bio->bi_private = dio->bio_list;
 	dio->bio_list = bio;
-	dio->bios_in_flight--;
-	if (dio->waiter && dio->bios_in_flight == 0)
+	if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
 		wake_up_process(dio->waiter);
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 	return 0;
@@ -362,17 +347,15 @@ dio_bio_alloc(struct dio *dio, struct bl
  * In the AIO read case we speculatively dirty the pages before starting IO.
  * During IO completion, any of these pages which happen to have been written
  * back will be redirtied by bio_check_pages_dirty().
+ *
+ * bios hold a dio reference between submit_bio and ->end_io.
  */
 static void dio_bio_submit(struct dio *dio)
 {
 	struct bio *bio = dio->bio;
-	unsigned long flags;
 
 	bio->bi_private = dio;
-	spin_lock_irqsave(&dio->bio_lock, flags);
-	dio->bio_count++;
-	dio->bios_in_flight++;
-	spin_unlock_irqrestore(&dio->bio_lock, flags);
+	atomic_inc(&dio->refcount);
 	if (dio->is_async && dio->rw == READ)
 		bio_set_pages_dirty(bio);
 	submit_bio(dio->rw, bio);
@@ -390,18 +373,28 @@ static void dio_cleanup(struct dio *dio)
 		page_cache_release(dio_get_page(dio));
 }
 
+static int wait_for_more_bios(struct dio *dio)
+{
+	assert_spin_locked(&dio->bio_lock);
+
+	return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
+}
+
 /*
- * Wait for the next BIO to complete.  Remove it and return it.
+ * Wait for the next BIO to complete.  Remove it and return it.  NULL is
+ * returned once all BIOs have been completed.  This must only be called once
+ * all bios have been issued so that dio->refcount can only decrease.  This
+ * requires that that the caller hold a reference on the dio.
  */
 static struct bio *dio_await_one(struct dio *dio)
 {
 	unsigned long flags;
-	struct bio *bio;
+	struct bio *bio = NULL;
 
 	spin_lock_irqsave(&dio->bio_lock, flags);
-	while (dio->bio_list == NULL) {
+	while (wait_for_more_bios(dio)) {
 		set_current_state(TASK_UNINTERRUPTIBLE);
-		if (dio->bio_list == NULL) {
+		if (wait_for_more_bios(dio)) {
 			dio->waiter = current;
 			spin_unlock_irqrestore(&dio->bio_lock, flags);
 			io_schedule();
@@ -410,8 +403,10 @@ static struct bio *dio_await_one(struct
 		}
 		set_current_state(TASK_RUNNING);
 	}
-	bio = dio->bio_list;
-	dio->bio_list = bio->bi_private;
+	if (dio->bio_list) {
+		bio = dio->bio_list;
+		dio->bio_list = bio->bi_private;
+	}
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 	return bio;
 }
@@ -440,25 +435,24 @@ static int dio_bio_complete(struct dio *
 		}
 		bio_put(bio);
 	}
-	finished_one_bio(dio);
 	return uptodate ? 0 : -EIO;
 }
 
 /*
- * Wait on and process all in-flight BIOs.
+ * Wait on and process all in-flight BIOs.  This must only be called once
+ * all bios have been issued so that the refcount can only decrease.  This
+ * just waits for all bios to make it through dio_bio_complete.  IO
+ * errors are propogated through dio->io_error and should be propogated via
+ * dio_complete().
  */
 static void dio_await_completion(struct dio *dio)
 {
-	/*
-	 * The bio_lock is not held for the read of bio_count.
-	 * This is ok since it is the dio_bio_complete() that changes
-	 * bio_count.
-	 */
-	while (dio->bio_count) {
-		struct bio *bio = dio_await_one(dio);
-		/* io errors are propogated through dio->io_error */
-		dio_bio_complete(dio, bio);
-	}
+	struct bio *bio;
+	do {
+		bio = dio_await_one(dio);
+		if (bio)
+			dio_bio_complete(dio, bio);
+	} while (bio);
 }
 
 /*
@@ -995,16 +989,7 @@ direct_io_worker(int rw, struct kiocb *i
 	dio->iocb = iocb;
 	dio->i_size = i_size_read(inode);
 
-	/*
-	 * BIO completion state.
-	 *
-	 * ->bio_count starts out at one, and we decrement it to zero after all
-	 * BIOs are submitted.  This to avoid the situation where a really fast
-	 * (or synchronous) device could take the count to zero while we're
-	 * still submitting BIOs.
-	 */
-	dio->bio_count = 1;
-	dio->bios_in_flight = 0;
+	atomic_set(&dio->refcount, 1);
 	spin_lock_init(&dio->bio_lock);
 	dio->bio_list = NULL;
 	dio->waiter = NULL;
@@ -1111,7 +1096,11 @@ direct_io_worker(int rw, struct kiocb *i
 		}
 		if (ret == 0)
 			ret = dio->result;
-		finished_one_bio(dio);		/* This can free the dio */
+
+		/* this can free the dio */
+		if (atomic_dec_and_test(&dio->refcount))
+			dio_complete_aio(dio);
+
 		if (should_wait) {
 			unsigned long flags;
 			/*
@@ -1122,7 +1111,7 @@ direct_io_worker(int rw, struct kiocb *i
 
 			spin_lock_irqsave(&dio->bio_lock, flags);
 			set_current_state(TASK_UNINTERRUPTIBLE);
-			while (dio->bio_count) {
+			while (atomic_read(&dio->refcount)) {
 				spin_unlock_irqrestore(&dio->bio_lock, flags);
 				io_schedule();
 				spin_lock_irqsave(&dio->bio_lock, flags);
@@ -1133,7 +1122,6 @@ direct_io_worker(int rw, struct kiocb *i
 			kfree(dio);
 		}
 	} else {
-		finished_one_bio(dio);
 		dio_await_completion(dio);
 
 		ret = dio_complete(dio, offset, ret);
@@ -1146,7 +1134,11 @@ direct_io_worker(int rw, struct kiocb *i
 			 * i/o, we have to mark the the aio complete.
 			 */
 			aio_complete(iocb, ret, 0);
-		kfree(dio);
+
+		if (atomic_dec_and_test(&dio->refcount))
+			kfree(dio);
+		else
+			BUG();
 	}
 	return ret;
 }

commit 20258b2b397031649e4a41922fe803d57017df84
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: remove duplicate bio wait code

Now that we have a single refcount and waiting path we can reuse it in the
async 'should_wait' path.  It continues to rely on the fragile link between
the conditional in dio_complete_aio() which decides to complete the AIO
and the conditional in direct_io_worker() which decides to wait and free.

By waiting before dropping the reference we stop dio_bio_end_aio() from
calling dio_complete_aio() which used to wake up the waiter after seeing
the reference count drop to 0.  We hoist this wake up into
dio_bio_end_aio() which now notices when it's left a single remaining
reference that is held by the waiter.
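The decision dio_bio_end_aio() makes after this patch can be sketched as a small pure function in userspace C. This is illustrative only (the enum and return values are not kernel names): sample whether a waiter exists before dropping the bio's reference, then either wake the waiter (its reference is the only one left) or, if this drop was the last reference, complete the aio.

```c
#include <stdatomic.h>
#include <stdbool.h>

enum bio_end_action { DID_NOTHING, WOKE_WAITER, COMPLETED_AIO };

/* Mirrors the hoisted wake-up: refcount is dropped exactly once, and the
 * remaining count decides between waking the waiter and completing. */
enum bio_end_action dio_bio_end_aio_sketch(atomic_int *refcount,
					   bool waiter_holds_ref)
{
	int remaining = atomic_fetch_sub(refcount, 1) - 1;

	if (remaining == 1 && waiter_holds_ref)
		return WOKE_WAITER;	/* waiter's ref is the only one left */
	if (remaining == 0)
		return COMPLETED_AIO;	/* we dropped the last reference */
	return DID_NOTHING;		/* other bios still in flight */
}
```

Sampling the waiter flag before the decrement is the point of the hoist: once the waiter has been woken it may free the dio, so the completion path must not touch it afterwards.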
Signed-off-by: Zach Brown <zach.brown@oracle.com>
Cc: Badari Pulavarty <pbadari@us.ibm.com>
Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>

diff --git a/fs/direct-io.c b/fs/direct-io.c
index bc1cbf9..f11f05d 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -257,7 +257,6 @@ static int dio_complete(struct dio *dio,
  */
 static void dio_complete_aio(struct dio *dio)
 {
-	unsigned long flags;
 	int ret;
 
 	ret = dio_complete(dio, dio->iocb->ki_pos, 0);
@@ -267,14 +266,6 @@ static void dio_complete_aio(struct dio
 		((dio->rw == READ) && dio->result)) {
 		aio_complete(dio->iocb, ret, 0);
 		kfree(dio);
-	} else {
-		/*
-		 * Falling back to buffered
-		 */
-		spin_lock_irqsave(&dio->bio_lock, flags);
-		if (dio->waiter)
-			wake_up_process(dio->waiter);
-		spin_unlock_irqrestore(&dio->bio_lock, flags);
 	}
 }
 
@@ -285,6 +276,8 @@ static int dio_bio_complete(struct dio *
 static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 {
 	struct dio *dio = bio->bi_private;
+	int waiter_holds_ref = 0;
+	int remaining;
 
 	if (bio->bi_size)
 		return 1;
@@ -292,7 +285,12 @@ static int dio_bio_end_aio(struct bio *b
 	/* cleanup the bio */
 	dio_bio_complete(dio, bio);
 
-	if (atomic_dec_and_test(&dio->refcount))
+	waiter_holds_ref = !!dio->waiter;
+	remaining = atomic_sub_return(1, (&dio->refcount));
+	if (remaining == 1 && waiter_holds_ref)
+		wake_up_process(dio->waiter);
+
+	if (remaining == 0)
 		dio_complete_aio(dio);
 
 	return 0;
@@ -1097,30 +1095,15 @@ direct_io_worker(int rw, struct kiocb *i
 		if (ret == 0)
 			ret = dio->result;
 
+		if (should_wait)
+			dio_await_completion(dio);
+
 		/* this can free the dio */
 		if (atomic_dec_and_test(&dio->refcount))
 			dio_complete_aio(dio);
 
-		if (should_wait) {
-			unsigned long flags;
-			/*
-			 * Wait for already issued I/O to drain out and
-			 * release its references to user-space pages
-			 * before returning to fallback on buffered I/O
-			 */
-
-			spin_lock_irqsave(&dio->bio_lock, flags);
-			set_current_state(TASK_UNINTERRUPTIBLE);
-			while (atomic_read(&dio->refcount)) {
-				spin_unlock_irqrestore(&dio->bio_lock, flags);
-				io_schedule();
-				spin_lock_irqsave(&dio->bio_lock, flags);
-				set_current_state(TASK_UNINTERRUPTIBLE);
-			}
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
-			set_current_state(TASK_RUNNING);
+		if (should_wait)
 			kfree(dio);
-		}
 	} else {
 		dio_await_completion(dio);

commit 8459d86aff04fa53c2ab6a6b9f355b3063cc8014
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: only call aio_complete() after returning -EIOCBQUEUED

The only time it is safe to call aio_complete() is when the ->ki_retry
function returns -EIOCBQUEUED to the AIO core.  direct_io_worker() has
historically done this by relying on its caller to translate positive
return codes into -EIOCBQUEUED for the aio case.  It did this by trying to
keep conditionals in sync.  direct_io_worker() knew when
finished_one_bio() was going to call aio_complete().  It would reverse the
test and wait and free the dio in the cases it thought that
finished_one_bio() wasn't going to.

Not surprisingly, it ended up getting it wrong.  'ret' could be a negative
errno from the submission path, but it failed to communicate this to
finished_one_bio().  direct_io_worker() would return < 0, its callers
wouldn't raise -EIOCBQUEUED, and aio_complete() would be called.  In the
future finished_one_bio()'s tests wouldn't reflect this and aio_complete()
would be called for a second time, which can manifest as an oops.

The previous cleanups have whittled the sync and async completion paths
down to the point where we can collapse them and clearly reassert the
invariant that we must only call aio_complete() after returning
-EIOCBQUEUED.  direct_io_worker() will only return -EIOCBQUEUED when it is
not the last to drop the dio refcount, and the aio bio completion path
will only call aio_complete() when it is the last to drop the dio
refcount.
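The single decision point this patch establishes can be sketched in userspace C. This is illustrative only: the errno value is hardcoded for the sketch, and where the kernel tests `(rw & READ)` the sketch spells the intent out as an equality check.

```c
#include <stdbool.h>

/* Illustrative stand-in for the kernel's -EIOCBQUEUED (529 on Linux). */
#define EIOCBQUEUED_SKETCH 529

#define READ_FLAG  0
#define WRITE_FLAG 1

/* Mirrors the one place direct_io_worker() raises -EIOCBQUEUED after this
 * patch: only a successful partial aio read or a full aio write may leave
 * bios in flight; every other case waits and completes synchronously. */
long dio_submit_result_sketch(bool is_async, int rw, long ret,
			      long result, long size)
{
	if (is_async && ret == 0 && result &&
	    (rw == READ_FLAG || result == size))
		return -EIOCBQUEUED_SKETCH; /* bio completion does aio_complete() */
	return ret;	/* caller will wait in dio_await_completion() */
}
```

Keying both sides on one value is what restores the invariant: the submitter returns -EIOCBQUEUED exactly when the bio completion path is the one that will call aio_complete().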
direct_io_worker() can ensure that it is the last to drop the reference
count by waiting for bios to drain.  It does this for sync ops, of course,
and for partial dio writes that must fall back to buffered and for aio ops
that saw errors during submission.

This means that operations that end up waiting, even if they were issued
as aio ops, will not call aio_complete() from dio.  Instead we return the
return code of the operation and let the aio core call aio_complete().
This is purposely done to fix a bug where AIO DIO file extensions would
call aio_complete() before their callers have a chance to update i_size.

Now that direct_io_worker() is explicitly returning -EIOCBQUEUED its
callers no longer have to translate for it.  XFS needs to be careful not
to free resources that will be used during AIO completion if -EIOCBQUEUED
is returned.

We maintain the previous behaviour of trying to write fs metadata for
O_SYNC aio+dio writes.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
Cc: Badari Pulavarty <pbadari@us.ibm.com>
Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Cc: <xfs-masters@oss.sgi.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>

diff --git a/fs/direct-io.c b/fs/direct-io.c
index f11f05d..71f4aea 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -226,6 +226,15 @@ static int dio_complete(struct dio *dio,
 {
 	ssize_t transferred = 0;
 
+	/*
+	 * AIO submission can race with bio completion to get here while
+	 * expecting to have the last io completed by bio completion.
+	 * In that case -EIOCBQUEUED is in fact not an error we want
+	 * to preserve through this call.
+	 */
+	if (ret == -EIOCBQUEUED)
+		ret = 0;
+
 	if (dio->result) {
 		transferred = dio->result;
 
@@ -251,24 +260,6 @@ static int dio_complete(struct dio *dio,
 	return ret;
 }
 
-/*
- * Called when a BIO has been processed.  If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
- */
-static void dio_complete_aio(struct dio *dio)
-{
-	int ret;
-
-	ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-
-	/* Complete AIO later if falling back to buffered i/o */
-	if (dio->result == dio->size ||
-		((dio->rw == READ) && dio->result)) {
-		aio_complete(dio->iocb, ret, 0);
-		kfree(dio);
-	}
-}
-
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
 /*
  * Asynchronous IO callback.
@@ -290,8 +281,11 @@ static int dio_bio_end_aio(struct bio *b
 	if (remaining == 1 && waiter_holds_ref)
 		wake_up_process(dio->waiter);
 
-	if (remaining == 0)
-		dio_complete_aio(dio);
+	if (remaining == 0) {
+		int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+		aio_complete(dio->iocb, ret, 0);
+		kfree(dio);
+	}
 
 	return 0;
 }
@@ -1082,47 +1076,33 @@ direct_io_worker(int rw, struct kiocb *i
 		mutex_unlock(&dio->inode->i_mutex);
 
 	/*
-	 * OK, all BIOs are submitted, so we can decrement bio_count to truly
-	 * reflect the number of to-be-processed BIOs.
+	 * The only time we want to leave bios in flight is when a successful
+	 * partial aio read or full aio write have been setup.  In that case
+	 * bio completion will call aio_complete.  The only time it's safe to
+	 * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+	 * This had *better* be the only place that raises -EIOCBQUEUED.
 	 */
-	if (dio->is_async) {
-		int should_wait = 0;
-
-		if (dio->result < dio->size && (rw & WRITE)) {
-			dio->waiter = current;
-			should_wait = 1;
-		}
-		if (ret == 0)
-			ret = dio->result;
-
-		if (should_wait)
-			dio_await_completion(dio);
-
-		/* this can free the dio */
-		if (atomic_dec_and_test(&dio->refcount))
-			dio_complete_aio(dio);
+	BUG_ON(ret == -EIOCBQUEUED);
+	if (dio->is_async && ret == 0 && dio->result &&
+	    ((rw & READ) || (dio->result == dio->size)))
+		ret = -EIOCBQUEUED;
 
-		if (should_wait)
-			kfree(dio);
-	} else {
+	if (ret != -EIOCBQUEUED)
 		dio_await_completion(dio);
 
+	/*
+	 * Sync will always be dropping the final ref and completing the
+	 * operation.  AIO can if it was a broken operation described above
+	 * or in fact if all the bios race to complete before we get here.
+	 * In that case dio_complete() translates the EIOCBQUEUED into
+	 * the proper return code that the caller will hand to aio_complete().
+	 */
+	if (atomic_dec_and_test(&dio->refcount)) {
 		ret = dio_complete(dio, offset, ret);
+		kfree(dio);
+	} else
+		BUG_ON(ret != -EIOCBQUEUED);
 
-		/* We could have also come here on an AIO file extend */
-		if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
-			ret >= 0 && dio->result == dio->size)
-			/*
-			 * For AIO writes where we have completed the
-			 * i/o, we have to mark the the aio complete.
-			 */
-			aio_complete(iocb, ret, 0);
-
-		if (atomic_dec_and_test(&dio->refcount))
-			kfree(dio);
-		else
-			BUG();
-	}
 	return ret;
 }

diff --git a/fs/xfs/linux-2.6/xfs_aops.c b/fs/xfs/linux-2.6/xfs_aops.c
index 8e6b56f..b56eb75 100644
--- a/fs/xfs/linux-2.6/xfs_aops.c
+++ b/fs/xfs/linux-2.6/xfs_aops.c
@@ -1406,7 +1406,7 @@ xfs_vm_direct_IO(
 				xfs_end_io_direct);
 	}
 
-	if (unlikely(ret <= 0 && iocb->private))
+	if (unlikely(ret != -EIOCBQUEUED && iocb->private))
 		xfs_destroy_ioend(iocb->private);
 	return ret;
 }

diff --git a/mm/filemap.c b/mm/filemap.c
index 606432f..8332c77 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1181,8 +1181,6 @@ generic_file_aio_read(struct kiocb *iocb
 		if (pos < size) {
 			retval = generic_file_direct_IO(READ, iocb,
 						iov, pos, nr_segs);
-			if (retval > 0 && !is_sync_kiocb(iocb))
-				retval = -EIOCBQUEUED;
 			if (retval > 0)
 				*ppos = pos + retval;
 		}
@@ -2047,15 +2045,14 @@ generic_file_direct_write(struct kiocb *
 	 * Sync the fs metadata but not the minor inode changes and
 	 * of course not the data as we did direct DMA for the IO.
 	 * i_mutex is held, which protects generic_osync_inode() from
-	 * livelocking.
+	 * livelocking.  AIO O_DIRECT ops attempt to sync metadata here.
	 */
-	if (written >= 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
+	if ((written >= 0 || written == -EIOCBQUEUED) &&
+	    ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
 		int err = generic_osync_inode(inode, mapping,
 					      OSYNC_METADATA);
 		if (err < 0)
 			written = err;
 	}
-	if (written == count && !is_sync_kiocb(iocb))
-		written = -EIOCBQUEUED;
 	return written;
 }
 EXPORT_SYMBOL(generic_file_direct_write);