Sophie

Sophie

distrib > Mageia > 6 > i586 > media > core-release > by-pkgid > 248e2f92d9b832e75f95c6042e4252e2 > files > 2683

python-twisted-16.3.2-1.mga6.i586.rpm

#!/usr/bin/env python

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Simple example of a db checker: define a L{ICredentialsChecker} implementation
that deals with a database backend to authenticate a user.
"""

from twisted.cred import error
from twisted.cred.credentials import IUsernameHashedPassword, IUsernamePassword
from twisted.cred.checkers import ICredentialsChecker
from twisted.internet.defer import Deferred

from zope.interface import implements


class DBCredentialsChecker(object):
    """
    This class checks the credentials of incoming connections
    against a user table in a database.

    @ivar runQuery: Callable used to run the lookup query; it is expected to
        return a L{Deferred} firing with a sequence of
        C{(username, password)} rows.
    @ivar sql: The query handed to C{runQuery}.
    @ivar customCheckFunc: Optional user-supplied password comparison.
    @ivar caseSensitivePasswords: Whether the built-in plain-text comparison
        is case sensitive.
    @ivar credentialInterfaces: The credentials interfaces this checker
        accepts.
    """
    # NOTE(review): ``implements`` is zope.interface's class-advice API, which
    # targets Python 2; on Python 3 the ``implementer`` class decorator is the
    # supported spelling -- confirm against the zope.interface version in use.
    implements(ICredentialsChecker)

    def __init__(self, runQuery,
        query="SELECT username, password FROM user WHERE username = %s",
        customCheckFunc=None, caseSensitivePasswords=True):
        """
        @param runQuery: This will be called to get the info from the db.
            Generally you'd want to create a
            L{twisted.enterprise.adbapi.ConnectionPool} and pass its runQuery
            method here. Otherwise pass a function with the same prototype.
        @type runQuery: C{callable}

        @param query: query used to authenticate user. It is run with a
            single parameter, the username from the credentials.
        @type query: C{str}

        @param customCheckFunc: Use this if the passwords in the db are stored
            as hashes. We'll just call this, so you can do the checking
            yourself. It takes the following params:
            (username, suppliedPass, dbPass) and must return a boolean.
        @type customCheckFunc: C{callable}

        @param caseSensitivePasswords: If true requires that every letter in
            C{credentials.password} is exactly the same case as its
            counterpart letter in the database.
            This is only relevant if C{customCheckFunc} is not used.
        @type caseSensitivePasswords: C{bool}
        """
        self.runQuery = runQuery
        self.caseSensitivePasswords = caseSensitivePasswords
        self.customCheckFunc = customCheckFunc
        # We can't support hashed password credentials if we only have a hash
        # in the DB
        if customCheckFunc:
            self.credentialInterfaces = (IUsernamePassword,)
        else:
            self.credentialInterfaces = (
                IUsernamePassword, IUsernameHashedPassword,)

        self.sql = query

    def requestAvatarId(self, credentials):
        """
        Authenticates the supplied credentials against the database.

        @param credentials: The credentials to check; must provide one of
            C{self.credentialInterfaces}.

        @return: A L{Deferred} that fires with C{credentials.username} when
            the login is accepted, and fails with a L{twisted.cred.error}
            exception (or whatever the password check raised) otherwise.

        @raise error.UnhandledCredentials: If C{credentials} provides none of
            the supported interfaces. This is raised synchronously, before
            any query is run.
        """
        # Check that the credentials instance implements at least one of our
        # interfaces
        if not any(interface.providedBy(credentials)
                   for interface in self.credentialInterfaces):
            raise error.UnhandledCredentials()
        # Ask the database for the username and password
        dbDeferred = self.runQuery(self.sql, (credentials.username,))
        # The result is delivered through a separate Deferred so that the
        # callbacks below decide exactly what the caller sees.
        deferred = Deferred()
        dbDeferred.addCallbacks(self._cbAuthenticate, self._ebAuthenticate,
                callbackArgs=(credentials, deferred),
                errbackArgs=(credentials, deferred))
        return deferred

    def _passwordMatches(self, credentials, username, dbPassword):
        """
        Decide whether C{credentials} match the password stored in the db.

        @param credentials: The credentials being checked.
        @param username: The username column of the matching db row.
        @param dbPassword: The password column of the matching db row.

        @return: A true value if the password is accepted.

        @raise error.UnhandledCredentials: If no C{customCheckFunc} is set
            and C{credentials} provides neither supported interface.
        """
        if self.customCheckFunc:
            # Let the owner do the checking
            return self.customCheckFunc(
                username, credentials.password, dbPassword)
        if IUsernameHashedPassword.providedBy(credentials):
            # Let the hashed password checker do the checking
            return credentials.checkPassword(dbPassword)
        if IUsernamePassword.providedBy(credentials):
            if self.caseSensitivePasswords:
                # Case sensitive: the passwords must match exactly.
                return dbPassword == credentials.password
            # Case insensitive: compare with case folded away.
            return dbPassword.lower() == credentials.password.lower()
        # OK, we don't know how to check this
        raise error.UnhandledCredentials()

    def _cbAuthenticate(self, result, credentials, deferred):
        """
        Checks to see if authentication was good. Called once the info has
        been retrieved from the DB.

        @param result: The rows returned by the query; only the first row is
            used.
        @param credentials: The credentials being checked.
        @param deferred: The L{Deferred} returned by L{requestAvatarId};
            it is always fired exactly once by this method.
        """
        if not result:
            # Username not found in db
            deferred.errback(error.UnauthorizedLogin('Username unknown'))
            return
        try:
            username, password = result[0]
            passOk = self._passwordMatches(credentials, username, password)
        except Exception:
            # Without this, an exception from the row unpacking or from a
            # user-supplied check would leave ``deferred`` unfired and the
            # login would hang. A no-argument errback wraps the exception
            # currently being handled.
            deferred.errback()
            return
        if passOk:
            deferred.callback(credentials.username)
        else:
            deferred.errback(error.UnauthorizedLogin('Password mismatch'))

    def _ebAuthenticate(self, message, credentials, deferred):
        """
        The database lookup failed for some reason.

        @param message: The failure reported by the database lookup.
        @param credentials: The credentials being checked (unused).
        @param deferred: The L{Deferred} returned by L{requestAvatarId},
            failed here with L{error.LoginFailed}.
        """
        deferred.errback(error.LoginFailed(message))


def main():
    """
    Run a simple echo pb server to test the checker. It defines a custom query
    for dealing with sqlite special quoting, but otherwise it's a
    straightforward use of the object.

    A fresh C{testcred} sqlite database is created in the current directory
    (any existing file of that name is removed first) and populated with a
    single C{guest}/C{guest} user.

    You can test it running C{pbechoclient.py}.
    """
    import os
    import sys

    from twisted.cred.portal import Portal
    from twisted.enterprise import adbapi
    # Imported here rather than read from a module global so that main()
    # also works when this module is imported instead of run as a script.
    from twisted.internet import reactor
    from twisted.python import log
    from twisted.spread import pb

    import pbecho

    log.startLogging(sys.stdout)

    # Always start from an empty database file
    if os.path.isfile('testcred'):
        os.remove('testcred')
    # The standard library's sqlite3 module provides the same DB-API
    # interface as the external pysqlite2 package, without the extra
    # dependency.
    pool = adbapi.ConnectionPool('sqlite3', 'testcred')

    # Create the table that will be used
    createTable = """CREATE TABLE user (
            username string,
            password string
        )"""
    # Insert a test user
    insertUser = """INSERT INTO user VALUES ('guest', 'guest')"""

    def insertTestUser(res):
        # Returning the Deferred chains the insert, so a failure in it
        # reaches the errback below instead of being dropped.
        return pool.runQuery(insertUser)

    setup = pool.runQuery(createTable)
    setup.addCallback(insertTestUser)
    # Report database setup problems in the log
    setup.addErrback(log.err)

    # sqlite uses "?" placeholders rather than the default "%s"
    checker = DBCredentialsChecker(pool.runQuery,
        query="SELECT username, password FROM user WHERE username = ?")

    portal = Portal(pbecho.SimpleRealm())
    portal.registerChecker(checker)
    reactor.listenTCP(pb.portno, pb.PBServerFactory(portal))


# Script entry point: only runs when this file is executed directly.
if __name__ == "__main__":
    # This import binds the module-level name ``reactor``, which main()
    # reads when it calls reactor.listenTCP().
    from twisted.internet import reactor
    # Register main() with the reactor's callWhenRunning hook, then start
    # the reactor.
    reactor.callWhenRunning(main)
    reactor.run()