Sophie

Sophie

distrib > Scientific Linux > 5x > x86_64 > by-pkgid > 27922b4260f65d317aabda37e42bbbff > files > 678

kernel-2.6.18-238.el5.src.rpm

From: David Teigland <teigland@redhat.com>
Date: Thu, 5 Jun 2008 14:10:33 -0500
Subject: [dlm] validate messages before processing
Message-id: 20080605191033.GI18635@redhat.com
O-Subject: [RHEL5.3 PATCH 09/18] dlm: validate messages before processing
Bugzilla: 450132
RH-Acked-by: Bob Peterson <rpeterso@redhat.com>

bz 450132  dlm: fixes for recovery of user lockspace

Fix bugs when userland apps using the dlm join/leave the lockspace,
causing recovery.

brew build including this patch
http://brewweb.devel.redhat.com/brew/taskinfo?taskID=1344633

upstream commit:

>From c54e04b00fe027da30ada5af76b6749772dd644a Mon Sep 17 00:00:00 2001
>From: David Teigland <teigland@redhat.com>
>Date: Wed, 9 Jan 2008 09:59:41 -0600
>Subject: [PATCH] dlm: validate messages before processing

There was some hit-and-miss validation of messages that has now been
cleaned up and unified.  Before processing a message, the new
validate_message() function checks that the lkb is the appropriate type,
process-copy or master-copy, and that the message is from the correct
nodeid for the given lkb.  Other checks and assertions on the
lkb type and nodeid have been removed.  The assertions were particularly
bad since they would panic the machine instead of just ignoring the bad
message.

Although other recent patches have made processing old messages unlikely,
it still may be possible for an old message to be processed and caught
by these checks.

Signed-off-by: David Teigland <teigland@redhat.com>

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c95101e..40605a2 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3009,8 +3009,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
 
-	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
-
 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
 		/* lkb was just created so there won't be an lvb yet */
 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
@@ -3024,16 +3022,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 				struct dlm_message *ms)
 {
-	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
-		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
-			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
-			  lkb->lkb_id, lkb->lkb_remid);
-		return -EINVAL;
-	}
-
-	if (!is_master_copy(lkb))
-		return -EINVAL;
-
 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
 		return -EBUSY;
 
@@ -3049,8 +3037,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 			       struct dlm_message *ms)
 {
-	if (!is_master_copy(lkb))
-		return -EINVAL;
 	if (receive_lvb(ls, lkb, ms))
 		return -ENOMEM;
 	return 0;
@@ -3066,6 +3052,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
 	lkb->lkb_remid = ms->m_lkid;
 }
 
+/* This is called after the rsb is locked so that we can safely inspect
+   fields in the lkb. */
+
+static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+	int from = ms->m_header.h_nodeid;
+	int error = 0;
+
+	switch (ms->m_type) {
+	case DLM_MSG_CONVERT:
+	case DLM_MSG_UNLOCK:
+	case DLM_MSG_CANCEL:
+		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
+			error = -EINVAL;
+		break;
+
+	case DLM_MSG_CONVERT_REPLY:
+	case DLM_MSG_UNLOCK_REPLY:
+	case DLM_MSG_CANCEL_REPLY:
+	case DLM_MSG_GRANT:
+	case DLM_MSG_BAST:
+		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
+			error = -EINVAL;
+		break;
+
+	case DLM_MSG_REQUEST_REPLY:
+		if (!is_process_copy(lkb))
+			error = -EINVAL;
+		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
+			error = -EINVAL;
+		break;
+
+	default:
+		error = -EINVAL;
+	}
+
+	if (error)
+		log_error(lkb->lkb_resource->res_ls,
+			  "ignore invalid message %d from %d %x %x %x %d",
+			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
+			  lkb->lkb_flags, lkb->lkb_nodeid);
+	return error;
+}
+
 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 {
 	struct dlm_lkb *lkb;
@@ -3127,17 +3157,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	receive_flags(lkb, ms);
 	error = receive_convert_args(ls, lkb, ms);
 	if (error)
-		goto out;
+		goto out_reply;
 	reply = !down_conversion(lkb);
 
 	error = do_convert(r, lkb);
- out:
+ out_reply:
 	if (reply)
 		send_convert_reply(r, lkb, error);
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 	dlm_put_lkb(lkb);
@@ -3163,15 +3197,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	receive_flags(lkb, ms);
 	error = receive_unlock_args(ls, lkb, ms);
 	if (error)
-		goto out;
+		goto out_reply;
 
 	error = do_unlock(r, lkb);
- out:
+ out_reply:
 	send_unlock_reply(r, lkb, error);
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 	dlm_put_lkb(lkb);
@@ -3199,9 +3237,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	error = do_cancel(r, lkb);
 	send_cancel_reply(r, lkb, error);
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 	dlm_put_lkb(lkb);
@@ -3220,22 +3262,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
-		log_error(ls, "receive_grant no lkb");
+		log_debug(ls, "receive_grant from %d no lkb %x",
+			  ms->m_header.h_nodeid, ms->m_remid);
 		return;
 	}
-	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
 	r = lkb->lkb_resource;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	receive_flags_reply(lkb, ms);
 	if (is_altmode(lkb))
 		munge_altmode(lkb, ms);
 	grant_lock_pc(r, lkb, ms);
 	queue_cast(r, lkb, 0);
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 	dlm_put_lkb(lkb);
@@ -3249,18 +3295,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
-		log_error(ls, "receive_bast no lkb");
+		log_debug(ls, "receive_bast from %d no lkb %x",
+			  ms->m_header.h_nodeid, ms->m_remid);
 		return;
 	}
-	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
 	r = lkb->lkb_resource;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
-	queue_bast(r, lkb, ms->m_bastmode);
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
 
+	queue_bast(r, lkb, ms->m_bastmode);
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 	dlm_put_lkb(lkb);
@@ -3326,15 +3376,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
-		log_error(ls, "receive_request_reply no lkb");
+		log_debug(ls, "receive_request_reply from %d no lkb %x",
+			  ms->m_header.h_nodeid, ms->m_remid);
 		return;
 	}
-	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
 	r = lkb->lkb_resource;
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	mstype = lkb->lkb_wait_type;
 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
 	if (error)
@@ -3467,6 +3521,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	/* stub reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms);
 	if (error)
@@ -3485,10 +3543,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
-		log_error(ls, "receive_convert_reply no lkb");
+		log_debug(ls, "receive_convert_reply from %d no lkb %x",
+			  ms->m_header.h_nodeid, ms->m_remid);
 		return;
 	}
-	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
 	_receive_convert_reply(lkb, ms);
 	dlm_put_lkb(lkb);
@@ -3502,6 +3560,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	/* stub reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms);
 	if (error)
@@ -3533,10 +3595,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
-		log_error(ls, "receive_unlock_reply no lkb");
+		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
+			  ms->m_header.h_nodeid, ms->m_remid);
 		return;
 	}
-	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
 	_receive_unlock_reply(lkb, ms);
 	dlm_put_lkb(lkb);
@@ -3550,6 +3612,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = validate_message(lkb, ms);
+	if (error)
+		goto out;
+
 	/* stub reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms);
 	if (error)
@@ -3581,10 +3647,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
-		log_error(ls, "receive_cancel_reply no lkb");
+		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
+			  ms->m_header.h_nodeid, ms->m_remid);
 		return;
 	}
-	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
 	_receive_cancel_reply(lkb, ms);
 	dlm_put_lkb(lkb);
@@ -3817,6 +3883,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
 		ls->ls_stub_ms.m_result = -EINPROGRESS;
 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
 
 		/* Same special case as in receive_rcom_lock_args() */
@@ -3918,6 +3985,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
 			ls->ls_stub_ms.m_result = stub_unlock_result;
 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
 			break;
@@ -3927,6 +3995,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
 			ls->ls_stub_ms.m_result = stub_cancel_result;
 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
+			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
 			break;