Skip to content

Token scope permission class #721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rest_framework/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,11 @@ def apply_markdown(text):
from provider.oauth2 import backends as oauth2_provider_backends
from provider.oauth2 import models as oauth2_provider_models
from provider.oauth2 import forms as oauth2_provider_forms
from provider import scope as oauth2_provider_scope

except ImportError:
oauth2_provider = None
oauth2_provider_backends = None
oauth2_provider_models = None
oauth2_provider_forms = None
oauth2_provider_scope = None
32 changes: 32 additions & 0 deletions rest_framework/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']

from rest_framework.compat import oauth2_provider_scope


class BasePermission(object):
"""
Expand Down Expand Up @@ -125,3 +127,33 @@ def has_permission(self, request, view):
request.user.has_perms(perms)):
return True
return False


class TokenHasReadWriteScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope
"""

def has_permission(self, request, view):
if not request.auth:
return False

read_only = request.method in SAFE_METHODS
if hasattr(request.auth, 'resource'): # oauth 1
if read_only:
return True
elif request.auth.resource.is_readonly is False:
return True
return False
elif hasattr(request.auth, 'scope'): # oauth 2
scope_valid = lambda scope_wanted_key, scope_had: oauth2_provider_scope.check(
oauth2_provider_scope.SCOPE_NAME_DICT[scope_wanted_key], scope_had)

if read_only and scope_valid('read', request.auth.scope):
return True
elif scope_valid('write', request.auth.scope):
return True
return False
else:
# Improperly configured!
pass
65 changes: 63 additions & 2 deletions rest_framework/tests/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)
from rest_framework.authtoken.models import Token
from rest_framework.compat import patterns, url, include
from rest_framework.compat import oauth2_provider, oauth2_provider_models
from rest_framework.compat import oauth2_provider, oauth2_provider_models, oauth2_provider_scope
from rest_framework.compat import oauth, oauth_provider
from rest_framework.tests.utils import RequestFactory
from rest_framework.views import APIView
Expand Down Expand Up @@ -47,13 +47,17 @@ def put(self, request):
(r'^basic/$', MockView.as_view(authentication_classes=[BasicAuthentication])),
(r'^token/$', MockView.as_view(authentication_classes=[TokenAuthentication])),
(r'^auth-token/$', 'rest_framework.authtoken.views.obtain_auth_token'),
(r'^oauth/$', MockView.as_view(authentication_classes=[OAuthAuthentication]))
(r'^oauth/$', MockView.as_view(authentication_classes=[OAuthAuthentication])),
(r'^oauth-with-scope/$', MockView.as_view(authentication_classes=[OAuthAuthentication],
permission_classes=[permissions.TokenHasReadWriteScope]))
)

if oauth2_provider is not None:
urlpatterns += patterns('',
url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')),
url(r'^oauth2-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication])),
url(r'^oauth2-with-scope-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication],
permission_classes=[permissions.TokenHasReadWriteScope])),
)


Expand Down Expand Up @@ -389,6 +393,39 @@ def test_post_hmac_sha1_signature_passes(self):
response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)

@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_get_form_with_readonly_resource_passing_auth(self):
"""Ensure POSTing with a readonly resource instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.resource.is_readonly = True
read_only_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.get('/oauth-with-scope/', params)
self.assertEqual(response.status_code, 200)

@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_readonly_resource_failing_auth(self):
"""Ensure POSTing with a readonly resource instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.resource.is_readonly = True
read_only_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth-with-scope/', params)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))

@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_write_resource_passing_auth(self):
"""Ensure POSTing with a write resource succeed"""
read_write_access_token = self.token
read_write_access_token.resource.is_readonly = False
read_write_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth-with-scope/', params)
self.assertEqual(response.status_code, 200)


class OAuth2Tests(TestCase):
"""OAuth 2.0 authentication"""
Expand Down Expand Up @@ -514,3 +551,27 @@ def test_post_form_with_expired_access_token_failing_auth(self):
response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
self.assertIn('Invalid token', response.content)

@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_invalid_scope_failing_auth(self):
"""Ensure POSTing with a readonly scope instead of a write scope fails"""
read_only_access_token = self.access_token
read_only_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['read']
read_only_access_token.save()
auth = self._create_authorization_header(token=read_only_access_token.token)
params = self._client_credentials_params()
response = self.csrf_client.get('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_valid_scope_passing_auth(self):
"""Ensure POSTing with a write scope succeed"""
read_write_access_token = self.access_token
read_write_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['write']
read_write_access_token.save()
auth = self._create_authorization_header(token=read_write_access_token.token)
params = self._client_credentials_params()
response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)