#!/usr/bin/env python2.5 """ A test OpenID consumer and provider based on JanRain's Python OpenID library, version 2.1.1, from http://www.openidenabled.com/python-openid/. See http://code.google.com/p/openid-test/ and http://code.google.com/p/openid-test/wiki/TestSpec. Responds to the following URLs: http://janrain-python-test.openid.net//identity/will-sign http://janrain-python-test.openid.net//identity/will-setup-then-sign http://janrain-python-test.openid.net//identity/will-setup-then-cancel http://janrain-python-test.openid.net//rp where is "1.1" or "2.0" or "1.1,2.0". Make sure your realm has "-test" or "openid-test" in its path. For more about OpenID, see: http://openid.net/ http://openid.net/about.bml Copyright 2008 Ryan Barrett """ __author__ = 'Ryan Barrett ' import cgitb; cgitb.enable() import BaseHTTPServer import cgi import datetime import logging import os import pprint import re import sys import traceback import urllib import urlparse import wsgiref.handlers import wsgiref.util from openid.server.server import Server as OpenIDServer from openid.store.filestore import FileOpenIDStore from openid.consumer import discover from openid.consumer.consumer import Consumer as OpenIDConsumer logging.getLogger().addHandler(logging.StreamHandler()) logging.getLogger().setLevel(logging.DEBUG) # supported openid protocol versions PROTOCOL_VERSIONS = ('1.1', '2.0', '1.1,2.0') # trust root (realm) to send to provider when acting as consumer TRUST_ROOT = 'http://' + os.environ['SERVER_NAME'] if os.environ['SERVER_PORT'] != '80': TRUST_ROOT += ':' + os.environ['SERVER_PORT'] # base url, usually the URI path to this script with a trailing / if 'SCRIPT_NAME' in os.environ: BASE_URL = TRUST_ROOT + os.environ['SCRIPT_NAME'] + '/' else: # default BASE_URL = TRUST_ROOT + '/sandbox/cgi-bin/janrain-python.cgi/' # regex used to parse URL paths PATH_REGEX = re.compile( """(caps | (%s)/ (identity/will-sign | identity/will-setup-then-sign | identity/will-setup-then-cancel | rp | rp\.return_to) )$ """ % '|'.join(PROTOCOL_VERSIONS), re.VERBOSE) # HTML templates DOCTYPE = """ """ HEADER = """ """ % BASE_URL FOOTER = '' ERROR = """

Error: %(message)s

Request: %(request)s

Form: %(form)s

""" ENDPOINT = """

Hi! This is a test OpenID endpoint. You can use this URL as an OpenID identifier in a OpenID consumer site:

%(uri)s

""" CONSUMER_FORM = """
Operation:


Association mode:



""" OK = 'OK\n' DISCOVERY = """ user_specified_url: %(claimed_id)s openid_provider: %(openid_provider)s local_id: %(local_id)s """ ASSOCIATION = """ handle: %(handle)s type: %(type)s """ FRONT_PAGE = """ Welcome to the JanRain Python library's OpenID interopability test server. I'm part of the openid-test project.
  • Relying party (consumer) URLs:
      %s
  • Identity URLs which return signatures right away:
      %s
  • Identity URLs which fail, but after setup succeed:
      %s
  • Identity URLs which fail, and then after setup do openid.mode=cancel:
      %s
""" % tuple( ['\n'.join([template % (ver, ver) for ver in PROTOCOL_VERSIONS]) for template in [ '
  • %s/rp
  • ', '
  • %s/identity/will-sign
  • ', '
  • %s/identity/will-setup-then-sign
  • ', '
  • %s/identity/will-setup-then-cancel
  • ', ]]) CAPS = """ # Based on JanRain's Python OpenID library, version 2.1.1. # See http://www.openidenabled.com/python-openid/ #openid1.1 openid2.0 # xri """ class Handler: """Base request handler class.""" def __init__(self, environ, start_response): self.environ = environ self.uri = wsgiref.util.request_uri(self.environ, include_query=False) self.request = wsgiref.util.request_uri(self.environ, include_query=True) self.start_response = start_response self.output = [] def arg_dict(self): """Flattens the URL parameters and form data into a dictionary. Only includes the first value for each parameter. """ form = cgi.FieldStorage() return dict([(arg, form.getfirst(arg)) for arg in form]) def get_openid_request(self): """Creates an OpenIDRequest for this request, if appropriate. If this request is not an OpenID request, returns False. If an error occurs while parsing the arguments, renders an error page and returns None. (Ugh. I know.) """ try: oidrequest = self.openid_server.decodeRequest(self.arg_dict()) if oidrequest: logging.debug('Received OpenID request: %s' % oidrequest) else: return False return oidrequest except: trace = ''.join(traceback.format_exception(*sys.exc_info())) self.render(ERROR, 'Error parsing OpenID request:\n%s' % trace, '400 Error') return None def respond(self, oidresponse): """Send an OpenID response. """ logging.debug('Sending OpenID response: %s' % oidresponse) encoded_response = self.openid_server.encodeResponse(oidresponse) # build the HTTP status code string message = BaseHTTPServer.BaseHTTPRequestHandler.responses.get( encoded_response.code, '') status_str = '%s %s' % (encoded_response.code, message) headers = encoded_response.headers.items() headers.append(('Content-Type', 'text/plain')) self.start_response(status_str, headers) logging.debug('Sending encoded response: %s' % encoded_response) if encoded_response.body: logging.debug('Sending response body: %s' % encoded_response.body) self.output = [encoded_response.body] def render(self, template, message='', status_code='200 OK', content_type='text/html', data={}): """Render the given template, including the extra (optional) values. """ self.start_response(status_code, [('Content-Type', content_type)]) form = cgi.FieldStorage() data_dict = { 'uri': self.uri, 'request': self.request, 'form': pprint.pformat(form), 'message': message, } data_dict.update(self.arg_dict()) data_dict.update(data) if content_type == 'text/html': self.output = [DOCTYPE, HEADER % data_dict, template % data_dict, FOOTER] else: self.output = [template % data_dict] # work around a bug in wsgiref.handlers that makes it not support unicode. self.output = [str(part) for part in self.output] def get(self): self.render(ENDPOINT, 'Welcome!') class Error(Handler): """Displays an error.""" def __init__(self, environ, start_response): Handler.__init__(self, environ, start_response) def get(self): self.render(ERROR, '404 Not found', '404 Not found') post = get class FrontPage(Handler): """Displays the front page with all of the supported links.""" def get(self): self.render(FRONT_PAGE) class Capabilities(Handler): """Displays the capabilities page.""" def get(self): self.render(CAPS, content_type='text/plain') class Provider(Handler): """The base OpenID provider request handler class.""" def __init__(self, environ, start_response, checkid_immediate=True, checkid_setup=True): """The keyword argument values determine how we respond to each mode.""" Handler.__init__(self, environ, start_response) self.checkid_immediate = checkid_immediate self.checkid_setup = checkid_setup self.openid_server = OpenIDServer(FileOpenIDStore('.'), self.uri) def post(self): """Handles associate and check_authentication requests.""" oidrequest = self.get_openid_request() if oidrequest: if oidrequest.mode in ('associate', 'check_authentication'): self.respond(self.openid_server.handleRequest(oidrequest)) else: self.render(ERROR, 'Expected an OpenID request', '404 Not found') def get(self): """Handles checkid_* requests.""" oidrequest = self.get_openid_request() if oidrequest and oidrequest.mode in ('checkid_immediate', 'checkid_setup'): self.respond(oidrequest.answer(getattr(self, oidrequest.mode))) elif oidrequest is False: Handler.get(self) class Sign(Provider): """Signs a requests and verifies the signature.""" def __init__(self, environ, start_response): """The keyword argument values determine how we respond to each mode.""" Provider.__init__(self, environ, start_response, checkid_immediate=False, checkid_setup=True) class SetupThenSign(Sign): """Does setup for a new request, then signs and verifies a signature.""" def __init__(self, environ, start_response): """The keyword argument values determine how we respond to each mode.""" Provider.__init__(self, environ, start_response, checkid_immediate=False, checkid_setup=True) class SetupThenCancel(Sign): """Does setup for a new request, then signs it and sends a cancel back.""" def __init__(self, environ, start_response): """The keyword argument values determine how we respond to each mode.""" Provider.__init__(self, environ, start_response, checkid_immediate=False, checkid_setup=False) class Consumer(Handler): """The base consumer handler.""" def __init__(self, *args): Handler.__init__(self, *args) # set up the association store based on the assoc_mode query parameter store = None assoc_mode = self.arg_dict().get('assoc_mode') if assoc_mode: if assoc_mode == 'stateful': store = FileOpenIDStore('.') elif assoc_mode == 'stateful_new': # TODO(ryanb) store = FileOpenIDStore('.') elif assoc_mode == 'stateless': store = None elif assoc_mode == 'use_handle': # TODO(ryanb) pass else: assert False, 'bad assoc_mode value: %s' % assoc_mode # note that we don't support persistent session data self.openid_consumer = OpenIDConsumer({}, store) def get(self): """Handles consumer GET requests.""" args = self.arg_dict() if 'openid.mode' in args: # this is a redirect from a provider. attempt to verify the login. response = self.openid_consumer.complete(args, self.uri) if response.status == 'success': self.render(OK, content_type='text/plain') else: self.render(ERROR, 'Provider responded: %s' % response.status) elif 'openid_identifier' in args: # start with discovery auth_request = self.openid_consumer.begin(args['openid_identifier']) # this is a form post. what to do...? if args.get('op') == 'disco': # return the discovery details claimed_id = auth_request.endpoint.claimed_id local_id = auth_request.endpoint.local_id if local_id == claimed_id: local_id = '' discovered = {'claimed_id': claimed_id, 'openid_provider': auth_request.endpoint.server_url, 'local_id': local_id} self.render(DISCOVERY, content_type='text/plain', data=discovered) return elif args.get('op') == 'assoc': # return the association we would have used if auth_request.assoc: assoc = {'handle': auth_request.assoc.handle, 'type': auth_request.assoc.assoc_type } else: assoc = {'handle': '', 'type': ''} self.render(ASSOCIATION, content_type='text/plain', data=assoc) else: # start the flow! return_to = self.uri + '.return_to' redirect_to = auth_request.redirectURL(TRUST_ROOT, return_to) self.start_response('302 Found', [('Location', redirect_to)]) else: # this is an initial GET. show the form. self.render(CONSUMER_FORM, 'unused') return def app(environ, start_response): """Pass the request to the right handler.""" handlers = {'caps': Capabilities, 'identity/will-sign': Sign, 'identity/will-setup-then-sign': SetupThenSign, 'identity/will-setup-then-cancel': SetupThenCancel, 'rp': Consumer, 'rp.return_to': Consumer, } uri = wsgiref.util.request_uri(environ, include_query=False) match = PATH_REGEX.search(uri) method = environ['REQUEST_METHOD'] # default to not found handler_cls = FrontPage if match and method in ('GET', 'POST'): # TODO(ryanb); ugh, this is awful. make this not awful. caps, openid_version, action = match.groups() if caps == 'caps': action = 'caps' if action in handlers: handler_cls = handlers[action] handler = handler_cls(environ, start_response) getattr(handler, method.lower())() return handler.output def main(argv): """FUCKING GET ME A TOWEL!!!""" wsgiref.handlers.CGIHandler().run(app) if __name__ == '__main__': main(sys.argv)