fix: add validation to avoid path-traversal vulnerabilities (#755)
* fix: add validation to avoid path-traversal vulnerabilities * fix: update initial value of is_safe Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor: extract zip check * fix: don't need to check relative path * fix: disable zip file check (zipfile already handles it) --------- Co-authored-by: kan_cin <kan@cinnamon.is> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: phv2312 <kat87yb@gmail.com>
This commit is contained in:
parent
ec1f6abdc4
commit
37cdc28ceb
|
@ -1059,15 +1059,18 @@ class FileIndexPage(BasePage):
|
||||||
"""Handle zip files"""
|
"""Handle zip files"""
|
||||||
zip_files = [file for file in files if file.endswith(".zip")]
|
zip_files = [file for file in files if file.endswith(".zip")]
|
||||||
remaining_files = [file for file in files if not file.endswith("zip")]
|
remaining_files = [file for file in files if not file.endswith("zip")]
|
||||||
|
errors: list[str] = []
|
||||||
|
|
||||||
# Clean-up <zip_dir> before unzip to remove old files
|
# Clean-up <zip_dir> before unzip to remove old files
|
||||||
shutil.rmtree(zip_dir, ignore_errors=True)
|
shutil.rmtree(zip_dir, ignore_errors=True)
|
||||||
|
|
||||||
|
# Unzip
|
||||||
for zip_file in zip_files:
|
for zip_file in zip_files:
|
||||||
# Prepare new zip output dir, separated for each files
|
# Prepare new zip output dir, separated for each files
|
||||||
basename = os.path.splitext(os.path.basename(zip_file))[0]
|
basename = os.path.splitext(os.path.basename(zip_file))[0]
|
||||||
zip_out_dir = os.path.join(zip_dir, basename)
|
zip_out_dir = os.path.join(zip_dir, basename)
|
||||||
os.makedirs(zip_out_dir, exist_ok=True)
|
os.makedirs(zip_out_dir, exist_ok=True)
|
||||||
|
|
||||||
with zipfile.ZipFile(zip_file, "r") as zip_ref:
|
with zipfile.ZipFile(zip_file, "r") as zip_ref:
|
||||||
zip_ref.extractall(zip_out_dir)
|
zip_ref.extractall(zip_out_dir)
|
||||||
|
|
||||||
|
@ -1084,7 +1087,7 @@ class FileIndexPage(BasePage):
|
||||||
if n_zip_file > 0:
|
if n_zip_file > 0:
|
||||||
print(f"Update zip files: {n_zip_file}")
|
print(f"Update zip files: {n_zip_file}")
|
||||||
|
|
||||||
return remaining_files
|
return remaining_files, errors
|
||||||
|
|
||||||
def index_fn(
|
def index_fn(
|
||||||
self, files, urls, reindex: bool, settings, user_id
|
self, files, urls, reindex: bool, settings, user_id
|
||||||
|
@ -1100,20 +1103,22 @@ class FileIndexPage(BasePage):
|
||||||
"""
|
"""
|
||||||
if urls:
|
if urls:
|
||||||
files = [it.strip() for it in urls.split("\n")]
|
files = [it.strip() for it in urls.split("\n")]
|
||||||
errors = []
|
errors = self.validate_urls(files)
|
||||||
else:
|
else:
|
||||||
if not files:
|
if not files:
|
||||||
gr.Info("No uploaded file")
|
gr.Info("No uploaded file")
|
||||||
yield "", ""
|
yield "", ""
|
||||||
return
|
return
|
||||||
|
files, unzip_errors = self._may_extract_zip(
|
||||||
|
files, flowsettings.KH_ZIP_INPUT_DIR
|
||||||
|
)
|
||||||
|
errors = self.validate_files(files)
|
||||||
|
errors.extend(unzip_errors)
|
||||||
|
|
||||||
files = self._may_extract_zip(files, flowsettings.KH_ZIP_INPUT_DIR)
|
if errors:
|
||||||
|
gr.Warning(", ".join(errors))
|
||||||
errors = self.validate(files)
|
yield "", ""
|
||||||
if errors:
|
return
|
||||||
gr.Warning(", ".join(errors))
|
|
||||||
yield "", ""
|
|
||||||
return
|
|
||||||
|
|
||||||
gr.Info(f"Start indexing {len(files)} files...")
|
gr.Info(f"Start indexing {len(files)} files...")
|
||||||
|
|
||||||
|
@ -1569,7 +1574,7 @@ class FileIndexPage(BasePage):
|
||||||
selected_item["files"],
|
selected_item["files"],
|
||||||
)
|
)
|
||||||
|
|
||||||
def validate(self, files: list[str]):
|
def validate_files(self, files: list[str]):
|
||||||
"""Validate if the files are valid"""
|
"""Validate if the files are valid"""
|
||||||
paths = [Path(file) for file in files]
|
paths = [Path(file) for file in files]
|
||||||
errors = []
|
errors = []
|
||||||
|
@ -1598,6 +1603,14 @@ class FileIndexPage(BasePage):
|
||||||
|
|
||||||
return errors
|
return errors
|
||||||
|
|
||||||
|
def validate_urls(self, urls: list[str]):
|
||||||
|
"""Validate if the urls are valid"""
|
||||||
|
errors = []
|
||||||
|
for url in urls:
|
||||||
|
if not url.startswith("http") and not url.startswith("https"):
|
||||||
|
errors.append(f"Invalid url `{url}`")
|
||||||
|
return errors
|
||||||
|
|
||||||
|
|
||||||
class FileSelector(BasePage):
|
class FileSelector(BasePage):
|
||||||
"""File selector UI in the Chat page"""
|
"""File selector UI in the Chat page"""
|
||||||
|
|
Loading…
Reference in New Issue
Block a user