Skip to content

NimTechnology

Trình bày các công nghệ CLOUD một cách dễ hiểu.

  • Kubernetes & Container
    • Docker
    • Kubernetes
      • Ingress
      • Pod
    • Helm Chart
    • Isito-EnvoyFilter
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Vault
    • Longhorn – Storage
    • VictoriaMetrics
    • MetalLB
    • Kong Gateway
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Spinnaker
    • Jenkins
    • Harbor
    • TeamCity
    • Git
      • Bitbucket
  • Coding
    • DevSecOps
    • Terraform
      • GCP – Google Cloud
      • AWS – Amazon Web Service
    • Golang
    • Laravel
    • Python
    • Jquery & JavaScript
    • Selenium
  • Log, Monitor & Tracing
    • DataDog
    • Prometheus
    • Grafana
    • ELK
      • Kibana
      • Logstash
  • BareMetal
    • NextCloud
  • Toggle search form

[Python] Python connects MongoDB

Posted on September 15, 2023 By nim No Comments on [Python] Python connects MongoDB

Đầu tiên là 1 vài lệnh ban đầu

sudo apt install python3.10-venv
sudo python3 -m venv fastapi-mongodb/.venv
source .venv/bin/activate

pip3.10 install -r fastapi-mongodb/requirements.txt

uvicorn main:app --reload
curl -X GET "http://localhost:8000/users" -H "accept: application/json"

code:

import motor.motor_asyncio
from fastapi import FastAPI, Response, Cookie, Request, status
from pydantic import BaseModel, EmailStr
from fastapi.encoders import jsonable_encoder
from bson import ObjectId
from pprint import pprint
from passlib.context import CryptContext
from fastapi.security import HTTPBasic, HTTPBasicCredentials

### CONSTANT
MONGODB_URL = "mongodb://127.0.0.1:27017/"
COOKIE_KEY = "0e7cdba83c30045082e0af5ff21c23180a862b7e32a53f2f5072ad606f95"
APP_SESSIONS = {}

### App
app = FastAPI()
security = HTTPBasic()
pwd_context = CryptContext(schemes=["bcrypt"])

### MongoDB
client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)
db = client.itforvn
collection_users = db.get_collection("users")


class User(BaseModel):
    fullname: str
    email: EmailStr
    password: str

    class Config:
        schema_extra = {
            "example": {
                "fullname": "John Doe",
                "email": "john@gmail.com",
                "password": "***",
            }
        }


class UserLogin(BaseModel):
    email: EmailStr
    password: str


def ResponseModel(data, message):
    return {"data": [data], "code": 200, "message": message}


def ErrorResponseModel(error, code, message):
    return {"error": error, "code": code, "message": message}


def user_helper(user) -> dict:
    return {
        "id": str(user["_id"]),
        "fullname": user["fullname"],
        "email": user["email"],
    }


class Hasher:
    @staticmethod
    def verify_password(plain_password, hashed_password):
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def get_password_hash(password):
        return pwd_context.hash(password)

    @staticmethod
    def get_cookie_hash(email):
        return pwd_context.hash(COOKIE_KEY + email)


## User register - hashed password
@app.post("/users/register")
async def users_create(user: User, response: Response):
    data = jsonable_encoder(user)
    password_hash = Hasher.get_password_hash(data["password"])
    data["password"] = password_hash

    try:
        add_user = await collection_users.insert_one(data)
        new_user = await collection_users.find_one({"_id": add_user.inserted_id})
        return ResponseModel(user_helper(new_user), "User added successfully.")
    except:
        return ErrorResponseModel("An error occurred.", 404, "User doesn't exist.")


# @app.post("/users/login")
# async def users_login(
#     data: UserLogin,
#     request: Request,
#     response: Response,
# ):
#     login_credentials = data.dict()
#     _u_email = login_credentials["email"]
#     _u_pass = login_credentials["password"]
#     print(Hasher.get_password_hash(_u_pass))
#     try:
#         user = await collection_users.find_one({"email": _u_email})
#         user_password = user["password"]
#         print(f"hashed pass: {user_password}")

#         if Hasher.verify_password(_u_pass, user_password):
#             return {"message": "login success"}
#         else:
#             return {"message": "login fail"}
#     except:
#         return ErrorResponseModel("An error occurred.", 400, "Bad request!")


@app.get("/protected")
def users_protected(request: Request, response: Response):
    try:
        email = request.cookies.get("demo_email")
        pprint(email)
        login_cookie = request.cookies.get("demo_session")

        if email and login_cookie and APP_SESSIONS[email] == login_cookie:
            return {"message": "this is protected resource. enjoy!"}
        else:
            return ErrorResponseModel("An error occurred.", 403, "Request forbidden!")
    except:
        return ErrorResponseModel("An error occurred.", 403, "Request forbidden!")


@app.post("/users/login")
async def users_login(
    data: UserLogin,
    request: Request,
    response: Response,
):
    login_credential = data.dict()
    _u_email = login_credential["email"]
    _u_pass = login_credential["password"]

    try:
        user = await collection_users.find_one({"email": _u_email})
        user_password = user["password"]

        if Hasher.verify_password(_u_pass, user_password):
            ### Check if user logged in by cookie
            try:
                login_cookie = request.cookies.get("demo_session")
                if login_cookie and APP_SESSIONS[_u_email] == login_cookie:
                    return {"message": "your are already login"}
            except:
                ## Detect fraud
                del APP_SESSIONS[_u_email]
                return ErrorResponseModel("An error occurred.", 400, "Bad request!")

            ### If not logged in
            cookie_hash = Hasher.get_cookie_hash(_u_email)
            APP_SESSIONS[_u_email] = cookie_hash
            response.set_cookie("demo_session", cookie_hash)
            response.set_cookie("demo_email", _u_email)

            return {"message": "login success"}
        else:
            return {"message": "login fail"}
    except:
        return ErrorResponseModel("An error occurred.", 400, "Bad request!")


@app.get("/users/logout")
def users_logout(request: Request, response: Response):
    try:
        email = request.cookies.get("demo_email")
        del APP_SESSIONS[email]
        response.delete_cookie("demo_email")
        response.delete_cookie("demo_session")
        return {"message": "goodbye"}
    except:
        return ErrorResponseModel("An error occurred.", 400, "Bad request!")


@app.get("/users")
async def user_list():
    users = []
    async for user in collection_users.find():
        users.append(user_helper(user))

    if users:
        return ResponseModel(users, "Users data retrieved successfully")

    return ResponseModel(users, "Empty list returned")


@app.get("/users/{id}")
async def user_get(id: str):
    try:
        user = await collection_users.find_one({"_id": ObjectId(id)})
    except:
        return ErrorResponseModel("An error occurred.", 400, "Invalid request")

    if user:
        return user_helper(user)
    else:
        return ErrorResponseModel("An error occurred.", 404, "User doesn't exist.")


class UpdateUser(BaseModel):
    fullname: str | None = None

    class Config:
        schema_extra = {"example": {"fullname": "John Doe"}}


@app.put("/users/{id}")
async def user_update(id: str, req: UpdateUser):
    data = {k: v for k, v in req.dict().items() if v is not None}
    try:
        user = await collection_users.find_one({"_id": ObjectId(id)})
        if user:
            updated_user = await collection_users.update_one(
                {"_id": ObjectId(id)}, {"$set": data}
            )
            if updated_user:
                return ResponseModel(
                    "User with ID: {} name update is successful".format(id),
                    "User updated successfully",
                )

            return ErrorResponseModel(
                "An error occurred",
                404,
                "There was an error updating the user data.",
            )
    except:
        return ErrorResponseModel("An error occurred.", 400, "Invalid request")


@app.delete("/users/{id}")
async def user_delete(id: str):
    try:
        user = await collection_users.find_one({"_id": ObjectId(id)})
        if user:
            await collection_users.delete_one({"_id": ObjectId(id)})
            return ResponseModel(
                "User with ID: {} removed".format(id), "User deleted successfully"
            )

        return ErrorResponseModel(
            "An error occurred", 404, "User with id {0} doesn't exist".format(id)
        )
    except:
        return ErrorResponseModel(
            "An error occurred", 400, "User with id {0} is not valid".format(id)
        )

file requirements.txt

anyio==3.6.2
black==22.10.0
bson==0.5.10
click==8.1.3
dnspython==2.2.1
email-validator==1.1.2
fastapi==0.88.0
gunicorn==20.1.0
h11==0.14.0
idna==3.4
more-itertools==8.14.0
motor==3.1.1
mypy-extensions==0.4.3
passlib==1.7.4
pathspec==0.10.2
platformdirs==2.5.4
pptree==3.1
pydantic==1.10.2
pymongo==4.3.3
python-dateutil==2.8.2
python-ulid==1.1.0
six==1.16.0
sniffio==1.3.0
starlette==0.22.0
tomli==2.0.1
types-redis==4.3.21.6
typing_extensions==4.4.0
uvicorn==0.20.0
Python

Post navigation

Previous Post: [Prometheus] filter or allowlist metrics before sending to the remote storage via remote_write in Prometheus
Next Post: [Docker] Setup Docker for Windows Containers (NO Docker Desktop Needed!)

More Related Articles

[Python] what mean if __name__==”__main__”: Python
[Python] Look into FastAPI to contribute backend by Python Python
[Python] The eval() function automatically calculates a string Python
[Python] Look into “for else” Python
[Python] use Try/Except in python Python
[Python] OOP Python

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Tham Gia Group DevOps nhé!
Để Nim có nhiều động lực ra nhiều bài viết.
Để nhận được những thông báo mới nhất.

Recent Posts

  • [Monitoring] Understanding Different Types of Metrics in Monitoring Systems: Gauge, Counter, Histogram, and Summary December 6, 2023
  • [Golang/Protocol Buffers] Binarilize data by Proto Buf December 5, 2023
  • [Helm] Fail when running helm upgrade November 30, 2023
  • [AWS/EKS] Unlocking Simplicity and Security of Amazon EKS Pod Identity. November 30, 2023
  • [Gitleaks/njsscan/semgrep] Secure Your code by CAST and SAST tools November 27, 2023

Archives

  • December 2023
  • November 2023
  • October 2023
  • September 2023
  • August 2023
  • July 2023
  • June 2023
  • May 2023
  • April 2023
  • March 2023
  • February 2023
  • January 2023
  • December 2022
  • November 2022
  • October 2022
  • September 2022
  • August 2022
  • July 2022
  • June 2022
  • May 2022
  • April 2022
  • March 2022
  • February 2022
  • January 2022
  • December 2021
  • November 2021
  • October 2021
  • September 2021
  • August 2021
  • July 2021
  • June 2021

Categories

  • BareMetal
    • NextCloud
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Git
      • Bitbucket
    • Harbor
    • Jenkins
    • Spinnaker
    • TeamCity
  • Coding
    • DevSecOps
    • Golang
    • Jquery & JavaScript
    • Laravel
    • Python
    • Selenium
    • Terraform
      • AWS – Amazon Web Service
      • GCP – Google Cloud
  • Kubernetes & Container
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Docker
    • Helm Chart
    • Isito-EnvoyFilter
    • Kong Gateway
    • Kubernetes
      • Ingress
      • Pod
    • Longhorn – Storage
    • MetalLB
    • Vault
    • VictoriaMetrics
  • Log, Monitor & Tracing
    • DataDog
    • ELK
      • Kibana
      • Logstash
    • Fluent
    • Grafana
    • Prometheus
  • Uncategorized
  • Admin

Copyright © 2023 NimTechnology.