kernel-2.6.18-238.el5.src.rpm

From: Jeff Moyer <jmoyer@redhat.com>
Date: Tue, 5 Aug 2008 16:26:48 -0400
Subject: [fs] dio: lock refcount operations
Message-id: x49wsivcinr.fsf@segfault.boston.devel.redhat.com
O-Subject: [rhel5 patch] dio: lock refcount operations
Bugzilla: 455750
RH-Acked-by: Eric Sandeen <sandeen@redhat.com>

Hi,

Bug 455750 (which is a clone of the RHEL 4 bug for which I already
posted a patch) describes a problem where a process is blocked
indefinitely in io_schedule.  The following commit fixes the problem:

commit 5eb6c7a2ab413dea1ee6c08dd58263a1c2c2efa3
Author: Zach Brown <zach.brown@oracle.com>
Date:   Sun Dec 10 02:21:07 2006 -0800

    [PATCH] dio: lock refcount operations

    The wait_for_more_bios() function name was poorly chosen.  While looking to
    clean it up I noticed that the dio struct refcounting between the bio
    completion and dio submission paths was racy.

    The bio submission path was simply freeing the dio struct if
    atomic_dec_and_test() indicated that it dropped the final reference.

    The aio bio completion path was dereferencing its dio struct pointer *after
    dropping its reference* based on the remaining number of references.

    These two paths could race and result in the aio bio completion path
    dereferencing a freed dio, though this was not observed in the wild.

    This moves the refcount under the bio lock so that bio completion can drop
    its reference and decide to wake all in one atomic step.

    Once testing and waking is locked dio_await_one() can test its sleeping
    condition and mark itself uninterruptible under the lock.  It gets simpler
    and wait_for_more_bios() disappears.

    The addition of the interrupt-masking spin lock acquisition in
    dio_bio_submit() looks alarming.  This lock acquisition existed in that
    path before the recent dio completion patch set.  We shouldn't expect a
    significant performance regression from returning to the behaviour that
    existed before the completion clean up work.

    This passed 4k block ext3 O_DIRECT fsx and aio-stress on an SMP machine.

    Signed-off-by: Zach Brown <zach.brown@oracle.com>
    Cc: Badari Pulavarty <pbadari@us.ibm.com>
    Cc: Suparna Bhattacharya <suparna@in.ibm.com>
    Cc: Jeff Moyer <jmoyer@redhat.com>
    Cc: <xfs-masters@oss.sgi.com>
    Signed-off-by: Andrew Morton <akpm@osdl.org>
    Signed-off-by: Linus Torvalds <torvalds@osdl.org>
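
As a rough userspace analogue of the pattern the commit describes, here is a
minimal sketch using pthreads.  It is not the kernel code: struct dio_model,
complete_one(), submit_and_wait() and MAX_BIOS are hypothetical names, a
mutex stands in for dio->bio_lock, and a condition variable stands in for
wake_up_process().  It also ignores bio_list and only waits for the count to
return to 1.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_BIOS 16	/* arbitrary cap, for this sketch only */

/* Hypothetical model of the dio fields involved. */
struct dio_model {
	pthread_mutex_t lock;	/* stands in for dio->bio_lock */
	pthread_cond_t wake;	/* stands in for wake_up_process() */
	unsigned long refcount;	/* plain counter, only touched under lock */
};

/* Completion path: drop the reference and decide to wake in one step. */
static void *complete_one(void *arg)
{
	struct dio_model *dio = arg;

	pthread_mutex_lock(&dio->lock);
	assert(dio->refcount > 1);	/* submitter's ref plus ours */
	if (--dio->refcount == 1)
		pthread_cond_signal(&dio->wake);
	pthread_mutex_unlock(&dio->lock);
	return NULL;
}

/* Submit nr_bios fake bios, wait for them, return the final refcount. */
static unsigned long submit_and_wait(int nr_bios)
{
	struct dio_model dio;
	pthread_t workers[MAX_BIOS];
	unsigned long remaining;
	int i, started = 0;

	pthread_mutex_init(&dio.lock, NULL);
	pthread_cond_init(&dio.wake, NULL);
	dio.refcount = 1;	/* the submitter's own reference */

	for (i = 0; i < nr_bios && i < MAX_BIOS; i++) {
		/* Take a reference before the "I/O" is issued. */
		pthread_mutex_lock(&dio.lock);
		dio.refcount++;
		pthread_mutex_unlock(&dio.lock);

		if (pthread_create(&workers[started], NULL,
				   complete_one, &dio) == 0) {
			started++;
		} else {
			/* Nothing was issued: give the reference back. */
			pthread_mutex_lock(&dio.lock);
			dio.refcount--;
			pthread_mutex_unlock(&dio.lock);
		}
	}

	/* Waiter: test the sleeping condition under the same lock. */
	pthread_mutex_lock(&dio.lock);
	while (dio.refcount > 1)
		pthread_cond_wait(&dio.wake, &dio.lock);
	remaining = dio.refcount;
	pthread_mutex_unlock(&dio.lock);

	for (i = 0; i < started; i++)
		pthread_join(workers[i], NULL);
	pthread_cond_destroy(&dio.wake);
	pthread_mutex_destroy(&dio.lock);
	return remaining;
}
```

Because the decrement, the test against 1, and the wakeup all happen while
the lock is held, the waiter either sees the final count before it sleeps or
is already waiting when the signal is sent.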

I also backported a fix to the fix, which removed a bogus BUG_ON.  The
race Zach mentioned was not reported in the wild.  However, there is a
different race that can result in the waiter never being woken up, as
described by the reporter:

The following diagram illustrates a race scenario that appears to lead to the
problem:

  dio_await_one() on CPU X             |  dio_bio_end_aio() on CPU Y
---------------------------------------+---------------------------------------
                                       |
  spin_lock_irqsave                    |
    (&dio->bio_lock, flags);           |
                                       |
  Note: dio->refcount still > 1        |  Note: dio->waiter still NULL
                                       |
  if (wait_for_more_bios(dio)) {       |  waiter_holds_ref = !!dio->waiter;
                                       |
      dio->waiter = current;           |  remaining = atomic_sub_return(1,
                                       |                 (&dio->refcount));
      spin_unlock_irqrestore           |
        (&dio->bio_lock, flags);       |  Note: remaining is 1 but
                                       |        waiter_holds_ref is 0
      io_schedule();                   |
      ^^^^^^^^^^^                      |  if (remaining == 1 &&
      wait indefinitely                |      waiter_holds_ref)
                                       |      wake_up_process(dio->waiter);
                                       |      ^^^^^^^^^^^^^^^
                                       |      not called

dio->refcount is incremented by dio_bio_submit() before the I/O is issued and
it is decremented by dio_bio_end_aio() to indicate that the I/O has completed.
However, in the race shown above, dio_bio_end_aio() samples dio->waiter before
dio_await_one() has set it, so it is not aware that a process is about to wait
for the I/O that is just being completed.  As a result, the process is not
awakened and waits indefinitely on an already completed I/O.
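
To make the interleaving concrete, the following sketch replays it
sequentially in userspace.  It is a simplified model, not the kernel code:
struct dio_model, complete_locked(), replay_unpatched() and replay_patched()
are hypothetical names, and the "lock" is modelled only by the order in which
the steps run.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the dio fields involved in the race. */
struct dio_model {
	unsigned long refcount;	/* 1 for the submitter + 1 per bio in flight */
	bool waiter_set;	/* models dio->waiter != NULL */
	bool woken;		/* models wake_up_process() having been called */
};

/*
 * Replay the interleaving from the diagram with the pre-patch logic.
 * Returns true if the waiter goes to sleep and is never woken.
 */
static bool replay_unpatched(void)
{
	struct dio_model dio = { .refcount = 2 };
	bool waiter_holds_ref, sleeps;
	unsigned long remaining;

	/* CPU Y: samples dio->waiter without the lock; it is still NULL. */
	waiter_holds_ref = dio.waiter_set;

	/* CPU X: under bio_lock, sees a bio in flight and decides to sleep. */
	sleeps = dio.refcount > 1;
	if (sleeps)
		dio.waiter_set = true;

	/* CPU Y: drops its reference and acts on the stale sample. */
	remaining = --dio.refcount;
	if (remaining == 1 && waiter_holds_ref)
		dio.woken = true;

	return sleeps && !dio.woken;
}

/* Patched completion: decrement, test and wake as one critical section. */
static void complete_locked(struct dio_model *dio)
{
	assert(dio->refcount > 0);
	if (--dio->refcount == 1 && dio->waiter_set)
		dio->woken = true;
}

/*
 * With the patch, the completion's critical section runs either entirely
 * before or entirely after the waiter's.  Returns true if the waiter is
 * left asleep.
 */
static bool replay_patched(bool completion_first)
{
	struct dio_model dio = { .refcount = 2 };
	bool sleeps;

	if (completion_first)
		complete_locked(&dio);

	/* CPU X: tests the condition and publishes itself under the lock. */
	sleeps = dio.refcount > 1;
	if (sleeps)
		dio.waiter_set = true;

	if (!completion_first)
		complete_locked(&dio);

	return sleeps && !dio.woken;
}
```

In this model replay_unpatched() reports a stranded waiter, while
replay_patched() does not for either ordering: if the completion runs first
the waiter never sleeps, and if it runs second it sees the waiter and wakes
it.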

The symptoms in the vmcore suggest that the race has actually occurred:

 -  The process is blocked even though dio->refcount is 1 (no ongoing I/O).

 -  None of the allocated entries in the "bio" slab cache is pointing to the
    dio structure of the blocked process. If an I/O was ongoing, one would
    expect to find a bio->bi_private pointing to the dio.
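
The reasoning behind these two checks can be sketched as a small predicate.
This is only an illustration of the logic, not a crash-analysis tool: struct
bio_model and no_io_in_flight() are hypothetical, and the array stands in for
the allocated entries of the "bio" slab cache.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the one struct bio field inspected here. */
struct bio_model {
	const void *bi_private;
};

/*
 * Returns true when the dio shows no sign of outstanding I/O: its refcount
 * is back to the submitter's single reference and no allocated bio points
 * at it through bi_private.
 */
static bool no_io_in_flight(const void *dio, unsigned long refcount,
			    const struct bio_model *bios, size_t nr_bios)
{
	size_t i;

	assert(bios != NULL || nr_bios == 0);

	if (refcount != 1)
		return false;
	for (i = 0; i < nr_bios; i++)
		if (bios[i].bi_private == dio)
			return false;
	return true;
}
```

A task that is still blocked in io_schedule() while this predicate holds for
its dio is waiting on I/O that has already completed, which matches the
symptoms above.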

A kernel built with the attached patch was sent to the customer for
testing, and it fixed the problem in their environment.  I also ran the
resulting kernel through the aio-dio-regress test suite and it passed
all of the tests.

Comments welcome.

Cheers,

Jeff

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 59d889c..da7a1d6 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -120,8 +120,8 @@ struct dio {
 	int page_errors;		/* errno from get_user_pages() */
 
 	/* BIO completion state */
-	atomic_t refcount;		/* direct_io_worker() and bios */
 	spinlock_t bio_lock;		/* protects BIO fields below */
+	unsigned long refcount;		/* direct_io_worker() and bios */
 	struct bio *bio_list;		/* singly linked via bi_private */
 	struct task_struct *waiter;	/* waiting task (NULL if none) */
 
@@ -266,8 +266,8 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio);
 static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 {
 	struct dio *dio = bio->bi_private;
-	int waiter_holds_ref = 0;
-	int remaining;
+	unsigned long remaining;
+	unsigned long flags;
 
 	if (bio->bi_size)
 		return 1;
@@ -275,10 +275,11 @@ static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 	/* cleanup the bio */
 	dio_bio_complete(dio, bio);
 
-	waiter_holds_ref = !!dio->waiter;
-	remaining = atomic_sub_return(1, (&dio->refcount));
-	if (remaining == 1 && waiter_holds_ref)
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	remaining = --dio->refcount;
+	if (remaining == 1 && dio->waiter)
 		wake_up_process(dio->waiter);
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
 		int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
@@ -307,7 +308,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	bio->bi_private = dio->bio_list;
 	dio->bio_list = bio;
-	if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
+	if (--dio->refcount == 1 && dio->waiter)
 		wake_up_process(dio->waiter);
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 	return 0;
@@ -344,11 +345,17 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
 static void dio_bio_submit(struct dio *dio)
 {
 	struct bio *bio = dio->bio;
+	unsigned long flags;
 
 	bio->bi_private = dio;
-	atomic_inc(&dio->refcount);
+
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	dio->refcount++;
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
+
 	if (dio->is_async && dio->rw == READ)
 		bio_set_pages_dirty(bio);
+
 	submit_bio(dio->rw, bio);
 
 	dio->bio = NULL;
@@ -364,13 +371,6 @@ static void dio_cleanup(struct dio *dio)
 		page_cache_release(dio_get_page(dio));
 }
 
-static int wait_for_more_bios(struct dio *dio)
-{
-	assert_spin_locked(&dio->bio_lock);
-
-	return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
-}
-
 /*
  * Wait for the next BIO to complete.  Remove it and return it.  NULL is
  * returned once all BIOs have been completed.  This must only be called once
@@ -383,16 +383,20 @@ static struct bio *dio_await_one(struct dio *dio)
 	struct bio *bio = NULL;
 
 	spin_lock_irqsave(&dio->bio_lock, flags);
-	while (wait_for_more_bios(dio)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		if (wait_for_more_bios(dio)) {
-			dio->waiter = current;
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
-			io_schedule();
-			spin_lock_irqsave(&dio->bio_lock, flags);
-			dio->waiter = NULL;
-		}
-		set_current_state(TASK_RUNNING);
+	/*
+	 * Wait as long as the list is empty and there are bios in flight.  bio
+	 * completion drops the count, maybe adds to the list, and wakes while
+	 * holding the bio_lock so we don't need set_current_state()'s barrier
+	 * and can call it after testing our condition.
+	 */
+	while (dio->refcount > 1 && dio->bio_list == NULL) {
+		__set_current_state(TASK_UNINTERRUPTIBLE);
+		dio->waiter = current;
+		spin_unlock_irqrestore(&dio->bio_lock, flags);
+		io_schedule();
+		/* wake up sets us TASK_RUNNING */
+		spin_lock_irqsave(&dio->bio_lock, flags);
+		dio->waiter = NULL;
 	}
 	if (dio->bio_list) {
 		bio = dio->bio_list;
@@ -943,6 +947,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	struct dio *dio)
 {
 	unsigned long user_addr; 
+	unsigned long flags;
 	int seg;
 	ssize_t ret = 0;
 	ssize_t ret2;
@@ -961,8 +966,8 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	dio->iocb = iocb;
 	dio->i_size = i_size_read(inode);
 
-	atomic_set(&dio->refcount, 1);
 	spin_lock_init(&dio->bio_lock);
+	dio->refcount = 1;
 
 	/*
 	 * In case of non-aligned buffers, we may need 2 more
@@ -1068,12 +1073,20 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 
 	/*
 	 * Sync will always be dropping the final ref and completing the
-	 * operation.  AIO can if it was a broken operation described above
-	 * or in fact if all the bios race to complete before we get here.
-	 * In that case dio_complete() translates the EIOCBQUEUED into
-	 * the proper return code that the caller will hand to aio_complete().
+	 * operation.  AIO can if it was a broken operation described above or
+	 * in fact if all the bios race to complete before we get here.  In
+	 * that case dio_complete() translates the EIOCBQUEUED into the proper
+	 * return code that the caller will hand to aio_complete().
+	 *
+	 * This is managed by the bio_lock instead of being an atomic_t so that
+	 * completion paths can drop their ref and use the remaining count to
+	 * decide to wake the submission path atomically.
 	 */
-	if (atomic_dec_and_test(&dio->refcount)) {
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	ret2 = --dio->refcount;
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
+
+	if (ret2 == 0) {
 		ret = dio_complete(dio, offset, ret);
 		kfree(dio);
 	} else