Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions backend/src/functions/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ async def generate_code(input: GenerateCodeInput) -> GenerateCodeOutput:
return GenerateCodeOutput(dockerfile=data.dockerfile, files=files_list)


def is_safe_path(base_dir, file_path):
"""Check if a file path is safely contained within a base directory."""
# Normalize paths
base_dir = os.path.abspath(base_dir)
joined_path = os.path.abspath(os.path.join(base_dir, file_path))

# Check if the joined path starts with the base dir
return (joined_path.startswith(base_dir + os.path.sep) or
joined_path == base_dir)


@dataclass
class RunCodeInput:
dockerfile: str
Expand Down Expand Up @@ -174,11 +185,28 @@ async def run_locally(input: RunCodeInput) -> RunCodeOutput:

# Write each file
for file_item in input.files:
file_path = os.path.join(run_folder, file_item["filename"])
# Get the original filename
original_filename = file_item["filename"]

# Check if the path is safe (no path traversal)
if not is_safe_path(run_folder, original_filename):
log.warning(f"Path traversal attempt detected in filename: {original_filename}")
# Use a safe alternative: replace the filename with just its basename
safe_filename = os.path.basename(original_filename)
if not safe_filename: # In case basename is empty
safe_filename = "unnamed_file"
file_path = os.path.join(run_folder, safe_filename)
log.info(f"Using safe alternative: {safe_filename}")
else:
file_path = os.path.join(run_folder, original_filename)

# Create directory structure if needed
os.makedirs(os.path.dirname(file_path), exist_ok=True)

# Write the file
with open(file_path, "w", encoding="utf-8") as ff:
ff.write(file_item["content"])
log.info(f"Writing file {file_item['filename']} to {file_path}")
log.info(f"Writing file {original_filename} to {file_path}")

# Now run docker build, connecting to Docker-in-Docker at DOCKER_HOST
build_cmd = ["docker", "build", "-t", "myapp", run_folder]
Expand Down Expand Up @@ -237,4 +265,4 @@ async def validate_output(input: ValidateOutputInput) -> ValidateOutputOutput:
data = result.parsed
updated_files = [{"filename": f.filename, "content": f.content} for f in data.files] if data.files is not None else None

return ValidateOutputOutput(result=data.result, dockerfile=data.dockerfile, files=updated_files)
return ValidateOutputOutput(result=data.result, dockerfile=data.dockerfile, files=updated_files)