Đầu tiên là 1 vài lệnh ban đầu
sudo apt install python3.10-venv
sudo python3 -m venv fastapi-mongodb/.venv
source .venv/bin/activate
pip3.10 install -r fastapi-mongodb/requirements.txt
uvicorn main:app --reload

curl -X GET "http://localhost:8000/users" -H "accept: application/json"
code:
import hmac
import logging
from pprint import pprint

import motor.motor_asyncio
from bson import ObjectId
from fastapi import FastAPI, Response, Cookie, Request, status
from fastapi.encoders import jsonable_encoder
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr
### Constants
# Connection string for the local MongoDB instance.
MONGODB_URL = "mongodb://127.0.0.1:27017/"
# Server-side key prepended to the user's email before hashing to derive the
# session cookie value (see Hasher.get_cookie_hash).
# NOTE(review): this secret is hard-coded in source; consider loading it from
# the environment instead.
COOKIE_KEY = "0e7cdba83c30045082e0af5ff21c23180a862b7e32a53f2f5072ad606f95"
# In-memory session store: maps a logged-in user's email to the session cookie
# value issued at login. It is a plain module-level dict, so it is empty again
# whenever the process restarts.
APP_SESSIONS = {}
### App
app = FastAPI()
# NOTE(review): `security` is not referenced by any handler visible in this file.
security = HTTPBasic()
# passlib context used for every hash in this module; bcrypt is the only scheme.
pwd_context = CryptContext(schemes=["bcrypt"])
### MongoDB
# Async (Motor) client; the handlers below await its operations.
client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)
db = client.itforvn  # database named "itforvn"
collection_users = db.get_collection("users")
class User(BaseModel):
    """Request body accepted by the user registration endpoint."""

    fullname: str
    email: EmailStr
    password: str

    class Config:
        # Sample payload attached to the model's generated schema.
        schema_extra = {
            "example": dict(
                fullname="John Doe",
                email="john@gmail.com",
                password="***",
            )
        }
class UserLogin(BaseModel):
    """Credentials submitted to the login endpoint."""

    email: EmailStr  # validated by pydantic as an email address
    password: str  # password as submitted; checked against the stored hash at login
def ResponseModel(data, message, code=200):
    """Build the success envelope returned by the API handlers.

    Args:
        data: Payload to return; it is wrapped in a single-element list.
        message: Human-readable status message.
        code: Status code reported inside the envelope. Defaults to 200,
            the value that used to be hard-coded, so existing two-argument
            callers are unaffected.

    Returns:
        A dict with the keys ``data``, ``code`` and ``message``.
    """
    return {"data": [data], "code": code, "message": message}
def ErrorResponseModel(error, code, message):
    """Build the error envelope returned by the API handlers.

    Args:
        error: Short error label.
        code: Status code reported inside the envelope.
        message: Human-readable explanation.

    Returns:
        A dict with the keys ``error``, ``code`` and ``message``, in that order.
    """
    return dict(error=error, code=code, message=message)
def user_helper(user) -> dict:
    """Project a stored user document onto its public fields.

    The ``_id`` value is converted to a string and exposed as ``id``;
    ``fullname`` and ``email`` are copied as-is. Any other key of the
    document (such as the password hash) is left out.

    Raises:
        KeyError: If ``_id``, ``fullname`` or ``email`` is missing.
    """
    public = {"id": str(user["_id"])}
    for field in ("fullname", "email"):
        public[field] = user[field]
    return public
class Hasher:
    """Static helpers wrapping the module-level passlib context ``pwd_context``."""

    @staticmethod
    def verify_password(plain_password, hashed_password):
        """Return the result of verifying ``plain_password`` against ``hashed_password``."""
        is_match = pwd_context.verify(plain_password, hashed_password)
        return is_match

    @staticmethod
    def get_password_hash(password):
        """Return the hash of ``password`` produced by ``pwd_context``."""
        hashed = pwd_context.hash(password)
        return hashed

    @staticmethod
    def get_cookie_hash(email):
        """Return a hash of ``COOKIE_KEY`` concatenated with ``email``.

        The login handler uses the result as the session cookie value.
        """
        # NOTE(review): bcrypt only considers the first 72 bytes of its input
        # and COOKIE_KEY is already 60 characters long; confirm this is the
        # intended way to derive session tokens.
        keyed_email = COOKIE_KEY + email
        return pwd_context.hash(keyed_email)
## User register - hashed password
@app.post("/users/register")
async def users_create(user: User, response: Response):
    """Register a new user, storing only a hash of the password.

    Args:
        user: Validated registration payload.
        response: Unused; kept so the handler signature stays unchanged.

    Returns:
        The success envelope containing the new user's public fields, or an
        error envelope with code 409 when the email is already registered
        and 500 when the user could not be stored.
    """
    data = jsonable_encoder(user)
    try:
        # Login and the session store both key on email, so a second account
        # with the same email would make login ambiguous.
        # NOTE(review): a unique index on "email" is still needed to close the
        # race between this check and the insert.
        existing = await collection_users.find_one({"email": data["email"]})
        if existing is not None:
            return ErrorResponseModel(
                "An error occurred.", 409, "Email is already registered."
            )
        data["password"] = Hasher.get_password_hash(data["password"])
        add_user = await collection_users.insert_one(data)
        new_user = await collection_users.find_one({"_id": add_user.inserted_id})
    except Exception:
        # Boundary handler: record the real cause instead of hiding it behind
        # an unrelated "User doesn't exist." message.
        logging.getLogger(__name__).exception("Failed to create user")
        return ErrorResponseModel("An error occurred.", 500, "Failed to create user.")
    if new_user is None:
        return ErrorResponseModel("An error occurred.", 500, "Failed to create user.")
    return ResponseModel(user_helper(new_user), "User added successfully.")
# @app.post("/users/login")
# async def users_login(
# data: UserLogin,
# request: Request,
# response: Response,
# ):
# login_credentials = data.dict()
# _u_email = login_credentials["email"]
# _u_pass = login_credentials["password"]
# print(Hasher.get_password_hash(_u_pass))
# try:
# user = await collection_users.find_one({"email": _u_email})
# user_password = user["password"]
# print(f"hashed pass: {user_password}")
# if Hasher.verify_password(_u_pass, user_password):
# return {"message": "login success"}
# else:
# return {"message": "login fail"}
# except:
# return ErrorResponseModel("An error occurred.", 400, "Bad request!")
@app.get("/protected")
def users_protected(request: Request, response: Response):
    """Serve the protected resource to requests carrying a valid session.

    A request is accepted when its ``demo_email`` cookie names a user with an
    entry in ``APP_SESSIONS`` and its ``demo_session`` cookie equals the value
    stored for that user.

    Args:
        request: Incoming request; only its cookies are read.
        response: Unused; kept so the handler signature stays unchanged.

    Returns:
        The protected payload, or an error envelope with code 403.
    """
    forbidden = ErrorResponseModel("An error occurred.", 403, "Request forbidden!")
    email = request.cookies.get("demo_email")
    login_cookie = request.cookies.get("demo_session")
    if not email or not login_cookie:
        return forbidden
    expected = APP_SESSIONS.get(email)
    # Compare as bytes in constant time so the check does not leak how much of
    # the token matched and never fails on non-ASCII cookie values.
    if expected is None or not hmac.compare_digest(
        expected.encode(), login_cookie.encode()
    ):
        return forbidden
    return {"message": "this is protected resource. enjoy!"}
@app.post("/users/login")
async def users_login(
    data: UserLogin,
    request: Request,
    response: Response,
):
    """Authenticate a user and issue session cookies.

    Args:
        data: Submitted email and password.
        request: Incoming request; its ``demo_session`` cookie is inspected to
            detect an already-active session.
        response: Outgoing response on which the session cookies are set.

    Returns:
        ``{"message": "login success"}`` after a new session was created,
        ``{"message": "your are already login"}`` when the presented session
        cookie matches the stored session, ``{"message": "login fail"}`` for
        an unknown email or a wrong password, or an error envelope with code
        400 when the credential check itself failed.
    """
    email = data.email
    try:
        user = await collection_users.find_one({"email": email})
        # Unknown email and wrong password get the same answer so the endpoint
        # does not reveal which emails are registered.
        authenticated = user is not None and Hasher.verify_password(
            data.password, user["password"]
        )
    except Exception:
        logging.getLogger(__name__).exception("Credential check failed")
        return ErrorResponseModel("An error occurred.", 400, "Bad request!")
    if not authenticated:
        return {"message": "login fail"}

    login_cookie = request.cookies.get("demo_session")
    active_session = APP_SESSIONS.get(email)
    if (
        login_cookie
        and active_session is not None
        and hmac.compare_digest(active_session.encode(), login_cookie.encode())
    ):
        return {"message": "your are already login"}

    # A stale or foreign session cookie (for example one issued before the
    # process restarted) must not block a user who just proved their password;
    # simply start a fresh session.
    cookie_hash = Hasher.get_cookie_hash(email)
    APP_SESSIONS[email] = cookie_hash
    # The session token is only ever read by the server, so keep it away from
    # page scripts.
    response.set_cookie("demo_session", cookie_hash, httponly=True)
    response.set_cookie("demo_email", email)
    return {"message": "login success"}
@app.get("/users/logout")
def users_logout(request: Request, response: Response):
    """End the caller's session and clear its cookies.

    The session is removed only when the ``demo_session`` cookie matches the
    value stored for the user named by ``demo_email``; the email cookie alone
    is not enough, otherwise any client could log out any user.

    Args:
        request: Incoming request; only its cookies are read.
        response: Outgoing response on which the cookies are deleted.

    Returns:
        ``{"message": "goodbye"}`` on success, or an error envelope with code
        400 when the request carries no valid session.
    """
    email = request.cookies.get("demo_email")
    login_cookie = request.cookies.get("demo_session")
    active_session = APP_SESSIONS.get(email)
    if (
        not login_cookie
        or active_session is None
        or not hmac.compare_digest(active_session.encode(), login_cookie.encode())
    ):
        return ErrorResponseModel("An error occurred.", 400, "Bad request!")
    del APP_SESSIONS[email]
    response.delete_cookie("demo_email")
    response.delete_cookie("demo_session")
    return {"message": "goodbye"}
@app.get("/users")
async def user_list():
    """Return the public fields of every stored user in the success envelope.

    The message reflects whether any user was found.
    """
    users = [user_helper(doc) async for doc in collection_users.find()]
    if not users:
        message = "Empty list returned"
    else:
        message = "Users data retrieved successfully"
    return ResponseModel(users, message)
@app.get("/users/{id}")
async def user_get(id: str):
    """Return the public fields of a single user.

    Args:
        id: The user's ObjectId as a string (the name is fixed by the route).

    Returns:
        The user's public fields, or an error envelope with code 400 for a
        malformed id, 404 for an unknown id, and 500 when the lookup failed.
    """
    if not ObjectId.is_valid(id):
        return ErrorResponseModel("An error occurred.", 400, "Invalid request")
    try:
        user = await collection_users.find_one({"_id": ObjectId(id)})
    except Exception:
        # A database failure is not the client's fault; log it and say so
        # rather than reporting an invalid request.
        logging.getLogger(__name__).exception("Failed to fetch user %s", id)
        return ErrorResponseModel("An error occurred.", 500, "Internal server error")
    if user is None:
        return ErrorResponseModel("An error occurred.", 404, "User doesn't exist.")
    return user_helper(user)
class UpdateUser(BaseModel):
    """Request body for updating a user; every field is optional."""

    fullname: str | None = None

    class Config:
        # Sample payload attached to the model's generated schema.
        schema_extra = {
            "example": {
                "fullname": "John Doe",
            },
        }
@app.put("/users/{id}")
async def user_update(id: str, req: UpdateUser):
    """Update the supplied fields of an existing user.

    Args:
        id: The user's ObjectId as a string (the name is fixed by the route).
        req: Fields to change; fields left as ``None`` are not touched.

    Returns:
        The success envelope, or an error envelope with code 400 for a
        malformed id or an empty update, 404 when no user has that id, and
        500 when the update failed.
    """
    if not ObjectId.is_valid(id):
        return ErrorResponseModel("An error occurred.", 400, "Invalid request")
    data = {k: v for k, v in req.dict().items() if v is not None}
    if not data:
        # Never send an empty "$set" to the database.
        return ErrorResponseModel("An error occurred.", 400, "No fields to update")
    try:
        # One round trip: the write result says whether the user existed, so
        # no separate lookup (and no gap between lookup and write) is needed.
        result = await collection_users.update_one(
            {"_id": ObjectId(id)}, {"$set": data}
        )
    except Exception:
        logging.getLogger(__name__).exception("Failed to update user %s", id)
        return ErrorResponseModel("An error occurred.", 500, "Internal server error")
    if result.matched_count == 0:
        return ErrorResponseModel(
            "An error occurred",
            404,
            "There was an error updating the user data.",
        )
    return ResponseModel(
        f"User with ID: {id} name update is successful",
        "User updated successfully",
    )
@app.delete("/users/{id}")
async def user_delete(id: str):
    """Delete a user by id.

    Args:
        id: The user's ObjectId as a string (the name is fixed by the route).

    Returns:
        The success envelope, or an error envelope with code 400 for a
        malformed id, 404 when no user has that id, and 500 when the delete
        failed.
    """
    if not ObjectId.is_valid(id):
        return ErrorResponseModel(
            "An error occurred", 400, f"User with id {id} is not valid"
        )
    try:
        # One round trip: the delete result says whether anything was removed.
        result = await collection_users.delete_one({"_id": ObjectId(id)})
    except Exception:
        logging.getLogger(__name__).exception("Failed to delete user %s", id)
        return ErrorResponseModel("An error occurred", 500, "Internal server error")
    if result.deleted_count == 0:
        return ErrorResponseModel(
            "An error occurred", 404, f"User with id {id} doesn't exist"
        )
    return ResponseModel(f"User with ID: {id} removed", "User deleted successfully")
file requirements.txt
anyio==3.6.2
black==22.10.0
bson==0.5.10
click==8.1.3
dnspython==2.2.1
email-validator==1.1.2
fastapi==0.88.0
gunicorn==20.1.0
h11==0.14.0
idna==3.4
more-itertools==8.14.0
motor==3.1.1
mypy-extensions==0.4.3
passlib==1.7.4
pathspec==0.10.2
platformdirs==2.5.4
pptree==3.1
pydantic==1.10.2
pymongo==4.3.3
python-dateutil==2.8.2
python-ulid==1.1.0
six==1.16.0
sniffio==1.3.0
starlette==0.22.0
tomli==2.0.1
types-redis==4.3.21.6
typing_extensions==4.4.0
uvicorn==0.20.0