AsyncFileSystem
Section titled “AsyncFileSystem”class AsyncFileSystem()Provides file system operations within a Sandbox.
This class implements a high-level interface to file system operations that can be performed within a Daytona Sandbox.
AsyncFileSystem.__init__
Section titled “AsyncFileSystem.__init__”def __init__(api_client: FileSystemApi)Initializes a new FileSystem instance.
Arguments:
api_clientFileSystemApi - API client for Sandbox file system operations.
AsyncFileSystem.create_folder
Section titled “AsyncFileSystem.create_folder”@intercept_errors(message_prefix="Failed to create folder: ")@with_instrumentation()async def create_folder(path: str, mode: str) -> NoneCreates a new directory in the Sandbox at the specified path with the given permissions.
Arguments:
pathstr - Path where the folder should be created. Relative paths are resolved based on the sandbox working directory.modestr - Folder permissions in octal format (e.g., “755” for rwxr-xr-x).
Example:
# Create a directory with standard permissionsawait sandbox.fs.create_folder("workspace/data", "755")
# Create a private directoryawait sandbox.fs.create_folder("workspace/secrets", "700")AsyncFileSystem.delete_file
Section titled “AsyncFileSystem.delete_file”@intercept_errors(message_prefix="Failed to delete file: ")@with_instrumentation()async def delete_file(path: str, recursive: bool = False) -> NoneDeletes a file from the Sandbox.
Arguments:
pathstr - Path to the file to delete. Relative paths are resolved based on the sandbox working directory.recursivebool - If the file is a directory, this must be true to delete it.
Example:
# Delete a fileawait sandbox.fs.delete_file("workspace/data/old_file.txt")AsyncFileSystem.download_file
Section titled “AsyncFileSystem.download_file”@overloadasync def download_file(remote_path: str, timeout: int = 30 * 60) -> bytesDownloads a file from the Sandbox. Returns the file contents as a bytes object. This method is useful when you want to load the file into memory without saving it to disk. It can only be used for smaller files.
Arguments:
remote_pathstr - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.timeoutint - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
Returns:
bytes- The file contents as a bytes object.
Example:
# Download and save a file locallycontent = await sandbox.fs.download_file("workspace/data/file.txt")with open("local_copy.txt", "wb") as f: f.write(content)
# Download and process text contentcontent = await sandbox.fs.download_file("workspace/data/config.json")config = json.loads(content.decode('utf-8'))AsyncFileSystem.download_file
Section titled “AsyncFileSystem.download_file”@overloadasync def download_file(remote_path: str, local_path: str, timeout: int = 30 * 60) -> NoneDownloads a file from the Sandbox and saves it to a local file using stream. This method is useful when you want to download larger files that may not fit into memory.
Arguments:
remote_pathstr - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.local_pathstr - Path to save the file locally.timeoutint - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
local_path = "local_copy.txt"await sandbox.fs.download_file("tmp/large_file.txt", local_path)size_mb = os.path.getsize(local_path) / 1024 / 1024print(f"Size of the downloaded file {local_path}: {size_mb} MB")AsyncFileSystem.download_file_stream
Section titled “AsyncFileSystem.download_file_stream”@intercept_errors(message_prefix="Failed to download file: ")@with_instrumentation()async def download_file_stream( remote_path: str, timeout: int = 30 * 60, on_progress: Callable[[DownloadProgress], Awaitable[None] | None] | None = None, cancel_event: CancelEvent | None = None) -> AsyncIterator[bytes]Downloads a single file from the Sandbox as a stream without buffering the entire file into memory. Returns an async iterator that yields file content in chunks, which can be piped directly to an HTTP response, written to a file incrementally, or processed on the fly.
Arguments:
remote_pathstr - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.timeoutint - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.on_progressCallable[[DownloadProgress], Awaitable[None] | None] | None - Optional callback invoked with cumulative bytes received and total bytes, when known, as the download progresses. May be either a regular function or anasync defcoroutine — coroutine returns are awaited before the next chunk is yielded. Default is None.cancel_eventCancelEvent | None - Optionalasyncio.Event-compatible token. When set during streaming, the next chunk raisesDaytonaErrorand the underlying HTTP connection is closed. Standardasyncio.CancelledErrorfrom task cancellation is also honoured automatically by the generator.
Returns:
AsyncIterator[bytes]- An async iterator yielding chunks of file content as bytes.
Raises:
DaytonaError- If the file does not exist, access is denied, or the download is cancelled viacancel_event.
Example:
# Stream to a local file without loading into memoryasync with aiofiles.open("local_copy.bin", "wb") as f: async for chunk in await sandbox.fs.download_file_stream("workspace/large-file.bin"): await f.write(chunk)
# Cancel a download from another coroutineimport asynciocancel = asyncio.Event()asyncio.get_running_loop().call_later(5.0, cancel.set)async for chunk in await sandbox.fs.download_file_stream("workspace/big.bin", cancel_event=cancel): process(chunk)AsyncFileSystem.download_files
Section titled “AsyncFileSystem.download_files”@intercept_errors(message_prefix="Failed to download files: ")@with_instrumentation()async def download_files(files: list[FileDownloadRequest], timeout: int = 30 * 60) -> list[FileDownloadResponse]Downloads multiple files from the Sandbox. If the files already exist locally, they will be overwritten.
Arguments:
fileslist[FileDownloadRequest] - List of files to download.timeoutint - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
Returns:
list[FileDownloadResponse]- List of download results.
Raises:
Exception- Only if the request itself fails (network issues, invalid request/response, etc.). Individual file download errors are returned inFileDownloadResponse.error. When the daemon provides structured per-file metadata, it is also available inFileDownloadResponse.error_details.
Example:
# Download multiple filesresults = await sandbox.fs.download_files([ FileDownloadRequest(source="tmp/data.json"), FileDownloadRequest(source="tmp/config.json", destination="local_config.json")])for result in results: if result.error: print(f"Error downloading {result.source}: {result.error}") elif result.result: print(f"Downloaded {result.source} to {result.result}")AsyncFileSystem.find_files
Section titled “AsyncFileSystem.find_files”@intercept_errors(message_prefix="Failed to find files: ")@with_instrumentation()async def find_files(path: str, pattern: str) -> list[Match]Searches for files containing a pattern, similar to the grep command.
Arguments:
pathstr - Path to the file or directory to search. If the path is a directory, the search will be performed recursively. Relative paths are resolved based on the sandbox working directory.patternstr - Search pattern to match against file contents.
Returns:
list[Match]- List of matches found in files. Each Match object includes:- file: Path to the file containing the match
- line: The line number where the match was found
- content: The matching line content
Example:
# Search for TODOs in Python filesmatches = await sandbox.fs.find_files("workspace/src", "TODO:")for match in matches: print(f"{match.file}:{match.line}: {match.content.strip()}")AsyncFileSystem.get_file_info
Section titled “AsyncFileSystem.get_file_info”@intercept_errors(message_prefix="Failed to get file info: ")@with_instrumentation()async def get_file_info(path: str) -> FileInfoGets detailed information about a file or directory, including its size, permissions, and timestamps.
Arguments:
pathstr - Path to the file or directory. Relative paths are resolved based on the sandbox working directory.
Returns:
FileInfo- Detailed file information including:- name: File name
- is_dir: Whether the path is a directory
- size: File size in bytes
- mode: File permissions
- mod_time: Last modification timestamp
- permissions: File permissions in octal format
- owner: File owner
- group: File group
Example:
# Get file metadatainfo = await sandbox.fs.get_file_info("workspace/data/file.txt")print(f"Size: {info.size} bytes")print(f"Modified: {info.mod_time}")print(f"Mode: {info.mode}")
# Check if path is a directoryinfo = await sandbox.fs.get_file_info("workspace/data")if info.is_dir: print("Path is a directory")AsyncFileSystem.list_files
Section titled “AsyncFileSystem.list_files”@intercept_errors(message_prefix="Failed to list files: ")@with_instrumentation()async def list_files(path: str) -> list[FileInfo]Lists files and directories in a given path and returns their information, similar to the ls -l command.
Arguments:
pathstr - Path to the directory to list contents from. Relative paths are resolved based on the sandbox working directory.
Returns:
list[FileInfo]- List of file and directory information. Each FileInfo object includes the same fields as described in get_file_info().
Example:
# List directory contentsfiles = await sandbox.fs.list_files("workspace/data")
# Print files and their sizesfor file in files: if not file.is_dir: print(f"{file.name}: {file.size} bytes")
# List only directoriesdirs = [f for f in files if f.is_dir]print("Subdirectories:", ", ".join(d.name for d in dirs))AsyncFileSystem.move_files
Section titled “AsyncFileSystem.move_files”@intercept_errors(message_prefix="Failed to move files: ")@with_instrumentation()async def move_files(source: str, destination: str) -> NoneMoves or renames a file or directory. The parent directory of the destination must exist.
Arguments:
sourcestr - Path to the source file or directory. Relative paths are resolved based on the sandbox working directory.destinationstr - Path to the destination. Relative paths are resolved based on the sandbox working directory.
Example:
# Rename a fileawait sandbox.fs.move_files( "workspace/data/old_name.txt", "workspace/data/new_name.txt")
# Move a file to a different directoryawait sandbox.fs.move_files( "workspace/data/file.txt", "workspace/archive/file.txt")
# Move a directoryawait sandbox.fs.move_files( "workspace/old_dir", "workspace/new_dir")AsyncFileSystem.replace_in_files
Section titled “AsyncFileSystem.replace_in_files”@intercept_errors(message_prefix="Failed to replace in files: ")@with_instrumentation()async def replace_in_files(files: list[str], pattern: str, new_value: str) -> list[ReplaceResult]Performs search and replace operations across multiple files.
Arguments:
fileslist[str] - List of file paths to perform replacements in. Relative paths are resolved based on the sandbox working directory.patternstr - Pattern to search for.new_valuestr - Text to replace matches with.
Returns:
list[ReplaceResult]- List of results indicating replacements made in each file. Each ReplaceResult includes:- file: Path to the modified file
- success: Whether the operation was successful
- error: Error message if the operation failed
Example:
# Replace in specific filesresults = await sandbox.fs.replace_in_files( files=["workspace/src/file1.py", "workspace/src/file2.py"], pattern="old_function", new_value="new_function")
# Print resultsfor result in results: if result.success: print(f"{result.file}: {result.success}") else: print(f"{result.file}: {result.error}")AsyncFileSystem.search_files
Section titled “AsyncFileSystem.search_files”@intercept_errors(message_prefix="Failed to search files: ")@with_instrumentation()async def search_files(path: str, pattern: str) -> SearchFilesResponseSearches for files and directories whose names match the specified pattern. The pattern can be a simple string or a glob pattern.
Arguments:
pathstr - Path to the root directory to start search from. Relative paths are resolved based on the sandbox working directory.patternstr - Pattern to match against file names. Supports glob patterns (e.g., “*.py” for Python files).
Returns:
SearchFilesResponse- Search results containing:- files: List of matching file and directory paths
Example:
# Find all Python filesresult = await sandbox.fs.search_files("workspace", "*.py")for file in result.files: print(file)
# Find files with specific prefixresult = await sandbox.fs.search_files("workspace/data", "test_*")print(f"Found {len(result.files)} test files")AsyncFileSystem.set_file_permissions
Section titled “AsyncFileSystem.set_file_permissions”@intercept_errors(message_prefix="Failed to set file permissions: ")@with_instrumentation()async def set_file_permissions(path: str, mode: str | None = None, owner: str | None = None, group: str | None = None) -> NoneSets permissions and ownership for a file or directory. Any of the parameters can be None to leave that attribute unchanged.
Arguments:
pathstr - Path to the file or directory. Relative paths are resolved based on the sandbox working directory.modestr | None - File mode/permissions in octal format (e.g., “644” for rw-r—r—).ownerstr | None - User owner of the file.groupstr | None - Group owner of the file.
Example:
# Make a file executableawait sandbox.fs.set_file_permissions( path="workspace/scripts/run.sh", mode="755" # rwxr-xr-x)
# Change file ownerawait sandbox.fs.set_file_permissions( path="workspace/data/file.txt", owner="daytona", group="daytona")AsyncFileSystem.upload_file
Section titled “AsyncFileSystem.upload_file”@overloadasync def upload_file(file: bytes, remote_path: str, timeout: int = 30 * 60) -> NoneUploads a file to the specified path in the Sandbox. If a file already exists at the destination path, it will be overwritten. This method is useful when you want to upload small files that fit into memory.
Arguments:
filebytes - File contents as a bytes object.remote_pathstr - Path to the destination file. Relative paths are resolved based on the sandbox working directory.timeoutint - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
# Upload a text filecontent = b"Hello, World!"await sandbox.fs.upload_file(content, "tmp/hello.txt")
# Upload a local filewith open("local_file.txt", "rb") as f: content = f.read()await sandbox.fs.upload_file(content, "tmp/file.txt")
# Upload binary dataimport jsondata = {"key": "value"}content = json.dumps(data).encode('utf-8')await sandbox.fs.upload_file(content, "tmp/config.json")AsyncFileSystem.upload_file
Section titled “AsyncFileSystem.upload_file”@overloadasync def upload_file(local_path: str, remote_path: str, timeout: int = 30 * 60) -> NoneUploads a file from the local file system to the specified path in the Sandbox. If a file already exists at the destination path, it will be overwritten. This method uses streaming to upload the file, so it is useful when you want to upload larger files that may not fit into memory.
Arguments:
local_pathstr - Path to the local file to upload.remote_pathstr - Path to the destination file in the Sandbox. Relative paths are resolved based on the sandbox working directory.timeoutint - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
await sandbox.fs.upload_file("local_file.txt", "tmp/large_file.txt")AsyncFileSystem.upload_files
Section titled “AsyncFileSystem.upload_files”@intercept_errors(message_prefix="Failed to upload files: ")@with_instrumentation()async def upload_files(files: list[FileUpload], timeout: int = 30 * 60) -> NoneUploads multiple files to the Sandbox. If files already exist at the destination paths, they will be overwritten.
Arguments:
fileslist[FileUpload] - List of files to upload.timeoutint - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
# Upload multiple text filesfiles = [ FileUpload( source=b"Content of file 1", destination="/tmp/file1.txt" ), FileUpload( source="workspace/data/file2.txt", destination="/tmp/file2.txt" ), FileUpload( source=b'{"key": "value"}', destination="/tmp/config.json" )]await sandbox.fs.upload_files(files)AsyncFileSystem.upload_file_stream
Section titled “AsyncFileSystem.upload_file_stream”@intercept_errors(message_prefix="Failed to upload file: ")@with_instrumentation()async def upload_file_stream(source: bytes | bytearray | str | io.IOBase | AsyncIterable[bytes] | object, remote_path: str, timeout: int = 30 * 60, on_progress: Callable[[UploadProgress], Awaitable[None] | None] | None = None, cancel_event: CancelEvent | None = None) -> NoneUploads a single file to the Sandbox using true streaming, with optional progress tracking and cancellation. Memory usage stays flat regardless of source size. The HTTP layer uses chunked transfer encoding, so the source’s natural EOF terminates the upload — no advance size is needed.
Arguments:
-
source- Data to upload. Accepts:bytes/bytearray— uploaded from memory.str— treated as a local file path and read in chunks.- sync file-like (anything with
.read(n) -> bytes) — streamed as-is. - async file-like (anything with
async def read(n) -> bytes, e.g. anaiofileshandle) — streamed without ever blocking the loop. AsyncIterable[bytes]— yielded chunks are forwarded to the wire.
-
remote_pathstr - Destination path in the Sandbox. -
timeoutint - Timeout in seconds. 0 means no timeout. Default is 30 minutes. -
on_progressCallable[[UploadProgress], Awaitable[None] | None] | None - Optional callback invoked with cumulative bytes sent. May be sync orasync defwhen paired with an async source (async file-like orAsyncIterable[bytes]). Sync sources (bytes,strpath, sync file-like) require a sync callback because the underlying httpx multipart serializer pulls bytes through a synchronous.read(); passing an async callback alongside a sync source raisesDaytonaError. -
cancel_eventCancelEvent | None - Optionalasyncio.Event-compatible token. When set during streaming, the next chunk raisesDaytonaErrorand tears down the request. Standardasyncio.CancelledErrorfrom task cancellation is also honoured automatically.
Raises:
DaytonaError- If the upload fails or is cancelled viacancel_event.
Example:
import aiofiles, asynciocancel = asyncio.Event()async with aiofiles.open("large_dataset.csv", "rb") as f: await sandbox.fs.upload_file_stream( f, "tmp/dataset.csv", on_progress=lambda p: print(f"{p.bytes_sent} bytes sent"), cancel_event=cancel, )CancelEvent
Section titled “CancelEvent”@runtime_checkableclass CancelEvent(Protocol)Duck-typed cancellation token. Compatible with threading.Event and
asyncio.Event (both expose is_set()). When supplied to a streaming
download, the next chunk read after the event becomes set raises
DaytonaError, closing the underlying HTTP connection.
FileUpload
Section titled “FileUpload”@dataclassclass FileUpload()Represents a file to be uploaded to the Sandbox.
Attributes:
sourcebytes | str - File contents as a bytes object or a local file path. If a bytes object is provided, make sure it fits into memory, otherwise use the local file path which content will be streamed to the Sandbox.destinationstr - Absolute destination path in the Sandbox. Relative paths are resolved based on the sandbox working directory.
DownloadProgress
Section titled “DownloadProgress”@dataclassclass DownloadProgress()Progress information for a streaming download.
Attributes:
bytes_receivedint - Cumulative bytes received so far.total_bytesint | None - Total bytes expected, if known.
UploadProgress
Section titled “UploadProgress”@dataclassclass UploadProgress()Progress information for a streaming upload.
Attributes:
bytes_sentint - Cumulative bytes sent so far.
FileDownloadRequest
Section titled “FileDownloadRequest”@dataclassclass FileDownloadRequest()Represents a request to download a single file from the Sandbox.
Attributes:
sourcestr - Source path in the Sandbox. Relative paths are resolved based on the user’s root directory.destinationstr | None - Destination path in the local filesystem where the file content will be streamed to. If not provided, the file will be downloaded in the bytes buffer (might cause memory issues if the file is large).
FileDownloadResponse
Section titled “FileDownloadResponse”@dataclassclass FileDownloadResponse()Represents the response to a single file download request.
Attributes:
sourcestr - The original source path requested for download.resultstr | bytes | None - The download result - file path (if destination provided in the request) or bytes content (if no destination in the request), None if failed or no data received.errorstr | None - Error message if the download failed, None if successful.error_detailsFileDownloadErrorDetails | None - Structured error metadata when the server provides it.
FileDownloadErrorDetails
Section titled “FileDownloadErrorDetails”@dataclassclass FileDownloadErrorDetails()Structured error metadata for a failed bulk file download item.
create_file_download_error
Section titled “create_file_download_error”def create_file_download_error(response: FileDownloadResponse) -> DaytonaErrorCreate the appropriate Daytona exception for a failed file download response.
raise_if_stream_error
Section titled “raise_if_stream_error”def raise_if_stream_error(remote_path: str, error_text: str | None, error_details: FileDownloadErrorDetails | None, received_file_data: bool) -> NoneRaise the appropriate error after streaming a single-file multipart download.
parse_file_download_error_payload
Section titled “parse_file_download_error_payload”def parse_file_download_error_payload( payload: bytes, content_type: str | None) -> tuple[str, FileDownloadErrorDetails | None]Parse a bulk-download error part into the legacy message and structured metadata.