Example
Sending email with FastAPI-Mail¶
Standard way of sending email with FastAPI¶
from fastapi import FastAPI
from starlette.responses import JSONResponse
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig, MessageType
from pydantic import EmailStr, BaseModel
from typing import List
class EmailSchema(BaseModel):
email: List[EmailStr]
conf = ConnectionConfig(
MAIL_USERNAME = "username",
MAIL_PASSWORD = "**********",
MAIL_FROM = "test@email.com",
MAIL_PORT = 587,
MAIL_SERVER = "mail server",
MAIL_FROM_NAME="Desired Name",
MAIL_STARTTLS = True,
MAIL_SSL_TLS = False,
USE_CREDENTIALS = True,
VALIDATE_CERTS = True
)
app = FastAPI()
@app.post("/email")
async def simple_send(email: EmailSchema) -> JSONResponse:
html = """<p>Hi this test mail, thanks for using Fastapi-mail</p> """
message = MessageSchema(
subject="Fastapi-Mail module",
recipients=email.dict().get("email"),
body=html,
subtype=MessageType.html)
fm = FastMail(conf)
await fm.send_message(message)
return JSONResponse(status_code=200, content={"message": "email has been sent"})
Email as background task¶
@app.post("/emailbackground")
async def send_in_background(
background_tasks: BackgroundTasks,
email: EmailSchema
) -> JSONResponse:
message = MessageSchema(
subject="Fastapi mail module",
recipients=email.dict().get("email"),
body="Simple background task",
subtype=MessageType.plain)
fm = FastMail(conf)
background_tasks.add_task(fm.send_message,message)
return JSONResponse(status_code=200, content={"message": "email has been sent"})
Sending files¶
@app.post("/file")
async def send_file(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
email:EmailStr = Form(...)
) -> JSONResponse:
message = MessageSchema(
subject="Fastapi mail module",
recipients=[email],
body="Simple background task",
subtype=MessageType.html,
attachments=[file])
fm = FastMail(conf)
background_tasks.add_task(fm.send_message,message)
return JSONResponse(status_code=200, content={"message": "email has been sent"})
Using Jinja2 HTML Templates¶
You can enable Jinja2 HTML Template emails by setting the TEMPLATE_FOLDER
configuration option, and supplying a
value (which is just the name of the template file within the TEMPLATE_FOLDER
dir) for the template_name
parameter
in FastMail.send_message()
. You then can pass a Dict as the template_body
property of your MessageSchema
object:
class EmailSchema(BaseModel):
email: List[EmailStr]
body: Dict[str, Any]
conf = ConnectionConfig(
MAIL_USERNAME = "YourUsername",
MAIL_PASSWORD = "strong_password",
MAIL_FROM = "your@email.com",
MAIL_PORT = 587,
MAIL_SERVER = "your mail server",
MAIL_STARTTLS = True,
MAIL_SSL_TLS = False,
TEMPLATE_FOLDER = Path(__file__).parent / 'templates',
)
@app.post("/email")
async def send_with_template(email: EmailSchema) -> JSONResponse:
message = MessageSchema(
subject="Fastapi-Mail module",
recipients=email.dict().get("email"),
template_body=email.dict().get("body"),
subtype=MessageType.html,
)
fm = FastMail(conf)
await fm.send_message(message, template_name="email_template.html")
return JSONResponse(status_code=200, content={"message": "email has been sent"})
For example, assume we pass a template_body
of:
{
"first_name": "Fred",
"last_name": "Fredsson"
}
We can reference the variables in our Jinja templates as per normal:
...
<span>Hello, {{ first_name }}!</span>
...
Legacy Behaviour (<= 0.4.0)¶
The original behaviour in <= 0.4.0 was to wrap the Dict you provide in a variable named body
when it was provided to
Jinja behind the scenes. In these versions, you can then access your dict in your template like so:
...
<span>Hello, body.first_name !</span>
...
As you can see our keys in our dict are no longer the top level, they are part of the body
variable. Nesting works
as per normal below this level also.
Customizing attachments by headers and MIME type¶
Used for example for referencing Content-ID images in html of email
message = MessageSchema(
subject='Fastapi-Mail module',
recipients=recipients,
body="<img src='cid:logo_image@fastapi-mail'>",
subtype=MessageType.html,
attachments=[
{
"file": "/path/to/file.png",
"headers": {
"Content-ID": "<logo_image@fastapi-mail>",
"Content-Disposition": "inline; filename=\"file.png\"", # For inline images only
},
"mime_type": "image",
"mime_subtype": "png",
}
],
)
fm = FastMail(conf)
await fm.send_message(message)
Adding custom SMTP headers¶
message = MessageSchema(
subject='Fastapi-Mail module',
recipients=recipients,
headers={"your custom header": "your custom value"}
)
fm = FastMail(conf)
await fm.send_message(message)
Guide for email utils¶
The utility allows you to check temporary email addresses, you can block any email or domain. You can connect Redis to save and check email addresses. If you do not provide a Redis configuration, then the utility will save it in the list or set by default.
Check disposable email address¶
async def default_checker():
checker = DefaultChecker() # you can pass source argument for your own email domains
await checker.fetch_temp_email_domains() # require to fetch temporary email domains
return checker
@app.get('/email/disposable')
async def simple_send(
domain: str = Query(...),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
if await checker.is_disposable(domain):
return JSONResponse(status_code=400, content={'message': 'this is disposable domain'})
return JSONResponse(status_code=200, content={'message': 'email has been sent'})
Add disposable email address¶
@app.post('/email/disposable')
async def add_disp_domain(
domains: list = Body(...,embed=True),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
res = await checker.add_temp_domain(domains)
return JSONResponse(status_code=200, content={'result': res})
Add domain to blocked list¶
@app.post('/email/blocked/domains')
async def block_domain(
domain: str = Query(...),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
await checker.blacklist_add_domain(domain)
return JSONResponse(status_code=200, content={'message': f'{domain} added to blacklist'})
Check domain blocked or not¶
@app.get('/email/blocked/domains')
async def get_blocked_domain(
domain: str = Query(...),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
res = await checker.is_blocked_domain(domain)
return JSONResponse(status_code=200, content={"result": res})
Add email address to blocked list¶
@app.post('/email/blocked/address')
async def block_address(
email: str = Query(...),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
await checker.blacklist_add_email(email)
return JSONResponse(status_code=200, content={"result": True})
Check email blocked or not¶
@app.get('/email/blocked/address')
async def get_block_address(
email: str = Query(...),
checker: DefaultChecker = Depends(default_checker)) -> JSONResponse:
res = await checker.is_blocked_address(email)
return JSONResponse(status_code=200, content={"result": res})
Check MX record¶
@app.get('/email/mx')
async def test_mx(
email: EmailStr = Query(...),
full_result: bool = Query(False) ,
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
domain = email.split("@")[-1]
res = await checker.check_mx_record(domain,full_result)
return JSONResponse(status_code=200, content=res)
Remove email address from blocked list¶
@app.delete('/email/blocked/address')
async def del_blocked_address(
email: str = Query(...),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
res = await checker.blacklist_rm_email(email)
return JSONResponse(status_code=200, content={"result": res})
Remove domain from blocked list¶
@app.delete('/email/blocked/domains')
async def del_blocked_domain(
domain: str = Query(...),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
res = await checker.blacklist_rm_domain(domain)
return JSONResponse(status_code=200, content={"result": res})
Remove domain from temporary list¶
@app.delete('/email/disposable')
async def del_disp_domain(
domains: list = Body(...,embed=True),
checker: DefaultChecker = Depends(default_checker)
) -> JSONResponse:
res = await checker.blacklist_rm_temp(domains)
return JSONResponse(status_code=200, content={'result': res})
Use email utils with Redis¶
async def default_checker():
checker = DefaultChecker(db_provider="redis")
await checker.init_redis()
return checker
WhoIsXmlApi¶
from email_utils import WhoIsXmlApi
who_is = WhoIsXmlApi(token="Your access token", email="your@mailaddress.com")
print(who_is.smtp_check_()) #check smtp server
print(who_is.is_disposable()) # check email is disposable or not
print(who_is.check_mx_record()) # check domain mx records
print(who_is.free_check) # check email domain is free or not
Unittests using FastapiMail¶
Fastapi mails allows you to write unittest for your application without sending emails to non existent email address by mocking the email to be sent. To mock sending out mails, set the suppress configuraton to true. Suppress send defaults to False to prevent mocking within applications.
application.py
conf = ConnectionConfig(
MAIL_USERNAME = "YourUsername",
MAIL_PASSWORD = "strong_password",
MAIL_FROM = "your@email.com",
MAIL_PORT = 587,
MAIL_SERVER = "your mail server",
MAIL_STARTTLS = True,
MAIL_SSL_TLS = False,
TEMPLATE_FOLDER = Path(__file__).parent / 'templates',
# if no indicated SUPPRESS_SEND defaults to 0 (false) as below
# SUPPRESS_SEND=1
)
fm = FastMail(conf)
@app.post("/email")
async def simple_send(email: EmailSchema) -> JSONResponse:
message = MessageSchema(
subject="Testing",
recipients=email.dict().get("email"),
body=html,
subtype=MessageType.html,
)
await fm.send_message(message)
return JSONResponse(status_code=200, content={"message": "email has been sent"})
test.py
from starlette.testclient import TestClient
from .application import app, fm
client = TestClient(app)
def test_send_msg():
fm.config.SUPPRESS_SEND = 1
with fm.record_messages() as outbox:
payload = {"email": [ "user@example.com"]}
response = client.post("/email",json=payload)
assert response.status_code == 200
assert len(outbox) == 1
assert outbox[0]['from'] == "your@email.com"
assert outbox[0]['To'] == "user@example.com"