OAuth2 with Password and Bearer¶
Now, let's build upon the previous chapter and add the missing parts to complete the security flow.
The following examples were inspired by the same examples of FastAPI so it is normal if you feel familiar. The reson for that its to make sure you don't need to have a new learning curve in terms of understanding and flow.
The username
and password
¶
We’re going to use Esmerald security utilities to handle the username
and password
.
According to the OAuth2 specification, when using the "password flow" (which we are using), the client/user must send username
and password
fields as form data.
The specification requires these fields to be named exactly as username
and password
so names like user-name
or email
won’t work in this case.
However, don’t worry, you can display these fields however you like in the frontend, and your database models can use different names if needed.
But for the login path operation, we need to follow these names to stay compliant with the specification (and to ensure compatibility with tools like the integrated API documentation).
Additionally, the spec specifies that the username
and password
should be sent as form data, so no JSON here.
The scope
¶
The specification also allows the client to send another form field, scope
.
The field name must be scope
(in singular), but it is actually a string containing "scopes" separated by spaces.
Each "scope" is a single string without spaces, and they are typically used to define specific security permissions. For example:
users:read
orusers:write
are common scopes.instagram_basic
is used by Facebook/Instagram.https://www.googleapis.com/auth/drive
is used by Google.
These scopes help specify the level of access or permissions the user or client is requesting.
Info
In OAuth2, a "scope" is simply a string that declares a specific permission required.
It doesn't matter if the string includes other characters like :
or if it's a URL.
These details are implementation-specific, but for OAuth2, scopes are just strings.
The operation to get the username
and password
¶
Let us use the Esmerald built-ins to perform this operation.
OAuth2PasswordRequestForm¶
First, import OAuth2PasswordRequestForm
, and use it as a dependency with Security
, Inject
and Injects
in the path operation for /token
:
from typing import Dict
from pydantic import BaseModel
from esmerald import Esmerald, Gateway, HTTPException, Inject, Injects, Security, get, post, status
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
"janedoe": {
"username": "janedoe",
"full_name": "Jane Doe",
"email": "janedoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"peter": {
"username": "peter",
"full_name": "Peter Parker",
"email": "pparker@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserDB(**user_dict)
def fake_decode_token(token: str):
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Security(oauth2_scheme)):
user = fake_decode_token(token)
if user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@post(
"/token",
dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Injects()) -> Dict[str, str]:
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@get("/users/me", dependencies={"current_user": Inject(get_current_user)})
async def read_users_me(
current_user: User = Injects(),
) -> User:
return current_user
app = Esmerald(
routes=[
Gateway(handler=login),
Gateway(handler=read_users_me),
],
debug=True,
)
Note
The Inject
and Injects()
are what makes Esmerald dependency injection quite unique and layer based.
The OAuth2PasswordRequestForm
is a class dependency that defines a form body containing the following fields:
- The
username
. - The
password
. - An optional
scope
field, which is a single string made up of multiple strings separated by spaces. - An optional
grant_type
.
Tip
According to the OAuth2 specification, the grant_type
field is required and must have a fixed value of password
. However, OAuth2PasswordRequestForm
does not enforce this requirement.
If you need to strictly enforce the grant_type
field, you can use OAuth2PasswordRequestFormStrict
instead of OAuth2PasswordRequestForm
.
- An optional
client_id
(not needed for our example). - An optional
client_secret
(also not needed for our example).
Info
The OAuth2PasswordRequestForm
is not a special class in Esmerald, unlike OAuth2PasswordBearer
.
OAuth2PasswordBearer
informs Esmerald that it represents a security scheme, which is why it gets added as such to the OpenAPI schema.
In contrast, OAuth2PasswordRequestForm
is simply a convenience class dependency. You could have written it yourself or declared the Form
parameters directly.
Since it's a common use case, Esmerald provides this class out of the box to make your work easier.
The form data¶
Tip
The instance of the OAuth2PasswordRequestForm
dependency class won’t have a scope
attribute containing the long string separated by spaces. Instead, it will have a scopes
attribute, which is a list of individual strings representing each scope sent.
Although we’re not using scopes
in this example, the functionality is available if you need it.
Retrieve the user data from the (fake) database using the username
from the form field.
If no user is found, raise an HTTPException
with the message: "Incorrect username or password".
from typing import Dict
from pydantic import BaseModel
from esmerald import Esmerald, Gateway, HTTPException, Inject, Injects, Security, get, post, status
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
"janedoe": {
"username": "janedoe",
"full_name": "Jane Doe",
"email": "janedoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"peter": {
"username": "peter",
"full_name": "Peter Parker",
"email": "pparker@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserDB(**user_dict)
def fake_decode_token(token: str):
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Security(oauth2_scheme)):
user = fake_decode_token(token)
if user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@post(
"/token",
dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Injects()) -> Dict[str, str]:
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@get("/users/me", dependencies={"current_user": Inject(get_current_user)})
async def read_users_me(
current_user: User = Injects(),
) -> User:
return current_user
app = Esmerald(
routes=[
Gateway(handler=login),
Gateway(handler=read_users_me),
],
debug=True,
)
Checking the Password¶
Now that we have the user data from our database, we need to verify the password.
First, we will place the user data into the Pydantic UserDB
model.
Since storing plaintext passwords is unsafe, we'll use a (fake) password hashing system for verification.
If the passwords don’t match, we'll return the same error as before.
What is Password Hashing?¶
Hashing transforms a value (like a password) into a seemingly random sequence of bytes (a string) that looks like gibberish.
- Providing the same input (password) always produces the same hash.
- However, it is a one-way process. You cannot reverse a hash back to the original password.
Why Use Password Hashing?¶
If your database is compromised, the attacker won't have access to the user's plaintext passwords—only the hashes.
This protects users because the attacker cannot reuse their passwords on other systems (a common risk since many people reuse passwords).
from typing import Dict
from pydantic import BaseModel
from esmerald import Esmerald, Gateway, HTTPException, Inject, Injects, Security, get, post, status
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
"janedoe": {
"username": "janedoe",
"full_name": "Jane Doe",
"email": "janedoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"peter": {
"username": "peter",
"full_name": "Peter Parker",
"email": "pparker@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserDB(**user_dict)
def fake_decode_token(token: str):
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Security(oauth2_scheme)):
user = fake_decode_token(token)
if user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@post(
"/token",
dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Injects()) -> Dict[str, str]:
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@get("/users/me", dependencies={"current_user": Inject(get_current_user)})
async def read_users_me(
current_user: User = Injects(),
) -> User:
return current_user
app = Esmerald(
routes=[
Gateway(handler=login),
Gateway(handler=read_users_me),
],
debug=True,
)
About the **user_dict
¶
UserDB(**user_dict)
means:
It takes the keys and values from the user_dict
and passes them directly as key-value arguments to the UserDB
constructor. This is equivalent to:
UserDB(
username=user_dict["username"],
email=user_dict["email"],
full_name=user_dict["full_name"],
disabled=user_dict["disabled"],
hashed_password=user_dict["hashed_password"],
)
Returning the Token¶
The response from the token
endpoint should be a JSON object containing:
- A
token_type
. Since we're using "Bearer" tokens, it should be set to"bearer"
. - An
access_token
, which is a string containing the actual token.
In this simplified example, we'll just return the username
as the token (though this is insecure).
Tip
In the next chapter, we'll implement a secure version using password hashing and JSON Web Tokens (JWT). But for now, let's focus on the key details.
from typing import Dict
from pydantic import BaseModel
from esmerald import Esmerald, Gateway, HTTPException, Inject, Injects, Security, get, post, status
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
"janedoe": {
"username": "janedoe",
"full_name": "Jane Doe",
"email": "janedoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"peter": {
"username": "peter",
"full_name": "Peter Parker",
"email": "pparker@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserDB(**user_dict)
def fake_decode_token(token: str):
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Security(oauth2_scheme)):
user = fake_decode_token(token)
if user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@post(
"/token",
dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Injects()) -> Dict[str, str]:
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@get("/users/me", dependencies={"current_user": Inject(get_current_user)})
async def read_users_me(
current_user: User = Injects(),
) -> User:
return current_user
app = Esmerald(
routes=[
Gateway(handler=login),
Gateway(handler=read_users_me),
],
debug=True,
)
Info
According to the spec, the response should include a JSON with an access_token
and a token_type
, as shown in this example.
This is something you must implement in your code, ensuring the correct use of these JSON keys.
It's almost the only part you need to manage manually to comply with the specifications. For everything else, Esmerald takes care of it for you.
Updating the Dependencies¶
Now, let's update our dependencies.
We want to retrieve the current_user
only if the user is active. To do this, we will create a new dependency, get_current_active_user
, which will rely on get_current_user
as a sub-dependency.
Both dependencies will raise an HTTP error if the user doesn't exist or if the user is inactive.
With this update, the endpoint will only return a user if the user exists, is authenticated correctly, and is active.
from typing import Dict
from pydantic import BaseModel
from esmerald import Esmerald, Gateway, HTTPException, Inject, Injects, Security, get, post, status
from esmerald.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
"janedoe": {
"username": "janedoe",
"full_name": "Jane Doe",
"email": "janedoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"peter": {
"username": "peter",
"full_name": "Peter Parker",
"email": "pparker@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserDB(**user_dict)
def fake_decode_token(token: str):
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Security(oauth2_scheme)):
user = fake_decode_token(token)
if user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@post(
"/token",
dependencies={"form_data": Inject(OAuth2PasswordRequestForm)},
security=[oauth2_scheme],
)
async def login(form_data: OAuth2PasswordRequestForm = Injects()) -> Dict[str, str]:
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@get("/users/me", dependencies={"current_user": Inject(get_current_user)})
async def read_users_me(
current_user: User = Injects(),
) -> User:
return current_user
app = Esmerald(
routes=[
Gateway(handler=login),
Gateway(handler=read_users_me),
],
debug=True,
)
Info
The additional WWW-Authenticate
header with the value Bearer
is part of the OAuth2 specification.
Any HTTP error with a status code 401 "UNAUTHORIZED" should include this header. For bearer tokens (like in our case), the header's value should be Bearer
.
While you can technically omit this header and it will still function, including it ensures compliance with the specification. Additionally, some tools may expect and use this header, either now or in the future, which could be helpful for you or your users.
That's the advantage of following standards.
Go ahead and test it¶
Open the OpenAPI documentation and check it out: http://localhost:8000/docs/swagger.
Authenticate¶
Click the Authorize button and use the following credentials:
- User:
janedoe
- Password:
secret
.
After pressing the authenticate, you should be able to see something like this:
Get the data¶
Now it is time to test and get the data using the GET
method provided in the examples /users/me
.
You will get a payload similar to this:
{
"username": "johndoe",
"email": "johndoe@example.com",
"full_name": "John Doe",
"disabled": false,
"hashed_password": "fakehashedsecret"
}
Now, if you logout by clicking in the logout icon, you should receive a 401.
Inactive users¶
Now you can try with an inactive user and see what happens.
- User:
peter
- Password:
secret2
.
You should have an error like this:
{
"detail": "Inactive user"
}
As you can see, we have now implemented a simple and yet effective authentication.