Skip to content

OAuth2 Scopes

Esmerald supports OAuth2 scopes, offering a detailed permission system that adheres to the OAuth2 standard. This feature integrates seamlessly into your OpenAPI application and its API documentation.

OAuth2 scopes are widely used by major providers such as Facebook, Google, GitHub, Microsoft, and Twitter. Whenever an application allows you to "log in with" these platforms, it leverages OAuth2 with scopes to define specific permissions.

In this guide, we’ll explore how to manage authentication and authorization using OAuth2 scopes in Esmerald.

Warning

This section delves into advanced concepts, so beginners may prefer to skip it for now.

While OAuth2 scopes aren't mandatory, they offer a structured way to handle permissions, seamlessly integrating with OpenAPI and API documentation. However, it’s crucial to enforce scopes or any other security measures directly in your code.

In many cases, OAuth2 scopes might be excessive. Still, if your application requires them or you're eager to learn, continue reading to explore their implementation and benefits.

OAuth2 Scopes and OpenAPI

OAuth2 defines "scopes" as a list of space-separated strings representing permissions.

In OpenAPI, you can define "security schemes" that use OAuth2 and declare scopes.

Each scope is a string (without spaces) used to specify security permissions, such as:

  • users:read or users:write
  • instagram_basic (used by Facebook/Instagram)
  • https://www.googleapis.com/auth/drive (used by Google)

Info

In OAuth2, a "scope" is simply a string representing a specific permission. The format, such as including characters like : or even being a URL, is entirely implementation-specific. From the OAuth2 perspective, these are treated as plain strings without inherent meaning outside their defined context.

Global View

We’ll quickly review the updates in the main OAuth2 with Password, Bearer with JWT tokens, now enhanced with OAuth2 scopes:

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

Next, we’ll break these changes down step by step for better understanding.

OAuth2 Security Scheme

First, we declare the OAuth2 security scheme with two scopes: me and items.

The scopes parameter is a dict with each scope as a key and its description as the value:

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

These scopes will appear in the API docs when you log in/authorize, allowing you to select which scopes to grant access to: me and items.

This is similar to granting permissions when logging in with Facebook, Google, GitHub, etc:

Scopes

JWT Token with Scopes

Next, update the token path operation to include the requested scopes in the response.

We continue to use OAuth2PasswordRequestForm, which has a scopes property containing the list of scopes from the request.

These scopes will be included in the JWT token returned.

Danger

To keep things simple, we directly include the received scopes in the token.

In your application, make sure to only include scopes that the user is permitted to have.

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

Declare Scopes in Path Operations and Dependencies

To require the items scope for the /users/me/items/ path operation, use Security from Esmerald. This works similarly to Inject, but includes a scopes parameter.

Pass the get_current_user dependency function to Security, along with the required scopes (in this case, items).

Info

You don't need to declare scopes in multiple locations. This example shows how Esmerald manages scopes defined at different levels.

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

Using SecurityScopes

Update the get_current_user dependency to use the previously created OAuth2 scheme.

Since this function does not require scopes itself, use Inject with oauth2_scheme.

Declare a SecurityScopes parameter, imported from esmerald.security.scopes.

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

Using the Scopes

The scopes parameter will be of type SecurityScopes.

It includes a scopes property, which is a list of all the scopes required by itself and any dependencies that use it as a sub-dependency. This might sound confusing, but it will be explained further below.

The scopes object also has a scope_str attribute, which is a single string containing all the scopes separated by spaces (we will use this later).

We create an HTTPException that can be reused (raise) at various points.

In this exception, we include the required scopes (if any) as a space-separated string (using scope_str). This string is placed in the WWW-Authenticate header, as specified by the OAuth2 standard.

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

Verify username and Data Structure

Check the username and extract the scopes.

Use a Pydantic model to validate the data, raising an HTTPException if validation fails.

Update the TokenData Pydantic model to include a scopes property.

Ensure the data structure is correct to prevent security issues.

Confirm the user exists, raising an exception if not.

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

Verify the Scopes

Check that the token includes all necessary scopes. If any required scopes are missing, raise an HTTPException.

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from esmerald import (
    Esmerald,
    Gateway,
    HTTPException,
    Inject,
    Injects,
    Security,
    get,
    post,
    status,
)
from esmerald.security.scopes import Scopes
from esmerald.params import Form
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm


SECRET_KEY = "adec4de83525abdd446b258d0df8a3cc151ee65e95ae8b8ccf51b643df71afcf"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Pasword context
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

fake_users_db = {
    "janedoe": {
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2a$12$KplebFTPwFcgGQosJgI4De0PyB2AoRCSxasxHpFoYZPp6uQV/xLzm",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserDB(User):
    hashed_password: str


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return password_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return password_context.hash(password)


def get_user(db: Dict[str, Dict[str, str]], username: str) -> User | None:
    user_dict = db.get(username)
    return User(**user_dict) if user_dict else None


def authenticate_user(fake_db, username: str, password: str) -> UserDB | None:
    user = get_user(fake_db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    scopes: Scopes, token: str = Security(oauth2_scheme, scopes=["me"])
) -> User:
    if scopes.scopes:
        authenticate_value = f'Bearer scope="{scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if not user or user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    for scope in scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


@post(
    "/token",
    dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
    security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Form()) -> Dict[str, str]:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@get(
    "/users/me",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def me(
    current_user: User = Injects(),
) -> User:
    return current_user


@get(
    "/users/me/items",
    dependencies={"current_user": Inject(get_current_user)},
    security=[oauth2_scheme],
)
async def get_user_items(current_user: User = Injects()) -> List[Dict[str, str]]:
    return [{"item_id": "Foo", "owner": current_user.username}]


app = Esmerald(
    routes=[
        Gateway(handler=login),
        Gateway(handler=me),
        Gateway(handler=get_user_items),
    ],
)

More Details about SecurityScopes

You can use SecurityScopes at any point in the dependency tree.

It will always contain the scopes declared by the current Security dependencies and all dependants for that specific path operation.

Use SecurityScopes to verify token scopes in a central dependency function, with different scope requirements for each path operation.

About Third-Party Integrations

This example demonstrates the OAuth2 "password" flow, which is ideal for logging into your own application using your frontend.

For applications where users connect through third-party providers (like Facebook, Google, GitHub), other OAuth2 flows are more appropriate.

The implicit flow is commonly used, while the authorization code flow is more secure but also more complex.

Note

Authentication providers might use different names for their flows, but they all adhere to the OAuth2 standard. Esmerald provides utilities for all OAuth2 authentication flows in esmerald.security.oauth2.

Notes

These step by step guides were inspired by FastAPI great work of providing simple and yet effective examples for everyone to understand.

Esmerald adopts a different implementation internally but with the same purposes as any other framework to achieve that.