kernel-2.6.18-238.el5.src.rpm

From: Jeff Moyer <jmoyer@redhat.com>
Subject: [rhel5 patch 0/5] dio: clean up completion phase of direct_io_worker()
Date: Fri, 01 Jun 2007 15:36:26 -0400
Bugzilla: 242116
Message-Id: <x49lkf331s5.fsf@segfault.boston.devel.redhat.com>
Changelog: [dio] clean up completion phase of direct_io_worker()


Hi,

This patch set is Zach Brown's cleanup to the direct I/O completion
path.  I'll include his changelogs verbatim, and the patches apply
verbatim, as well.  I have applied these patches to my rhel5 dio
branch and have been testing them and distributing them to customers
and partners for testing over the last few months.  My tests consist
of both directed tests (where possible), and of generic testing using
the aio-stress and fio utilities, as well as the regression test suite
that Zach and I have been working on.  There is one Bugzilla entry that is
fixed directly by this patch series:

  Bugzilla Bug 242116: race in aio io_submit write/read

The customer provided a reproducer for the short read problem, and I
was able to modify the code to trigger the problem within a very short
amount of time (within 3-30 seconds).  With this patch set in place, I
have been unable to trigger the problem.

Comments, as always, are encouraged.

Thanks!

Jeff

---

dio: clean up completion phase of direct_io_worker()

There have been a lot of bugs recently due to the way direct_io_worker() tries
to decide how to finish direct IO operations.  In the worst examples it has
failed to call aio_complete() at all (hang) or called it too many times (oops).

This set of patches cleans up the completion phase with the goal of removing
the complexity that led to these bugs.  We end up with one path that
calculates the result of the operation after all of the bios have completed.
We decide when to generate a result of the operation using that path based on
the final release of a refcount on the dio structure.

I tried to progress towards the final state in steps that were relatively easy
to understand.  Each step should compile but I only tested the final result of
having all the patches applied.

The patches result in a slight net decrease in code and binary size:

 2.6.18-rc4-dio-cleanup/fs/direct-io.c |    8
 2.6.18-rc5-dio-cleanup/fs/direct-io.c |   94 +++++------
 2.6.18-rc5-dio-cleanup/mm/filemap.c   |    4
 fs/direct-io.c                        |  273 ++++++++++++++--------------------
 4 files changed, 159 insertions(+), 220 deletions(-)

   text    data     bss     dec     hex filename
2592385  450996  210296 3253677  31a5ad vmlinux.before
2592113  450980  210296 3253389  31a48d vmlinux.after

The patches pass light testing with aio-stress, the direct IO tests I could
manage to get running in LTP, and some home-brew functional tests.  It's still
making its way through stress testing.  I don't think it should be merged
until we hear back from that more rigorous testing.

I hoped to get some feedback (and maybe volunteers for testing!) by sending the
patches out before waiting for the stress tests.

commit 6d544bb4d9019c3a0d7ee4af1e4bbbd61a6e16dc
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: centralize completion in dio_complete()
    
    There have been a lot of bugs recently due to the way direct_io_worker() tries
    to decide how to finish direct IO operations.  In the worst examples it has
    failed to call aio_complete() at all (hang) or called it too many times
    (oops).
    
    This set of patches cleans up the completion phase with the goal of removing
    the complexity that led to these bugs.  We end up with one path that
    calculates the result of the operation after all of the bios have completed.
    We decide when to generate a result of the operation using that path based on
    the final release of a refcount on the dio structure.
    
    I tried to progress towards the final state in steps that were relatively easy
    to understand.  Each step should compile but I only tested the final result of
    having all the patches applied.
    
    I've tested these on low end PC drives with aio-stress, the direct IO tests I
    could manage to get running in LTP, orasim, and some home-brew functional
    tests.
    
    In http://lkml.org/lkml/2006/9/21/103 IBM reports success with ext2 and ext3
    running DIO LTP tests.  They found that XFS bug which has since been addressed
    in the patch series.
    
    This patch:
    
    The mechanics which decide the result of a direct IO operation were duplicated
    in the sync and async paths.
    
    The async path didn't check page_errors which can manifest as silently
    returning success when the final pointer in an operation faults and its
    matching file region is filled with zeros.
    
    The sync path and async path differed in whether they passed errors to the
    caller's dio->end_io operation.  The async path was passing errors to it which
    trips an assertion in XFS, though it is apparently harmless.
    
    This centralizes the completion phase of dio ops in one place.  AIO will now
    return EFAULT consistently and all paths fall back to the previously sync
    behaviour of passing the number of bytes 'transferred' to the dio->end_io
    callback, regardless of errors.
    
    dio_await_completion() doesn't have to propagate EIO from non-uptodate bios
    now that it's being propagated through dio_complete() via dio->io_error.  This
    lets it return void which simplifies its sole caller.
    
    Signed-off-by: Zach Brown <zach.brown@oracle.com>
    Cc: Badari Pulavarty <pbadari@us.ibm.com>
    Cc: Suparna Bhattacharya <suparna@in.ibm.com>
    Acked-by: Jeff Moyer <jmoyer@redhat.com>
    Signed-off-by: Andrew Morton <akpm@osdl.org>
    Signed-off-by: Linus Torvalds <torvalds@osdl.org>
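
The result precedence described in the changelog above can be sketched in
userspace C.  This is only a model under stated assumptions, not the kernel
code: struct dio_model, enum model_dir and dio_model_complete() are
hypothetical names, and the model uses ssize_t throughout where the patch
uses int for the return code.

```c
#include <assert.h>
#include <errno.h>
#include <sys/types.h>	/* ssize_t, off_t */

/* Hypothetical stand-ins for the kernel's READ/WRITE and struct dio. */
enum model_dir { MODEL_READ, MODEL_WRITE };

struct dio_model {
	enum model_dir rw;
	off_t i_size;		/* file size sampled when the op started */
	ssize_t result;		/* bytes submitted for IO */
	int page_errors;	/* negative errno from pinning user pages */
	int io_error;		/* negative errno from a failed bio */
};

/*
 * Models the ordering in the patched dio_complete(): a submission error
 * wins, then page_errors, then io_error, then the byte count.
 */
static ssize_t dio_model_complete(const struct dio_model *dio, off_t offset,
				  ssize_t ret)
{
	ssize_t transferred = 0;

	if (dio->result) {
		transferred = dio->result;
		/* Short read: clamp the count at end of file. */
		if (dio->rw == MODEL_READ &&
		    offset + transferred > dio->i_size)
			transferred = dio->i_size - offset;
	}

	if (ret == 0)
		ret = dio->page_errors;
	if (ret == 0)
		ret = dio->io_error;
	if (ret == 0)
		ret = transferred;
	return ret;
}

/* Usage: a 1024-byte read at offset 512 of a 1000-byte file. */
static void dio_model_example(void)
{
	struct dio_model dio = { MODEL_READ, 1000, 1024, 0, 0 };

	assert(dio_model_complete(&dio, 512, 0) == 488);

	dio.io_error = -EIO;
	assert(dio_model_complete(&dio, 512, 0) == -EIO);

	dio.page_errors = -EFAULT;	/* takes precedence over io_error */
	assert(dio_model_complete(&dio, 512, 0) == -EFAULT);
}
```

Because both the sync and async callers go through the same function, an
async read that faulted on its last page reports the fault just as the sync
path does, which is the consistency the changelog asks for.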

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 45d34d8..b57b671 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -210,19 +210,46 @@ static struct page *dio_get_page(struct
 	return dio->pages[dio->head++];
 }
 
-/*
- * Called when all DIO BIO I/O has been completed - let the filesystem
- * know, if it registered an interest earlier via get_block.  Pass the
- * private field of the map buffer_head so that filesystems can use it
- * to hold additional state between get_block calls and dio_complete.
+/**
+ * dio_complete() - called when all DIO BIO I/O has been completed
+ * @offset: the byte offset in the file of the completed operation
+ *
+ * This releases locks as dictated by the locking type, lets interested parties
+ * know that a DIO operation has completed, and calculates the resulting return
+ * code for the operation.
+ *
+ * It lets the filesystem know if it registered an interest earlier via
+ * get_block.  Pass the private field of the map buffer_head so that
+ * filesystems can use it to hold additional state between get_block calls and
+ * dio_complete.
  */
-static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
+static int dio_complete(struct dio *dio, loff_t offset, int ret)
 {
+	ssize_t transferred = 0;
+
+	if (dio->result) {
+		transferred = dio->result;
+
+		/* Check for short read case */
+		if ((dio->rw == READ) && ((offset + transferred) > dio->i_size))
+			transferred = dio->i_size - offset;
+	}
+
 	if (dio->end_io && dio->result)
-		dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private);
+		dio->end_io(dio->iocb, offset, transferred,
+			    dio->map_bh.b_private);
 	if (dio->lock_type == DIO_LOCKING)
 		/* lockdep: non-owner release */
 		up_read_non_owner(&dio->inode->i_alloc_sem);
+
+	if (ret == 0)
+		ret = dio->page_errors;
+	if (ret == 0)
+		ret = dio->io_error;
+	if (ret == 0)
+		ret = transferred;
+
+	return ret;
 }
 
 /*
@@ -236,8 +263,7 @@ static void finished_one_bio(struct dio
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	if (dio->bio_count == 1) {
 		if (dio->is_async) {
-			ssize_t transferred;
-			loff_t offset;
+			int ret;
 
 			/*
 			 * Last reference to the dio is going away.
@@ -245,24 +271,12 @@ static void finished_one_bio(struct dio
 			 */
 			spin_unlock_irqrestore(&dio->bio_lock, flags);
 
-			/* Check for short read case */
-			transferred = dio->result;
-			offset = dio->iocb->ki_pos;
-
-			if ((dio->rw == READ) &&
-			    ((offset + transferred) > dio->i_size))
-				transferred = dio->i_size - offset;
-
-			/* check for error in completion path */
-			if (dio->io_error)
-				transferred = dio->io_error;
-
-			dio_complete(dio, offset, transferred);
+			ret = dio_complete(dio, dio->iocb->ki_pos, 0);
 
 			/* Complete AIO later if falling back to buffered i/o */
 			if (dio->result == dio->size ||
 				((dio->rw == READ) && dio->result)) {
-				aio_complete(dio->iocb, transferred, 0);
+				aio_complete(dio->iocb, ret, 0);
 				kfree(dio);
 				return;
 			} else {
@@ -434,10 +448,8 @@ static int dio_bio_complete(struct dio *
 /*
  * Wait on and process all in-flight BIOs.
  */
-static int dio_await_completion(struct dio *dio)
+static void dio_await_completion(struct dio *dio)
 {
-	int ret = 0;
-
 	if (dio->bio)
 		dio_bio_submit(dio);
 
@@ -448,13 +460,9 @@ static int dio_await_completion(struct d
 	 */
 	while (dio->bio_count) {
 		struct bio *bio = dio_await_one(dio);
-		int ret2;
-
-		ret2 = dio_bio_complete(dio, bio);
-		if (ret == 0)
-			ret = ret2;
+		/* io errors are propogated through dio->io_error */
+		dio_bio_complete(dio, bio);
 	}
-	return ret;
 }
 
 /*
@@ -1127,28 +1135,10 @@ direct_io_worker(int rw, struct kiocb *i
 			kfree(dio);
 		}
 	} else {
-		ssize_t transferred = 0;
-
 		finished_one_bio(dio);
-		ret2 = dio_await_completion(dio);
-		if (ret == 0)
-			ret = ret2;
-		if (ret == 0)
-			ret = dio->page_errors;
-		if (dio->result) {
-			loff_t i_size = i_size_read(inode);
+		dio_await_completion(dio);
 
-			transferred = dio->result;
-			/*
-			 * Adjust the return value if the read crossed a
-			 * non-block-aligned EOF.
-			 */
-			if (rw == READ && (offset + transferred > i_size))
-				transferred = i_size - offset;
-		}
-		dio_complete(dio, offset, transferred);
-		if (ret == 0)
-			ret = transferred;
+		ret = dio_complete(dio, offset, ret);
 
 		/* We could have also come here on an AIO file extend */
 		if (!is_sync_kiocb(iocb) && (rw & WRITE) &&

commit 17a7b1d74b1207f8f1af40b5d184989076d08f8b
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: call blk_run_address_space() once per op
    
    We only need to call blk_run_address_space() once after all the bios for the
    direct IO op have been submitted.  This removes the chance of calling
    blk_run_address_space() after spurious wake ups as the sync path waits for
    bios to drain.  It's also one less difference between the sync and async paths.
    
    In the process we remove a redundant dio_bio_submit() that its caller had
    already performed.
    
    Signed-off-by: Zach Brown <zach.brown@oracle.com>
    Cc: Badari Pulavarty <pbadari@us.ibm.com>
    Cc: Suparna Bhattacharya <suparna@in.ibm.com>
    Acked-by: Jeff Moyer <jmoyer@redhat.com>
    Signed-off-by: Andrew Morton <akpm@osdl.org>
    Signed-off-by: Linus Torvalds <torvalds@osdl.org>

diff --git a/fs/direct-io.c b/fs/direct-io.c
index b57b671..b296942 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -404,7 +404,6 @@ static struct bio *dio_await_one(struct
 		if (dio->bio_list == NULL) {
 			dio->waiter = current;
 			spin_unlock_irqrestore(&dio->bio_lock, flags);
-			blk_run_address_space(dio->inode->i_mapping);
 			io_schedule();
 			spin_lock_irqsave(&dio->bio_lock, flags);
 			dio->waiter = NULL;
@@ -450,9 +449,6 @@ static int dio_bio_complete(struct dio *
  */
 static void dio_await_completion(struct dio *dio)
 {
-	if (dio->bio)
-		dio_bio_submit(dio);
-
 	/*
 	 * The bio_lock is not held for the read of bio_count.
 	 * This is ok since it is the dio_bio_complete() that changes
@@ -1085,6 +1081,9 @@ direct_io_worker(int rw, struct kiocb *i
 	if (dio->bio)
 		dio_bio_submit(dio);
 
+	/* All IO is now issued, send it on its way */
+	blk_run_address_space(inode->i_mapping);
+
 	/*
 	 * It is possible that, we return short IO due to end of file.
 	 * In that case, we need to release all the pages we got hold on.
@@ -1113,7 +1112,6 @@ direct_io_worker(int rw, struct kiocb *i
 		if (ret == 0)
 			ret = dio->result;
 		finished_one_bio(dio);		/* This can free the dio */
-		blk_run_address_space(inode->i_mapping);
 		if (should_wait) {
 			unsigned long flags;
 			/*

commit 0273201e693fd62381f6b1e85b15ffc117d8a46e
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: formalize bio counters as a dio reference count
    
    Previously we had two confusing counts of bio progress.  'bio_count' was
    decremented as bios were processed and freed by the dio core.  It was used to
    indicate final completion of the dio operation.  'bios_in_flight' reflected
    how many bios were between submit_bio() and bio->end_io.  It was used by the
    sync path to decide when to wake up and finish completing bios and was ignored
    by the async path.
    
    This patch collapses the two notions into one notion of a dio reference count.
    bios hold a dio reference when they're between submit_bio and bio->end_io.
    
    Since bios_in_flight was only used in the sync path it is now equivalent to
    dio->refcount - 1 which accounts for direct_io_worker() holding a reference
    for the duration of the operation.
    
    dio_bio_complete() -> finished_one_bio() was called from the sync path after
    finding bios on the list that the bio->end_io function had deposited.
    finished_one_bio() can not drop the dio reference on behalf of these bios now
    because bio->end_io already has.  The is_async test in finished_one_bio()
    meant that it never actually did anything other than drop the bio_count for
    sync callers.  So we remove its refcount decrement, don't call it from
    dio_bio_complete(), and hoist its call up into the async dio_bio_complete()
    caller after an explicit refcount decrement.  It is renamed dio_complete_aio()
    to reflect the remaining work it actually does.
    
    Signed-off-by: Zach Brown <zach.brown@oracle.com>
    Cc: Badari Pulavarty <pbadari@us.ibm.com>
    Cc: Suparna Bhattacharya <suparna@in.ibm.com>
    Acked-by: Jeff Moyer <jmoyer@redhat.com>
    Signed-off-by: Andrew Morton <akpm@osdl.org>
    Signed-off-by: Linus Torvalds <torvalds@osdl.org>
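
The reference counting scheme described above can be modelled outside the
kernel with C11 atomics.  This is a minimal sketch under stated assumptions:
struct dio_ref_model and the dio_ref_*() helpers are hypothetical names, and
atomic_fetch_sub() from <stdatomic.h> stands in for the kernel's
atomic_dec_and_test().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical model of the single dio reference count. */
struct dio_ref_model {
	atomic_int refcount;	/* the submitter plus each in-flight bio */
	int completions;	/* times the final-completion path ran */
};

/* The submitter holds one reference for the duration of the operation. */
static void dio_ref_init(struct dio_ref_model *d)
{
	atomic_init(&d->refcount, 1);
	d->completions = 0;
}

/* Each bio takes a reference between submission and its end_io callback. */
static void dio_ref_get(struct dio_ref_model *d)
{
	atomic_fetch_add(&d->refcount, 1);
}

/*
 * Drop one reference.  atomic_fetch_sub() returns the previous value, so a
 * previous value of 1 means this caller dropped the last reference and is
 * the only one allowed to run completion.
 */
static bool dio_ref_put(struct dio_ref_model *d)
{
	if (atomic_fetch_sub(&d->refcount, 1) == 1) {
		d->completions++;
		return true;
	}
	return false;
}

/* Usage: two bios in flight, submitter finishes issuing first. */
static void dio_ref_example(void)
{
	struct dio_ref_model d;
	bool last;

	dio_ref_init(&d);
	dio_ref_get(&d);
	dio_ref_get(&d);

	last = dio_ref_put(&d);		/* submitter is done issuing */
	assert(!last);
	last = dio_ref_put(&d);		/* first bio completes */
	assert(!last);
	last = dio_ref_put(&d);		/* second bio completes */
	assert(last);
	assert(d.completions == 1);
}
```

Whatever order the drops happen in, exactly one caller observes the final
release, so completion cannot be skipped or run twice in this model.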

diff --git a/fs/direct-io.c b/fs/direct-io.c
index b296942..bc1cbf9 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -121,9 +121,8 @@ struct dio {
 	int page_errors;		/* errno from get_user_pages() */
 
 	/* BIO completion state */
+	atomic_t refcount;		/* direct_io_worker() and bios */
 	spinlock_t bio_lock;		/* protects BIO fields below */
-	int bio_count;			/* nr bios to be completed */
-	int bios_in_flight;		/* nr bios in flight */
 	struct bio *bio_list;		/* singly linked via bi_private */
 	struct task_struct *waiter;	/* waiting task (NULL if none) */
 
@@ -256,44 +255,27 @@ static int dio_complete(struct dio *dio,
  * Called when a BIO has been processed.  If the count goes to zero then IO is
  * complete and we can signal this to the AIO layer.
  */
-static void finished_one_bio(struct dio *dio)
+static void dio_complete_aio(struct dio *dio)
 {
 	unsigned long flags;
+	int ret;
 
-	spin_lock_irqsave(&dio->bio_lock, flags);
-	if (dio->bio_count == 1) {
-		if (dio->is_async) {
-			int ret;
-
-			/*
-			 * Last reference to the dio is going away.
-			 * Drop spinlock and complete the DIO.
-			 */
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
-
-			ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+	ret = dio_complete(dio, dio->iocb->ki_pos, 0);
 
-			/* Complete AIO later if falling back to buffered i/o */
-			if (dio->result == dio->size ||
-				((dio->rw == READ) && dio->result)) {
-				aio_complete(dio->iocb, ret, 0);
-				kfree(dio);
-				return;
-			} else {
-				/*
-				 * Falling back to buffered
-				 */
-				spin_lock_irqsave(&dio->bio_lock, flags);
-				dio->bio_count--;
-				if (dio->waiter)
-					wake_up_process(dio->waiter);
-				spin_unlock_irqrestore(&dio->bio_lock, flags);
-				return;
-			}
-		}
+	/* Complete AIO later if falling back to buffered i/o */
+	if (dio->result == dio->size ||
+		((dio->rw == READ) && dio->result)) {
+		aio_complete(dio->iocb, ret, 0);
+		kfree(dio);
+	} else {
+		/*
+		 * Falling back to buffered
+		 */
+		spin_lock_irqsave(&dio->bio_lock, flags);
+		if (dio->waiter)
+			wake_up_process(dio->waiter);
+		spin_unlock_irqrestore(&dio->bio_lock, flags);
 	}
-	dio->bio_count--;
-	spin_unlock_irqrestore(&dio->bio_lock, flags);
 }
 
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
@@ -309,6 +291,10 @@ static int dio_bio_end_aio(struct bio *b
 
 	/* cleanup the bio */
 	dio_bio_complete(dio, bio);
+
+	if (atomic_dec_and_test(&dio->refcount))
+		dio_complete_aio(dio);
+
 	return 0;
 }
 
@@ -330,8 +316,7 @@ static int dio_bio_end_io(struct bio *bi
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	bio->bi_private = dio->bio_list;
 	dio->bio_list = bio;
-	dio->bios_in_flight--;
-	if (dio->waiter && dio->bios_in_flight == 0)
+	if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
 		wake_up_process(dio->waiter);
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 	return 0;
@@ -362,17 +347,15 @@ dio_bio_alloc(struct dio *dio, struct bl
  * In the AIO read case we speculatively dirty the pages before starting IO.
  * During IO completion, any of these pages which happen to have been written
  * back will be redirtied by bio_check_pages_dirty().
+ *
+ * bios hold a dio reference between submit_bio and ->end_io.
  */
 static void dio_bio_submit(struct dio *dio)
 {
 	struct bio *bio = dio->bio;
-	unsigned long flags;
 
 	bio->bi_private = dio;
-	spin_lock_irqsave(&dio->bio_lock, flags);
-	dio->bio_count++;
-	dio->bios_in_flight++;
-	spin_unlock_irqrestore(&dio->bio_lock, flags);
+	atomic_inc(&dio->refcount);
 	if (dio->is_async && dio->rw == READ)
 		bio_set_pages_dirty(bio);
 	submit_bio(dio->rw, bio);
@@ -390,18 +373,28 @@ static void dio_cleanup(struct dio *dio)
 		page_cache_release(dio_get_page(dio));
 }
 
+static int wait_for_more_bios(struct dio *dio)
+{
+	assert_spin_locked(&dio->bio_lock);
+
+	return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
+}
+
 /*
- * Wait for the next BIO to complete.  Remove it and return it.
+ * Wait for the next BIO to complete.  Remove it and return it.  NULL is
+ * returned once all BIOs have been completed.  This must only be called once
+ * all bios have been issued so that dio->refcount can only decrease.  This
+ * requires that that the caller hold a reference on the dio.
  */
 static struct bio *dio_await_one(struct dio *dio)
 {
 	unsigned long flags;
-	struct bio *bio;
+	struct bio *bio = NULL;
 
 	spin_lock_irqsave(&dio->bio_lock, flags);
-	while (dio->bio_list == NULL) {
+	while (wait_for_more_bios(dio)) {
 		set_current_state(TASK_UNINTERRUPTIBLE);
-		if (dio->bio_list == NULL) {
+		if (wait_for_more_bios(dio)) {
 			dio->waiter = current;
 			spin_unlock_irqrestore(&dio->bio_lock, flags);
 			io_schedule();
@@ -410,8 +403,10 @@ static struct bio *dio_await_one(struct
 		}
 		set_current_state(TASK_RUNNING);
 	}
-	bio = dio->bio_list;
-	dio->bio_list = bio->bi_private;
+	if (dio->bio_list) {
+		bio = dio->bio_list;
+		dio->bio_list = bio->bi_private;
+	}
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 	return bio;
 }
@@ -440,25 +435,24 @@ static int dio_bio_complete(struct dio *
 		}
 		bio_put(bio);
 	}
-	finished_one_bio(dio);
 	return uptodate ? 0 : -EIO;
 }
 
 /*
- * Wait on and process all in-flight BIOs.
+ * Wait on and process all in-flight BIOs.  This must only be called once
+ * all bios have been issued so that the refcount can only decrease.
+ * This just waits for all bios to make it through dio_bio_complete.  IO
+ * errors are propogated through dio->io_error and should be propogated via
+ * dio_complete().
  */
 static void dio_await_completion(struct dio *dio)
 {
-	/*
-	 * The bio_lock is not held for the read of bio_count.
-	 * This is ok since it is the dio_bio_complete() that changes
-	 * bio_count.
-	 */
-	while (dio->bio_count) {
-		struct bio *bio = dio_await_one(dio);
-		/* io errors are propogated through dio->io_error */
-		dio_bio_complete(dio, bio);
-	}
+	struct bio *bio;
+	do {
+		bio = dio_await_one(dio);
+		if (bio)
+			dio_bio_complete(dio, bio);
+	} while (bio);
 }
 
 /*
@@ -995,16 +989,7 @@ direct_io_worker(int rw, struct kiocb *i
 	dio->iocb = iocb;
 	dio->i_size = i_size_read(inode);
 
-	/*
-	 * BIO completion state.
-	 *
-	 * ->bio_count starts out at one, and we decrement it to zero after all
-	 * BIOs are submitted.  This to avoid the situation where a really fast
-	 * (or synchronous) device could take the count to zero while we're
-	 * still submitting BIOs.
-	 */
-	dio->bio_count = 1;
-	dio->bios_in_flight = 0;
+	atomic_set(&dio->refcount, 1);
 	spin_lock_init(&dio->bio_lock);
 	dio->bio_list = NULL;
 	dio->waiter = NULL;
@@ -1111,7 +1096,11 @@ direct_io_worker(int rw, struct kiocb *i
 		}
 		if (ret == 0)
 			ret = dio->result;
-		finished_one_bio(dio);		/* This can free the dio */
+
+		/* this can free the dio */
+		if (atomic_dec_and_test(&dio->refcount))
+			dio_complete_aio(dio);
+
 		if (should_wait) {
 			unsigned long flags;
 			/*
@@ -1122,7 +1111,7 @@ direct_io_worker(int rw, struct kiocb *i
 
 			spin_lock_irqsave(&dio->bio_lock, flags);
 			set_current_state(TASK_UNINTERRUPTIBLE);
-			while (dio->bio_count) {
+			while (atomic_read(&dio->refcount)) {
 				spin_unlock_irqrestore(&dio->bio_lock, flags);
 				io_schedule();
 				spin_lock_irqsave(&dio->bio_lock, flags);
@@ -1133,7 +1122,6 @@ direct_io_worker(int rw, struct kiocb *i
 			kfree(dio);
 		}
 	} else {
-		finished_one_bio(dio);
 		dio_await_completion(dio);
 
 		ret = dio_complete(dio, offset, ret);
@@ -1146,7 +1134,11 @@ direct_io_worker(int rw, struct kiocb *i
 			 * i/o, we have to mark the the aio complete.
 			 */
 			aio_complete(iocb, ret, 0);
-		kfree(dio);
+
+		if (atomic_dec_and_test(&dio->refcount))
+			kfree(dio);
+		else
+			BUG();
 	}
 	return ret;
 }

commit 20258b2b397031649e4a41922fe803d57017df84
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: remove duplicate bio wait code
    
    Now that we have a single refcount and waiting path we can reuse it in the
    async 'should_wait' path.  It continues to rely on the fragile link between
    the conditional in dio_complete_aio() which decides to complete the AIO and
    the conditional in direct_io_worker() which decides to wait and free.
    
    By waiting before dropping the reference we stop dio_bio_end_aio() from
    calling dio_complete_aio() which used to wake up the waiter after seeing the
    reference count drop to 0.  We hoist this wake up into dio_bio_end_aio() which
    now notices when it's left a single remaining reference that is held by the
    waiter.
    
    Signed-off-by: Zach Brown <zach.brown@oracle.com>
    Cc: Badari Pulavarty <pbadari@us.ibm.com>
    Cc: Suparna Bhattacharya <suparna@in.ibm.com>
    Acked-by: Jeff Moyer <jmoyer@redhat.com>
    Signed-off-by: Andrew Morton <akpm@osdl.org>
    Signed-off-by: Linus Torvalds <torvalds@osdl.org>
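
The hoisted wake-up can be illustrated with a small userspace model of the
decision made in dio_bio_end_aio() after it drops its reference.  This is a
sketch only: enum drop_action and model_bio_end_aio() are hypothetical names,
and atomic_fetch_sub() minus one stands in for the kernel's
atomic_sub_return().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical labels for what the bio completion path does next. */
enum drop_action { DROP_NOTHING, DROP_WAKE_WAITER, DROP_COMPLETE };

/*
 * Drop the bio's reference and classify the outcome:
 *   - 0 remaining: this was the last reference, complete the AIO;
 *   - 1 remaining and a waiter is parked: the waiter holds that last
 *     reference, so wake it;
 *   - otherwise other bios are still in flight.
 */
static enum drop_action model_bio_end_aio(atomic_int *refcount,
					  bool waiter_present)
{
	int remaining = atomic_fetch_sub(refcount, 1) - 1;

	if (remaining == 0)
		return DROP_COMPLETE;
	if (remaining == 1 && waiter_present)
		return DROP_WAKE_WAITER;
	return DROP_NOTHING;
}
```

With a waiter holding a reference the count never reaches zero from the bio
side, so in this model the waiter is always the one that finishes the
operation.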

diff --git a/fs/direct-io.c b/fs/direct-io.c
index bc1cbf9..f11f05d 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -257,7 +257,6 @@ static int dio_complete(struct dio *dio,
  */
 static void dio_complete_aio(struct dio *dio)
 {
-	unsigned long flags;
 	int ret;
 
 	ret = dio_complete(dio, dio->iocb->ki_pos, 0);
@@ -267,14 +266,6 @@ static void dio_complete_aio(struct dio
 		((dio->rw == READ) && dio->result)) {
 		aio_complete(dio->iocb, ret, 0);
 		kfree(dio);
-	} else {
-		/*
-		 * Falling back to buffered
-		 */
-		spin_lock_irqsave(&dio->bio_lock, flags);
-		if (dio->waiter)
-			wake_up_process(dio->waiter);
-		spin_unlock_irqrestore(&dio->bio_lock, flags);
 	}
 }
 
@@ -285,6 +276,8 @@ static int dio_bio_complete(struct dio *
 static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 {
 	struct dio *dio = bio->bi_private;
+	int waiter_holds_ref = 0;
+	int remaining;
 
 	if (bio->bi_size)
 		return 1;
@@ -292,7 +285,12 @@ static int dio_bio_end_aio(struct bio *b
 	/* cleanup the bio */
 	dio_bio_complete(dio, bio);
 
-	if (atomic_dec_and_test(&dio->refcount))
+	waiter_holds_ref = !!dio->waiter;
+	remaining = atomic_sub_return(1, (&dio->refcount));
+	if (remaining == 1 && waiter_holds_ref)
+		wake_up_process(dio->waiter);
+
+	if (remaining == 0)
 		dio_complete_aio(dio);
 
 	return 0;
@@ -1097,30 +1095,15 @@ direct_io_worker(int rw, struct kiocb *i
 		if (ret == 0)
 			ret = dio->result;
 
+		if (should_wait)
+			dio_await_completion(dio);
+
 		/* this can free the dio */
 		if (atomic_dec_and_test(&dio->refcount))
 			dio_complete_aio(dio);
 
-		if (should_wait) {
-			unsigned long flags;
-			/*
-			 * Wait for already issued I/O to drain out and
-			 * release its references to user-space pages
-			 * before returning to fallback on buffered I/O
-			 */
-
-			spin_lock_irqsave(&dio->bio_lock, flags);
-			set_current_state(TASK_UNINTERRUPTIBLE);
-			while (atomic_read(&dio->refcount)) {
-				spin_unlock_irqrestore(&dio->bio_lock, flags);
-				io_schedule();
-				spin_lock_irqsave(&dio->bio_lock, flags);
-				set_current_state(TASK_UNINTERRUPTIBLE);
-			}
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
-			set_current_state(TASK_RUNNING);
+		if (should_wait)
 			kfree(dio);
-		}
 	} else {
 		dio_await_completion(dio);
 

commit 8459d86aff04fa53c2ab6a6b9f355b3063cc8014
Author: Zach Brown <zach.brown@oracle.com>

    [PATCH] dio: only call aio_complete() after returning -EIOCBQUEUED
    
    The only time it is safe to call aio_complete() is when the ->ki_retry
    function returns -EIOCBQUEUED to the AIO core.  direct_io_worker() has
    historically done this by relying on its caller to translate positive return
    codes into -EIOCBQUEUED for the aio case.  It did this by trying to keep
    conditionals in sync.  direct_io_worker() knew when finished_one_bio() was
    going to call aio_complete().  It would reverse the test and wait and free the
    dio in the cases it thought that finished_one_bio() wasn't going to.
    
    Not surprisingly, it ended up getting it wrong.  'ret' could be a negative
    errno from the submission path but it failed to communicate this to
    finished_one_bio().  direct_io_worker() would return < 0, its callers
    wouldn't raise -EIOCBQUEUED, and aio_complete() would be called.  In the
    future finished_one_bio()'s tests wouldn't reflect this and aio_complete()
    would be called for a second time which can manifest as an oops.
    
    The previous cleanups have whittled the sync and async completion paths down
    to the point where we can collapse them and clearly reassert the invariant
    that we must only call aio_complete() after returning -EIOCBQUEUED.
    direct_io_worker() will only return -EIOCBQUEUED when it is not the last to
    drop the dio refcount and the aio bio completion path will only call
    aio_complete() when it is the last to drop the dio refcount.
    direct_io_worker() can ensure that it is the last to drop the reference count
    by waiting for bios to drain.  It does this for sync ops, of course, and for
    partial dio writes that must fall back to buffered and for aio ops that saw
    errors during submission.
    
    This means that operations that end up waiting, even if they were issued as
    aio ops, will not call aio_complete() from dio.  Instead we return the return
    code of the operation and let the aio core call aio_complete().  This is
    purposely done to fix a bug where AIO DIO file extensions would call
    aio_complete() before their callers have a chance to update i_size.
    
    Now that direct_io_worker() is explicitly returning -EIOCBQUEUED its callers
    no longer have to translate for it.  XFS needs to be careful not to free
    resources that will be used during AIO completion if -EIOCBQUEUED is returned.
    We maintain the previous behaviour of trying to write fs metadata for O_SYNC
    aio+dio writes.
    
    Signed-off-by: Zach Brown <zach.brown@oracle.com>
    Cc: Badari Pulavarty <pbadari@us.ibm.com>
    Cc: Suparna Bhattacharya <suparna@in.ibm.com>
    Acked-by: Jeff Moyer <jmoyer@redhat.com>
    Cc: <xfs-masters@oss.sgi.com>
    Signed-off-by: Andrew Morton <akpm@osdl.org>
    Signed-off-by: Linus Torvalds <torvalds@osdl.org>
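
The invariant stated above (leave bios in flight only for a successful
partial aio read or a full aio write, and return -EIOCBQUEUED only then) can
be written as a small userspace predicate.  This is a sketch of the stated
intent, not the kernel code: enum model_dir and model_worker_ret() are
hypothetical names, the direction is compared against an explicit enum value,
and EIOCBQUEUED is defined locally because userspace <errno.h> does not
provide it.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

#ifndef EIOCBQUEUED
#define EIOCBQUEUED 529	/* kernel-internal errno, defined here for the sketch */
#endif

/* Hypothetical stand-in for the rw argument of direct_io_worker(). */
enum model_dir { MODEL_READ, MODEL_WRITE };

/*
 * Decide what the submission path returns.  Only an async op that was
 * submitted without error, moved some bytes, and is either a read or a
 * complete write is handed off to bio completion.  Everything else keeps
 * its return code and is expected to wait for the bios to drain.
 */
static int model_worker_ret(bool is_async, int ret, size_t result,
			    size_t size, enum model_dir rw)
{
	if (is_async && ret == 0 && result != 0 &&
	    (rw == MODEL_READ || result == size))
		return -EIOCBQUEUED;
	return ret;
}

/* True when the submitter must wait for all bios before returning. */
static bool model_must_wait(int worker_ret)
{
	return worker_ret != -EIOCBQUEUED;
}
```

A partial async write, for example, yields model_must_wait() == true, which
matches the changelog's point that such operations wait and let the aio core
call aio_complete() with the returned code.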

diff --git a/fs/direct-io.c b/fs/direct-io.c
index f11f05d..71f4aea 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -226,6 +226,15 @@ static int dio_complete(struct dio *dio,
 {
 	ssize_t transferred = 0;
 
+	/*
+	 * AIO submission can race with bio completion to get here while
+	 * expecting to have the last io completed by bio completion.
+	 * In that case -EIOCBQUEUED is in fact not an error we want
+	 * to preserve through this call.
+	 */
+	if (ret == -EIOCBQUEUED)
+		ret = 0;
+
 	if (dio->result) {
 		transferred = dio->result;
 
@@ -251,24 +260,6 @@ static int dio_complete(struct dio *dio,
 	return ret;
 }
 
-/*
- * Called when a BIO has been processed.  If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
- */
-static void dio_complete_aio(struct dio *dio)
-{
-	int ret;
-
-	ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-
-	/* Complete AIO later if falling back to buffered i/o */
-	if (dio->result == dio->size ||
-		((dio->rw == READ) && dio->result)) {
-		aio_complete(dio->iocb, ret, 0);
-		kfree(dio);
-	}
-}
-
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
 /*
  * Asynchronous IO callback. 
@@ -290,8 +281,11 @@ static int dio_bio_end_aio(struct bio *b
 	if (remaining == 1 && waiter_holds_ref)
 		wake_up_process(dio->waiter);
 
-	if (remaining == 0)
-		dio_complete_aio(dio);
+	if (remaining == 0) {
+		int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+		aio_complete(dio->iocb, ret, 0);
+		kfree(dio);
+	}
 
 	return 0;
 }
@@ -1082,47 +1076,33 @@ direct_io_worker(int rw, struct kiocb *i
 		mutex_unlock(&dio->inode->i_mutex);
 
 	/*
-	 * OK, all BIOs are submitted, so we can decrement bio_count to truly
-	 * reflect the number of to-be-processed BIOs.
+	 * The only time we want to leave bios in flight is when a successful
+	 * partial aio read or full aio write have been setup.  In that case
+	 * bio completion will call aio_complete.  The only time it's safe to
+	 * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+	 * This had *better* be the only place that raises -EIOCBQUEUED.
 	 */
-	if (dio->is_async) {
-		int should_wait = 0;
-
-		if (dio->result < dio->size && (rw & WRITE)) {
-			dio->waiter = current;
-			should_wait = 1;
-		}
-		if (ret == 0)
-			ret = dio->result;
-
-		if (should_wait)
-			dio_await_completion(dio);
-
-		/* this can free the dio */
-		if (atomic_dec_and_test(&dio->refcount))
-			dio_complete_aio(dio);
+	BUG_ON(ret == -EIOCBQUEUED);
+	if (dio->is_async && ret == 0 && dio->result &&
+	    ((rw & READ) || (dio->result == dio->size)))
+		ret = -EIOCBQUEUED;
 
-		if (should_wait)
-			kfree(dio);
-	} else {
+	if (ret != -EIOCBQUEUED)
 		dio_await_completion(dio);
 
+	/*
+	 * Sync will always be dropping the final ref and completing the
+	 * operation.  AIO can if it was a broken operation described above
+	 * or in fact if all the bios race to complete before we get here.
+	 * In that case dio_complete() translates the EIOCBQUEUED into
+	 * the proper return code that the caller will hand to aio_complete().
+	 */
+	if (atomic_dec_and_test(&dio->refcount)) {
 		ret = dio_complete(dio, offset, ret);
+		kfree(dio);
+	} else
+		BUG_ON(ret != -EIOCBQUEUED);
 
-		/* We could have also come here on an AIO file extend */
-		if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
-		    ret >= 0 && dio->result == dio->size)
-			/*
-			 * For AIO writes where we have completed the
-			 * i/o, we have to mark the the aio complete.
-			 */
-			aio_complete(iocb, ret, 0);
-
-		if (atomic_dec_and_test(&dio->refcount))
-			kfree(dio);
-		else
-			BUG();
-	}
 	return ret;
 }
 
diff --git a/fs/xfs/linux-2.6/xfs_aops.c b/fs/xfs/linux-2.6/xfs_aops.c
index 8e6b56f..b56eb75 100644
--- a/fs/xfs/linux-2.6/xfs_aops.c
+++ b/fs/xfs/linux-2.6/xfs_aops.c
@@ -1406,7 +1406,7 @@ xfs_vm_direct_IO(
 			xfs_end_io_direct);
 	}
 
-	if (unlikely(ret <= 0 && iocb->private))
+	if (unlikely(ret != -EIOCBQUEUED && iocb->private))
 		xfs_destroy_ioend(iocb->private);
 	return ret;
 }
diff --git a/mm/filemap.c b/mm/filemap.c
index 606432f..8332c77 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1181,8 +1181,6 @@ generic_file_aio_read(struct kiocb *iocb
 		if (pos < size) {
 			retval = generic_file_direct_IO(READ, iocb,
 						iov, pos, nr_segs);
-			if (retval > 0 && !is_sync_kiocb(iocb))
-				retval = -EIOCBQUEUED;
 			if (retval > 0)
 				*ppos = pos + retval;
 		}
@@ -2047,15 +2045,14 @@ generic_file_direct_write(struct kiocb *
 	 * Sync the fs metadata but not the minor inode changes and
 	 * of course not the data as we did direct DMA for the IO.
 	 * i_mutex is held, which protects generic_osync_inode() from
-	 * livelocking.
+	 * livelocking.  AIO O_DIRECT ops attempt to sync metadata here.
 	 */
-	if (written >= 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
+	if ((written >= 0 || written == -EIOCBQUEUED) &&
+	    ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
 		int err = generic_osync_inode(inode, mapping, OSYNC_METADATA);
 		if (err < 0)
 			written = err;
 	}
-	if (written == count && !is_sync_kiocb(iocb))
-		written = -EIOCBQUEUED;
 	return written;
 }
 EXPORT_SYMBOL(generic_file_direct_write);