Repository: https://github.com/vercel/vercel-py.git
Install: pip install vercel
This package provides both synchronous and asynchronous clients to interact with the Vercel API.
Request context headers (FastAPI example):

from typing import Callable

from fastapi import FastAPI, Request

from vercel.headers import geolocation, ip_address, set_headers

app = FastAPI()

@app.middleware("http")
async def vercel_context_middleware(request: Request, call_next: Callable):
    # Make the incoming request headers available to the vercel helpers.
    set_headers(request.headers)
    return await call_next(request)

@app.get("/api/headers")
async def headers_info(request: Request):
    ip = ip_address(request.headers)
    geo = geolocation(request.headers)
    return {"ip": ip, "geo": geo}
Runtime cache, synchronous usage:

from vercel.cache import get_cache

def main():
    cache = get_cache(namespace="demo")
    cache.delete("greeting")
    cache.set("greeting", {"hello": "world"}, {"ttl": 60, "tags": ["demo"]})
    value = cache.get("greeting")  # dict or None
    cache.expire_tag("demo")  # invalidate by tag
The same operations are available by constructing a RuntimeCache directly:

from vercel.cache import RuntimeCache

cache = RuntimeCache(namespace="demo")

def main():
    cache.delete("greeting")
    cache.set("greeting", {"hello": "world"}, {"ttl": 60, "tags": ["demo"]})
    value = cache.get("greeting")  # dict or None
    cache.expire_tag("demo")  # invalidate by tag
Asynchronous usage:

import asyncio

from vercel.cache.aio import get_cache

async def main():
    cache = get_cache(namespace="demo")
    await cache.delete("greeting")
    await cache.set("greeting", {"hello": "world"}, {"ttl": 60, "tags": ["demo"]})
    value = await cache.get("greeting")  # dict or None
    await cache.expire_tag("demo")  # invalidate by tag

asyncio.run(main())
Or construct an AsyncRuntimeCache directly (run the same way):

from vercel.cache import AsyncRuntimeCache

cache = AsyncRuntimeCache(namespace="demo")

async def main():
    await cache.delete("greeting")
    await cache.set("greeting", {"hello": "world"}, {"ttl": 60, "tags": ["demo"]})
    value = await cache.get("greeting")  # dict or None
    await cache.expire_tag("demo")  # invalidate by tag
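A common pattern on top of these primitives is read-through caching. A minimal sketch using only the calls shown above (compute_greeting is a hypothetical loader):

from vercel.cache import get_cache

def compute_greeting() -> dict:
    # Hypothetical expensive computation or fetch.
    return {"hello": "world"}

def get_greeting() -> dict:
    cache = get_cache(namespace="demo")
    value = cache.get("greeting")
    if value is None:
        # Cache miss: compute, then store with a TTL and a tag for later invalidation.
        value = compute_greeting()
        cache.set("greeting", value, {"ttl": 60, "tags": ["demo"]})
    return value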
OIDC (FastAPI example):

from typing import Callable

from fastapi import FastAPI, Request

from vercel.headers import set_headers
from vercel.oidc import decode_oidc_payload, get_vercel_oidc_token

# async variant:
# from vercel.oidc.aio import get_vercel_oidc_token

app = FastAPI()

@app.middleware("http")
async def vercel_context_middleware(request: Request, call_next: Callable):
    set_headers(request.headers)
    return await call_next(request)

@app.get("/oidc")
def oidc():
    token = get_vercel_oidc_token()
    payload = decode_oidc_payload(token)
    user_id = payload.get("user_id")
    project_id = payload.get("project_id")
    return {
        "user_id": user_id,
        "project_id": project_id,
    }
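In an async route you can await the aio variant mentioned in the comment above; a minimal sketch, assuming it is a coroutine returning the same token:

from vercel.oidc import decode_oidc_payload
from vercel.oidc.aio import get_vercel_oidc_token as get_vercel_oidc_token_async

@app.get("/oidc-async")
async def oidc_async():
    token = await get_vercel_oidc_token_async()  # async token fetch (assumed coroutine)
    payload = decode_oidc_payload(token)
    return {"project_id": payload.get("project_id")}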
Blob:

Notes:
- Requires BLOB_READ_WRITE_TOKEN to be set as an env var, or token to be passed when constructing a client.
- BlobClient and AsyncBlobClient keep a long-lived HTTP transport for the life of the client instance. Prefer with BlobClient(...) / async with AsyncBlobClient(...), or call close() / aclose() explicitly when done.
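When a context manager is awkward (for example, a client stored on an application object), the explicit form sketched below releases the transport by hand:

from vercel.blob import BlobClient

client = BlobClient()  # uses BLOB_READ_WRITE_TOKEN from the environment
try:
    client.create_folder("examples/assets", overwrite=True)
finally:
    client.close()  # release the long-lived HTTP transport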
Async usage:

import asyncio

from vercel.blob import AsyncBlobClient

async def main():
    async with AsyncBlobClient() as client:  # uses BLOB_READ_WRITE_TOKEN from env
        # Upload bytes
        uploaded = await client.put(
            "examples/assets/hello.txt",
            b"hello from python",
            access="public",
            content_type="text/plain",
        )
        # Inspect metadata, list, download bytes, then delete
        meta = await client.head(uploaded.url)
        listing = await client.list_objects(prefix="examples/assets/")
        content = await client.get(uploaded.url)
        await client.delete([b.url for b in listing.blobs])

asyncio.run(main())
Synchronous usage:

from vercel.blob import BlobClient

with BlobClient() as client:  # or BlobClient(token="...")
    # Create a folder entry, upload a local file, list, then download
    client.create_folder("examples/assets", overwrite=True)
    uploaded = client.upload_file(
        "./README.md",
        "examples/assets/readme-copy.txt",
        access="public",
        content_type="text/plain",
    )
    listing = client.list_objects(prefix="examples/assets/")
    client.download_file(uploaded.url, "/tmp/readme-copy.txt", overwrite=True)
For large files, the SDK provides three approaches with different trade-offs.

1) Automatic multipart upload. The SDK handles everything:
from vercel.blob import auto_multipart_upload, auto_multipart_upload_async

# Synchronous
result = auto_multipart_upload(
    "large-file.bin",
    large_data,  # bytes, file object, or iterator
    part_size=8 * 1024 * 1024,  # 8MB parts (default)
)

# Asynchronous (inside an async function)
result = await auto_multipart_upload_async(
    "large-file.bin",
    large_data,
)
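Because the data argument accepts a file object, a large file can be streamed from disk; a minimal sketch, assuming the uploader reads the file incrementally:

from vercel.blob import auto_multipart_upload

with open("large-file.bin", "rb") as f:
    result = auto_multipart_upload("large-file.bin", f)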
2) Multipart uploader. A middle ground that provides a clean API while giving you control over parts and concurrency:
from vercel.blob import BlobClient

with BlobClient() as client:
    # Create the uploader (initializes the multipart upload)
    uploader = client.create_multipart_uploader(
        "large-file.bin",
        content_type="application/octet-stream",
    )

    # Upload parts (you control when and how); chunks is any iterable of bytes
    parts = []
    for i, chunk in enumerate(chunks, start=1):
        part = uploader.upload_part(i, chunk)
        parts.append(part)

    # Complete the upload
    result = uploader.complete(parts)
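The chunks above can come from anywhere; for a file on disk, a small generator keeps memory bounded (the 8MB part size mirrors the default mentioned earlier):

def read_chunks(path: str, part_size: int = 8 * 1024 * 1024):
    # Yield fixed-size chunks; the final chunk may be smaller.
    with open(path, "rb") as f:
        while chunk := f.read(part_size):
            yield chunk

chunks = read_chunks("large-file.bin")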
Async version with concurrent uploads:

import asyncio

from vercel.blob import AsyncBlobClient

async def main():
    async with AsyncBlobClient() as client:
        uploader = await client.create_multipart_uploader("large-file.bin")
        # Upload parts concurrently
        tasks = [uploader.upload_part(i, chunk) for i, chunk in enumerate(chunks, start=1)]
        parts = await asyncio.gather(*tasks)
        # Complete
        result = await uploader.complete(parts)

asyncio.run(main())
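asyncio.gather starts every part at once; for files with many parts you may want to cap concurrency. A sketch with a semaphore (the limit of 4 is arbitrary):

import asyncio

async def upload_all(uploader, chunks, limit: int = 4):
    sem = asyncio.Semaphore(limit)

    async def upload_one(i, chunk):
        async with sem:  # at most `limit` parts in flight
            return await uploader.upload_part(i, chunk)

    return await asyncio.gather(
        *(upload_one(i, chunk) for i, chunk in enumerate(chunks, start=1))
    )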
The uploader pattern is ideal when you want explicit control over how parts are produced, ordered, and uploaded concurrently. Note that add_random_suffix defaults to True for the uploader (matching the TS SDK), while the manual create path below defaults to False.

3) Manual multipart upload. Full control over each step, but more verbose:
from vercel.blob import (
    create_multipart_upload,
    upload_part,
    complete_multipart_upload,
)

# Phase 1: Create
resp = create_multipart_upload("large-file.bin")
upload_id = resp["uploadId"]
key = resp["key"]

# Phase 2: Upload parts
part1 = upload_part(
    "large-file.bin",
    chunk1,
    upload_id=upload_id,
    key=key,
    part_number=1,
)
part2 = upload_part(
    "large-file.bin",
    chunk2,
    upload_id=upload_id,
    key=key,
    part_number=2,
)

# Phase 3: Complete
result = complete_multipart_upload(
    "large-file.bin",
    [part1, part2],
    upload_id=upload_id,
    key=key,
)
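The two explicit parts above generalize to any number of chunks by looping over the same upload_part call:

parts = []
for i, chunk in enumerate(chunks, start=1):
    parts.append(
        upload_part(
            "large-file.bin",
            chunk,
            upload_id=upload_id,
            key=key,
            part_number=i,
        )
    )
result = complete_multipart_upload(
    "large-file.bin",
    parts,
    upload_id=upload_id,
    key=key,
)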
See examples/multipart_uploader.py for complete working examples.
Development:

uv pip install -e .[dev]
uv run ruff format --check && uv run ruff check . && uv run mypy src && uv run pytest -v

Releasing: push a git tag (vX.Y.Z) that matches project.version to publish via PyPI Trusted Publishing.

License: MIT